Posted to dev@iceberg.apache.org by Gautam <ga...@gmail.com> on 2019/09/04 14:01:24 UTC

Iceberg using V1 Vectorized Reader over Parquet ..

Hello Devs,
                   As some of you know there's been ongoing work as part of
[1] to build Arrow based vectorization into Iceberg. There's a separate
thread on this dev list where that is being discussed and progress is being
tracked in a separate branch [2]. The overall approach there is to build a
ground up Arrow based implementation of vectorization within Iceberg so
that any compute engine using Iceberg would benefit from those
optimizations. We feel that is indeed the longer term solution and the best
way forward.

Meanwhile, Xabriel & I took to investigating an interim approach where
Iceberg could use the current Vectorization code built into Spark Parquet
reading, which I will refer to as "V1 Vectorization". This is the code
path that Spark's DataSourceV1 readers use to read Parquet data. The result
is that we have performance parity between Iceberg and Spark's Vanilla
Parquet reader. We thought we should share this with the larger community
so others can benefit from this gain.

What we did:
- Added a new reader, V1VectorizedReader, that internally short-circuits
to the V1 codepath [3], which does most of the setup and work needed to
perform vectorization. It's exactly what vanilla Spark's reader does
underneath the DSv1 implementation.
- It builds an iterator that expects ColumnarBatches from the Objects
returned by the resolving iterator (see the sketch after this list).
- We re-organized and optimized the code that builds ReadTask instances,
which considerably improved task initiation and planning time.
- Setting `iceberg.read.enableV1VectorizedReader` to true enables this
reader in IcebergSource.
- The V1Vectorized reader is an independent class with copied code in some
methods, as we didn't want to degrade perf through inheritance/virtual
method calls (we noticed degradation when we did try to re-use code).
- I've pushed this code to a separate branch [4] in case others want to
give it a try.
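
To make the batch adaptation concrete, here is a minimal sketch, assuming
the V1 path hands back an Iterator<Object> whose elements are ColumnarBatch
instances (which is what Spark's vectorized Parquet reader produces when
columnar reads are enabled); the class name is illustrative, not the one in
the branch:

import java.util.Iterator;
import org.apache.spark.sql.vectorized.ColumnarBatch;

// Illustrative adapter: turns the Objects coming out of the resolving
// iterator into typed ColumnarBatches for the DSv2 reader to return.
class ColumnarBatchAdapter implements Iterator<ColumnarBatch> {
  private final Iterator<Object> resolving;

  ColumnarBatchAdapter(Iterator<Object> resolving) {
    this.resolving = resolving;
  }

  @Override
  public boolean hasNext() {
    return resolving.hasNext();
  }

  @Override
  public ColumnarBatch next() {
    // The V1 reader is typed to Object/InternalRow, but each element is
    // actually a ColumnarBatch on the vectorized path, so we just cast.
    return (ColumnarBatch) resolving.next();
  }
}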


The Numbers:


Flat Data 10 files 10M rows each

Benchmark                                                                            Mode  Cnt   Score   Error  Units
IcebergSourceFlatParquetDataReadBenchmark.readFileSourceNonVectorized                  ss    5  63.631 ± 1.300   s/op
IcebergSourceFlatParquetDataReadBenchmark.readFileSourceVectorized                     ss    5  28.322 ± 2.400   s/op
IcebergSourceFlatParquetDataReadBenchmark.readIceberg                                  ss    5  65.862 ± 2.480   s/op
IcebergSourceFlatParquetDataReadBenchmark.readIcebergV1Vectorized10k                   ss    5  28.199 ± 1.255   s/op
IcebergSourceFlatParquetDataReadBenchmark.readIcebergV1Vectorized20k                   ss    5  29.822 ± 2.848   s/op
IcebergSourceFlatParquetDataReadBenchmark.readIcebergV1Vectorized5k                    ss    5  27.953 ± 0.949   s/op


Flat Data Projections 10 files 10M rows each

Benchmark                                                                            Mode  Cnt   Score   Error  Units
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionFileSourceNonVectorized    ss    5  11.307 ± 1.791   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionFileSourceVectorized       ss    5   3.480 ± 0.087   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIceberg                    ss    5  11.057 ± 0.236   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergV1Vectorized10k     ss    5   3.953 ± 1.592   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergV1Vectorized20k     ss    5   3.619 ± 1.305   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergV1Vectorized5k      ss    5   4.109 ± 1.734   s/op


Filtered Data 500 files 10k rows each

Benchmark                                                                            Mode  Cnt  Score   Error  Units
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterFileSourceNonVectorized      ss    5  2.139 ± 0.719   s/op
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterFileSourceVectorized         ss    5  2.213 ± 0.598   s/op
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergNonVectorized         ss    5  0.144 ± 0.029   s/op
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergV1Vectorized100k      ss    5  0.179 ± 0.019   s/op
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergV1Vectorized10k       ss    5  0.189 ± 0.046   s/op
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergV1Vectorized5k        ss    5  0.195 ± 0.137   s/op


Perf Notes:
- Iceberg V1 Vectorization's real gain (over the current Iceberg impl) is
in flat data scans. Notice how it's almost exactly the same as vanilla
Spark vectorization.
- Projections work equally well. Nested column projections, however, still
lag: we need to be able to push nested column projections down to Iceberg.
- We saw a slight overhead with Iceberg V1 Vectorization on smaller
workloads, but it goes away with larger data files.

Why we think this is useful:
- This approach lets users benefit from both 1) Iceberg's metadata
filtering and 2) Spark's scan vectorization (a usage sketch follows this
list). This should help with Iceberg adoption.
- We think this can be an interim solution (until the Arrow based impl is
fully performant) for those currently blocked by the performance difference
between Iceberg and Spark's native vectorization for interactive use cases.
A lot of optimization work and testing has gone into V1 vectorization that
Iceberg can now benefit from.
- In many cases companies have proprietary implementations of
ParquetFileFormat that may have extended features like complex type
support. Our code can use those at runtime as long as the
'buildReaderWithPartitionValues()' signature is consistent; if not, the
reader can easily be modified to plug in their own vectorized reader.
- While profiling the Arrow implementation I found it difficult to compare
bottlenecks due to major differences between the DSv1 and DSv2
client-to-source interface paths. This change makes it easier to compare
numbers and profile code between V1 vectorization and Arrow vectorization,
as we now have both paths working behind a single DataSourceV2 path (viz.
IcebergSource).
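
As a usage illustration, a minimal sketch assuming an existing Spark
application and a made-up table location; the option name is the feature
flag described above:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class V1VectorizedReadExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-v1-vectorized-read")
        .getOrCreate();

    // Hypothetical table path; the option routes IcebergSource to the
    // V1VectorizedReader instead of the default InternalRow-based Reader.
    Dataset<Row> df = spark.read()
        .format("iceberg")
        .option("iceberg.read.enableV1VectorizedReader", "true")
        .load("hdfs://nn/warehouse/db/flat_table");

    df.selectExpr("count(*)").show();
  }
}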

Limitations:
- This implementation is specific to Spark, so other compute frameworks
like Presto won't benefit from it.
- It doesn't use Iceberg's value reader interface, as it bypasses
everything under the task data reading (we added a separate
V1VectorizedTaskDataReader).
- We need to maintain two readers, since adding any code to Reader.java
might require changes to the V1Vectorized reader. We could minimize this
with a ReaderUtils class, though.


I have the code checked in at
https://github.com/prodeezy/incubator-iceberg/tree/v1-vectorized-reader .
If folks think this is useful and we can keep this as an interim solution
behind a feature flag, I can get a PR up with proper unit tests.

thanks and regards,
-Gautam.


[1] - https://github.com/apache/incubator-iceberg/issues/9
[2] - https://github.com/apache/incubator-iceberg/tree/vectorized-read
[3] -
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L197
[4] -
https://github.com/prodeezy/incubator-iceberg/tree/v1-vectorized-reader

Re: Iceberg using V1 Vectorized Reader over Parquet ..

Posted by Samarth Jain <sa...@gmail.com>.
I recently pushed support for vectorized reads of dictionary-encoded
Parquet data and wanted to share some benchmark results for string and
numeric data types:

Dictionary Encoded VARCHAR column

Benchmark                                                                  Cnt   Score   Error  Units
VectorizedDictionaryEncodedStringsBenchmark.readFileSourceNonVectorized      5  19.974  ±1.289   s/op
VectorizedDictionaryEncodedStringsBenchmark.readFileSourceVectorized         5  13.960  ±1.327   s/op
VectorizedDictionaryEncodedStringsBenchmark.readIcebergVectorized5k          5   9.081  ±0.263   s/op

Non Dictionary Encoded VARCHAR column

Benchmark                                                                  Cnt   Score   Error  Units
VectorizedDictionaryEncodedStringsBenchmark.readFileSourceNonVectorized      5  31.044  ±1.289   s/op
VectorizedDictionaryEncodedStringsBenchmark.readFileSourceVectorized         5  14.149  ±1.327   s/op
VectorizedDictionaryEncodedStringsBenchmark.readIcebergVectorized5k          5  14.480  ±0.263   s/op

Dictionary Encoded with Fallback to Plain Encoding VARCHAR column

Benchmark                                                                        Cnt   Score   Error  Units
VectorizedFallbackToPlainEncodingStringsBenchmark.readFileSourceNonVectorized      5  20.432  ±1.289   s/op
VectorizedFallbackToPlainEncodingStringsBenchmark.readFileSourceVectorized         5  10.913  ±1.327   s/op
VectorizedFallbackToPlainEncodingStringsBenchmark.readIcebergVectorized5k          5   7.868  ±0.263   s/op

BIGINT column

Benchmark                                                                Cnt   Score    Error  Units
VectorizedDictionaryEncodedLongsBenchmark.readFileSourceNonVectorized      5  20.042  ± 1.629   s/op
VectorizedDictionaryEncodedLongsBenchmark.readFileSourceVectorized         5   6.511  ± 0.241   s/op
VectorizedDictionaryEncodedLongsBenchmark.readIcebergVectorized5k          5   7.010  ± 0.332   s/op


To sum it up, Iceberg vectorized reads for dictionary-encoded string
columns, including fallback to plain encoding, are around 30% faster than
vectorized Spark reads. For dictionary-encoded numeric data types like
BIGINT, we are currently 7% slower.



Re: Iceberg using V1 Vectorized Reader over Parquet ..

Posted by Samarth Jain <sa...@gmail.com>.
I wanted to share progress made so far with improving the performance of
the Iceberg Arrow vectorized read path.

BIGINT column

Benchmark                                                                        Cnt   Score    Error  Units
IcebergSourceFlatParquetDataReadBenchmark.readFileSourceVectorized                 5   4.642  ± 1.629   s/op
IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized1k                  5   4.311  ± 0.241   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergVectorized5k    5   4.348  ± 0.332   s/op

DECIMAL column

Benchmark                                                             Cnt   Score   Error  Units
IcebergSourceFlatParquetDataReadBenchmark.readFileSourceVectorized      5  22.103  ±1.928   s/op
IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized1k       5  21.385  ±0.347   s/op
IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized5k       5  21.815  ±0.206   s/op

BIGINT, INT, FLOAT, DOUBLE, DECIMAL, DATE, TIMESTAMP columns

Benchmark                                                             Cnt   Score   Error  Units
IcebergSourceFlatParquetDataReadBenchmark.readFileSourceVectorized      5  36.83   ±0.76    s/op
IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized1k       5  40.242  ±0.643   s/op
IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized5k       5  37.222  ±0.368   s/op

VARCHAR column

Benchmark                                                             Cnt   Score   Error  Units
IcebergSourceFlatParquetDataReadBenchmark.readFileSourceVectorized      5  10.946  ±1.32    s/op
IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized1k       5  13.133  ±1.327   s/op
IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized5k       5  13.923  ±0.263   s/op

In a nutshell, the read performance for BIGINT, FLOAT, DOUBLE, DECIMAL,
DATE and TIMESTAMP types is the same as Spark's. This holds both for
benchmarks using an Iceberg table with a single column and for a table
schema with multiple such columns.
For String type data, the Iceberg Arrow path is ~20% slower than Spark. The
current implementation is hot-spotting in copying memory from a heap byte
array to the Arrow value buffer.

This benchmark was run by building Spark and Iceberg against Arrow 0.14.1.
The following configs were used to improve Arrow read performance (a sketch
of setting them follows this list):

- Allow unsafe memory access in Arrow (arrow.enable_unsafe_memory_access
set to true)
- Disable the null check done on every get(index) in Arrow
(arrow.enable_null_check_for_get set to false)
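
A minimal sketch of applying those two toggles. Both are read once, in
static initializers of Arrow's bounds/null checking classes, so they only
take effect if set before any Arrow vector class loads; in a Spark job they
are typically passed as -D JVM options on the driver and executors instead:

public class ArrowTuning {
  public static void main(String[] args) {
    // Equivalent JVM flags: -Darrow.enable_unsafe_memory_access=true
    //                       -Darrow.enable_null_check_for_get=false
    System.setProperty("arrow.enable_unsafe_memory_access", "true");
    System.setProperty("arrow.enable_null_check_for_get", "false");
    // Load Arrow/Spark classes only after this point.
  }
}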


I also wanted to point out that I have other optimizations in progress,
including pre-fetching of Parquet data pages so that the hot code path
doesn't incur the cost of decompressing them. This requires changes in the
Parquet library, for which I have a couple of PRs open. In the PoC I built,
pre-fetching for BIGINT columns made the Iceberg Arrow read path 40% faster
than Spark's. We have also identified places where we can run a tight loop
to improve vectorized performance. With these changes, I expect Iceberg
vectorized read performance to be faster than Spark's.





Re: Iceberg using V1 Vectorized Reader over Parquet ..

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Nice work, Gautam! Looks like this could be a useful patch before the Arrow
read path is ready to go.

It's also good to see the performance between Spark's DataSource v2 and v1.
We were wondering if the additional projection added in the v2 path was
causing v2 to be slower than v1 due to an extra copy, but from your tests
it looks like that isn't a problem. My guess is that either v2 is somehow
avoiding other work and is faster (unlikely) or that an equivalent
projection is getting added in the v1 path automatically by the codegen
support for columnar reads. Either way, we know that v2 isn't slower
because of that overhead.

I have some concerns about merging it right now. Mainly, I'd like to get a
release out soon so that we have our first Apache release. Including a
vectorized path in that release would delay it, so I'd like to keep
vectorization separate for now and follow up with a release that includes
vectorization when that code is stable. Does that plan work for you guys?

My other concern about the PR is the reason why I think merging it would
delay the release. Originally, we used Spark's built-in read support for
Parquet that creates InternalRow. But we found that version differences
between Parquet pulled in by Spark and Iceberg caused runtime errors. We
fixed those problems by removing the use of Spark internal classes and
shading/relocating Parquet to be able to use our own copy of Parquet.
Merging this support would require reverting that change and updating the
iceberg-spark-runtime Jar build.

It also looks like we will need to invest some time in making sure this
read path provides the same guarantees as other readers. From looking at
this, I think that this passes a Spark schema to project columns, but that
would result in by-name resolution instead of using column IDs. So we will
need to fix that up for each file to ensure the right columns are projected
after schema changes, like renaming a column.
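
For illustration, a hedged sketch of that by-ID vs. by-name distinction
using made-up field names: after a rename, data files written earlier still
carry the old column name, so only field-ID resolution projects the right
column.

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class ProjectionByIdExample {
  public static void main(String[] args) {
    // Field IDs are stable across renames in Iceberg metadata.
    Schema original = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "ts", Types.TimestampType.withZone()));

    // After renaming "ts" to "event_ts", the field keeps ID 2.
    Schema renamed = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "event_ts", Types.TimestampType.withZone()));

    // A projection resolved by the name "event_ts" finds nothing in files
    // written before the rename (they store "ts"); resolving by ID 2
    // reaches the same column in both schemas.
    System.out.println(original.findField(2).name());  // ts
    System.out.println(renamed.findField(2).name());   // event_ts
  }
}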

I'm at ApacheCon next week, but I'll take a closer look at this when I am
back.

rb



-- 
Ryan Blue
Software Engineer
Netflix

Re: Iceberg using V1 Vectorized Reader over Parquet ..

Posted by Gautam <ga...@gmail.com>.
I've added unit tests and created a PR for the v1 vectorization work:
https://github.com/apache/incubator-iceberg/pull/452

I'm sure there's scope for further improvement so lemme know your feedback
over the PR so I can sharpen it further.

Cheers,
-Gautam.



Re: Iceberg using V1 Vectorized Reader over Parquet ..

Posted by Mouli Mukherjee <mo...@gmail.com>.
Hi Gautam, this is very exciting to see. It would be great if this was
available behind a flag if possible.

Best,
Mouli
