You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@iceberg.apache.org by Manu Zhang <ow...@gmail.com> on 2022/09/07 06:06:24 UTC

Spark driver memory issue when scanning all columns of a iceberg table

Hi all,

It looks scanning all columns of an iceberg table in Spark could cause
memory issue in the driver by keeping all the stats.

*select * from iceberg_table limit 10;*

I also created https://github.com/apache/iceberg/issues/5706 with more
details.
Is there any reason not to drop stats
<https://github.com/apache/iceberg/blob/apache-iceberg-0.13.1/core/src/main/java/org/apache/iceberg/ManifestReader.java#L292>
when columns contain ALL_COLUMNS(*)?

Thanks,
Manu

Re: Spark driver memory issue when scanning all columns of a iceberg table

Posted by Manu Zhang <ow...@gmail.com>.
Thanks Ryan, I've submitted a PR
<https://github.com/apache/iceberg/issues/5706> to fix it. Please help
review.

On Tue, Sep 20, 2022 at 2:20 AM Ryan Blue <bl...@tabular.io> wrote:

> I just did some digging and I think that the reason for checking whether
> the row filter is always true is to match the logic when we add stats
> columns. We only add stats columns if the row filter is non-trivial (not
> always true) so we probably carried that over when removing stats columns.
> If tests are passing without this, then I think it would be safe to drop
> that requirement.
>
> Ryan
>
> On Sun, Sep 18, 2022 at 6:33 PM Manu Zhang <ow...@gmail.com>
> wrote:
>
>> Sorry for a typo in the previous reply.
>>
>> The questions is why we don't drop stats when
>> *`rowFilter == Expressions.alwaysTrue()`*
>>
>> On Mon, Sep 19, 2022 at 9:30 AM Manu Zhang <ow...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> Does anyone know why we don't drop stats when `rowFilter != Expressions.
>>> alwaysTrue()` at
>>> https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/ManifestReader.java#L326
>>> ?
>>> I tried removing it but all tests passed.
>>>
>>> Thanks,
>>> Manu
>>>
>>>
>>>
>>> On Sat, Sep 10, 2022 at 8:56 PM Manu Zhang <ow...@gmail.com>
>>> wrote:
>>>
>>>> Ryan,
>>>>
>>>> I found data files did a full copy (deep copy) of all stats from the
>>>> manifest file when rowFilter is true. With a large number of data files, so
>>>> much memory could be taken up by stats like valueCounts.
>>>> I also attached snapshots of the heap dump in the GitHub issue
>>>> comments. Please help confirm.
>>>>
>>>> Thanks,
>>>> Manu
>>>>
>>>> On Fri, Sep 9, 2022 at 7:09 AM Manu Zhang <ow...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Ryan for explanation. Yes, I got it wrong and it’s manifest
>>>>> columns rather than data columns. I’ll try your suggestions and get back.
>>>>>
>>>>> Manu
>>>>>
>>>>> Ryan Blue <bl...@tabular.io>于2022年9月8日 周四03:39写道:
>>>>>
>>>>>> Manu,
>>>>>>
>>>>>> The check that you linked to where stats aren’t dropped is when
>>>>>> someone is asking for all columns from a manifest file, not when your data
>>>>>> query is requesting all columns. In the case of your query, Spark is not
>>>>>> asking for stats columns. They will be used for filtering, but will be
>>>>>> dropped before passing the DataFile to the scan as a matching result
>>>>>> file.
>>>>>>
>>>>>> I’ll post a more detailed reply on the issue, but when we’ve seen
>>>>>> this issue in the past the problem is usually that your planning
>>>>>> parallelism is high (based on the environment) and the parallel planning is
>>>>>> adding them to a queue. You can avoid that by setting
>>>>>> iceberg.worker.num-threads=2 (or something small) or disabling
>>>>>> parallel planning by setting iceberg.scan.plan-in-worker-pool=false.
>>>>>> Both of those are Java system properties.
>>>>>>
>>>>>> Ryan
>>>>>>
>>>>>> On Tue, Sep 6, 2022 at 11:06 PM Manu Zhang <ow...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> It looks scanning all columns of an iceberg table in Spark could
>>>>>>> cause memory issue in the driver by keeping all the stats.
>>>>>>>
>>>>>>> *select * from iceberg_table limit 10;*
>>>>>>>
>>>>>>> I also created https://github.com/apache/iceberg/issues/5706 with
>>>>>>> more details.
>>>>>>> Is there any reason not to drop stats
>>>>>>> <https://github.com/apache/iceberg/blob/apache-iceberg-0.13.1/core/src/main/java/org/apache/iceberg/ManifestReader.java#L292>
>>>>>>> when columns contain ALL_COLUMNS(*)?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Manu
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Tabular
>>>>>>
>>>>>
>
> --
> Ryan Blue
> Tabular
>

Re: Spark driver memory issue when scanning all columns of a iceberg table

Posted by Ryan Blue <bl...@tabular.io>.
I just did some digging and I think that the reason for checking whether
the row filter is always true is to match the logic when we add stats
columns. We only add stats columns if the row filter is non-trivial (not
always true) so we probably carried that over when removing stats columns.
If tests are passing without this, then I think it would be safe to drop
that requirement.

Ryan

On Sun, Sep 18, 2022 at 6:33 PM Manu Zhang <ow...@gmail.com> wrote:

> Sorry for a typo in the previous reply.
>
> The questions is why we don't drop stats when
> *`rowFilter == Expressions.alwaysTrue()`*
>
> On Mon, Sep 19, 2022 at 9:30 AM Manu Zhang <ow...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> Does anyone know why we don't drop stats when `rowFilter != Expressions.
>> alwaysTrue()` at
>> https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/ManifestReader.java#L326
>> ?
>> I tried removing it but all tests passed.
>>
>> Thanks,
>> Manu
>>
>>
>>
>> On Sat, Sep 10, 2022 at 8:56 PM Manu Zhang <ow...@gmail.com>
>> wrote:
>>
>>> Ryan,
>>>
>>> I found data files did a full copy (deep copy) of all stats from the
>>> manifest file when rowFilter is true. With a large number of data files, so
>>> much memory could be taken up by stats like valueCounts.
>>> I also attached snapshots of the heap dump in the GitHub issue comments.
>>> Please help confirm.
>>>
>>> Thanks,
>>> Manu
>>>
>>> On Fri, Sep 9, 2022 at 7:09 AM Manu Zhang <ow...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Ryan for explanation. Yes, I got it wrong and it’s manifest
>>>> columns rather than data columns. I’ll try your suggestions and get back.
>>>>
>>>> Manu
>>>>
>>>> Ryan Blue <bl...@tabular.io>于2022年9月8日 周四03:39写道:
>>>>
>>>>> Manu,
>>>>>
>>>>> The check that you linked to where stats aren’t dropped is when
>>>>> someone is asking for all columns from a manifest file, not when your data
>>>>> query is requesting all columns. In the case of your query, Spark is not
>>>>> asking for stats columns. They will be used for filtering, but will be
>>>>> dropped before passing the DataFile to the scan as a matching result
>>>>> file.
>>>>>
>>>>> I’ll post a more detailed reply on the issue, but when we’ve seen this
>>>>> issue in the past the problem is usually that your planning parallelism is
>>>>> high (based on the environment) and the parallel planning is adding them to
>>>>> a queue. You can avoid that by setting iceberg.worker.num-threads=2
>>>>> (or something small) or disabling parallel planning by setting
>>>>> iceberg.scan.plan-in-worker-pool=false. Both of those are Java system
>>>>> properties.
>>>>>
>>>>> Ryan
>>>>>
>>>>> On Tue, Sep 6, 2022 at 11:06 PM Manu Zhang <ow...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> It looks scanning all columns of an iceberg table in Spark could
>>>>>> cause memory issue in the driver by keeping all the stats.
>>>>>>
>>>>>> *select * from iceberg_table limit 10;*
>>>>>>
>>>>>> I also created https://github.com/apache/iceberg/issues/5706 with
>>>>>> more details.
>>>>>> Is there any reason not to drop stats
>>>>>> <https://github.com/apache/iceberg/blob/apache-iceberg-0.13.1/core/src/main/java/org/apache/iceberg/ManifestReader.java#L292>
>>>>>> when columns contain ALL_COLUMNS(*)?
>>>>>>
>>>>>> Thanks,
>>>>>> Manu
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Tabular
>>>>>
>>>>

-- 
Ryan Blue
Tabular

Re: Spark driver memory issue when scanning all columns of a iceberg table

Posted by Manu Zhang <ow...@gmail.com>.
Sorry for a typo in the previous reply.

The questions is why we don't drop stats when
*`rowFilter == Expressions.alwaysTrue()`*

On Mon, Sep 19, 2022 at 9:30 AM Manu Zhang <ow...@gmail.com> wrote:

> Hi all,
>
> Does anyone know why we don't drop stats when `rowFilter != Expressions.
> alwaysTrue()` at
> https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/ManifestReader.java#L326
> ?
> I tried removing it but all tests passed.
>
> Thanks,
> Manu
>
>
>
> On Sat, Sep 10, 2022 at 8:56 PM Manu Zhang <ow...@gmail.com>
> wrote:
>
>> Ryan,
>>
>> I found data files did a full copy (deep copy) of all stats from the
>> manifest file when rowFilter is true. With a large number of data files, so
>> much memory could be taken up by stats like valueCounts.
>> I also attached snapshots of the heap dump in the GitHub issue comments.
>> Please help confirm.
>>
>> Thanks,
>> Manu
>>
>> On Fri, Sep 9, 2022 at 7:09 AM Manu Zhang <ow...@gmail.com>
>> wrote:
>>
>>> Thanks Ryan for explanation. Yes, I got it wrong and it’s manifest
>>> columns rather than data columns. I’ll try your suggestions and get back.
>>>
>>> Manu
>>>
>>> Ryan Blue <bl...@tabular.io>于2022年9月8日 周四03:39写道:
>>>
>>>> Manu,
>>>>
>>>> The check that you linked to where stats aren’t dropped is when someone
>>>> is asking for all columns from a manifest file, not when your data query is
>>>> requesting all columns. In the case of your query, Spark is not asking for
>>>> stats columns. They will be used for filtering, but will be dropped before
>>>> passing the DataFile to the scan as a matching result file.
>>>>
>>>> I’ll post a more detailed reply on the issue, but when we’ve seen this
>>>> issue in the past the problem is usually that your planning parallelism is
>>>> high (based on the environment) and the parallel planning is adding them to
>>>> a queue. You can avoid that by setting iceberg.worker.num-threads=2
>>>> (or something small) or disabling parallel planning by setting
>>>> iceberg.scan.plan-in-worker-pool=false. Both of those are Java system
>>>> properties.
>>>>
>>>> Ryan
>>>>
>>>> On Tue, Sep 6, 2022 at 11:06 PM Manu Zhang <ow...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> It looks scanning all columns of an iceberg table in Spark could cause
>>>>> memory issue in the driver by keeping all the stats.
>>>>>
>>>>> *select * from iceberg_table limit 10;*
>>>>>
>>>>> I also created https://github.com/apache/iceberg/issues/5706 with
>>>>> more details.
>>>>> Is there any reason not to drop stats
>>>>> <https://github.com/apache/iceberg/blob/apache-iceberg-0.13.1/core/src/main/java/org/apache/iceberg/ManifestReader.java#L292>
>>>>> when columns contain ALL_COLUMNS(*)?
>>>>>
>>>>> Thanks,
>>>>> Manu
>>>>>
>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Tabular
>>>>
>>>

Re: Spark driver memory issue when scanning all columns of a iceberg table

Posted by Manu Zhang <ow...@gmail.com>.
Hi all,

Does anyone know why we don't drop stats when `rowFilter != Expressions.
alwaysTrue()` at
https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/ManifestReader.java#L326
?
I tried removing it but all tests passed.

Thanks,
Manu



On Sat, Sep 10, 2022 at 8:56 PM Manu Zhang <ow...@gmail.com> wrote:

> Ryan,
>
> I found data files did a full copy (deep copy) of all stats from the
> manifest file when rowFilter is true. With a large number of data files, so
> much memory could be taken up by stats like valueCounts.
> I also attached snapshots of the heap dump in the GitHub issue comments.
> Please help confirm.
>
> Thanks,
> Manu
>
> On Fri, Sep 9, 2022 at 7:09 AM Manu Zhang <ow...@gmail.com> wrote:
>
>> Thanks Ryan for explanation. Yes, I got it wrong and it’s manifest
>> columns rather than data columns. I’ll try your suggestions and get back.
>>
>> Manu
>>
>> Ryan Blue <bl...@tabular.io>于2022年9月8日 周四03:39写道:
>>
>>> Manu,
>>>
>>> The check that you linked to where stats aren’t dropped is when someone
>>> is asking for all columns from a manifest file, not when your data query is
>>> requesting all columns. In the case of your query, Spark is not asking for
>>> stats columns. They will be used for filtering, but will be dropped before
>>> passing the DataFile to the scan as a matching result file.
>>>
>>> I’ll post a more detailed reply on the issue, but when we’ve seen this
>>> issue in the past the problem is usually that your planning parallelism is
>>> high (based on the environment) and the parallel planning is adding them to
>>> a queue. You can avoid that by setting iceberg.worker.num-threads=2 (or
>>> something small) or disabling parallel planning by setting
>>> iceberg.scan.plan-in-worker-pool=false. Both of those are Java system
>>> properties.
>>>
>>> Ryan
>>>
>>> On Tue, Sep 6, 2022 at 11:06 PM Manu Zhang <ow...@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> It looks scanning all columns of an iceberg table in Spark could cause
>>>> memory issue in the driver by keeping all the stats.
>>>>
>>>> *select * from iceberg_table limit 10;*
>>>>
>>>> I also created https://github.com/apache/iceberg/issues/5706 with more
>>>> details.
>>>> Is there any reason not to drop stats
>>>> <https://github.com/apache/iceberg/blob/apache-iceberg-0.13.1/core/src/main/java/org/apache/iceberg/ManifestReader.java#L292>
>>>> when columns contain ALL_COLUMNS(*)?
>>>>
>>>> Thanks,
>>>> Manu
>>>>
>>>
>>>
>>> --
>>> Ryan Blue
>>> Tabular
>>>
>>

Re: Spark driver memory issue when scanning all columns of a iceberg table

Posted by Manu Zhang <ow...@gmail.com>.
Ryan,

I found data files did a full copy (deep copy) of all stats from the
manifest file when rowFilter is true. With a large number of data files, so
much memory could be taken up by stats like valueCounts.
I also attached snapshots of the heap dump in the GitHub issue comments.
Please help confirm.

Thanks,
Manu

On Fri, Sep 9, 2022 at 7:09 AM Manu Zhang <ow...@gmail.com> wrote:

> Thanks Ryan for explanation. Yes, I got it wrong and it’s manifest columns
> rather than data columns. I’ll try your suggestions and get back.
>
> Manu
>
> Ryan Blue <bl...@tabular.io>于2022年9月8日 周四03:39写道:
>
>> Manu,
>>
>> The check that you linked to where stats aren’t dropped is when someone
>> is asking for all columns from a manifest file, not when your data query is
>> requesting all columns. In the case of your query, Spark is not asking for
>> stats columns. They will be used for filtering, but will be dropped before
>> passing the DataFile to the scan as a matching result file.
>>
>> I’ll post a more detailed reply on the issue, but when we’ve seen this
>> issue in the past the problem is usually that your planning parallelism is
>> high (based on the environment) and the parallel planning is adding them to
>> a queue. You can avoid that by setting iceberg.worker.num-threads=2 (or
>> something small) or disabling parallel planning by setting
>> iceberg.scan.plan-in-worker-pool=false. Both of those are Java system
>> properties.
>>
>> Ryan
>>
>> On Tue, Sep 6, 2022 at 11:06 PM Manu Zhang <ow...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> It looks scanning all columns of an iceberg table in Spark could cause
>>> memory issue in the driver by keeping all the stats.
>>>
>>> *select * from iceberg_table limit 10;*
>>>
>>> I also created https://github.com/apache/iceberg/issues/5706 with more
>>> details.
>>> Is there any reason not to drop stats
>>> <https://github.com/apache/iceberg/blob/apache-iceberg-0.13.1/core/src/main/java/org/apache/iceberg/ManifestReader.java#L292>
>>> when columns contain ALL_COLUMNS(*)?
>>>
>>> Thanks,
>>> Manu
>>>
>>
>>
>> --
>> Ryan Blue
>> Tabular
>>
>

Re: Spark driver memory issue when scanning all columns of a iceberg table

Posted by Manu Zhang <ow...@gmail.com>.
Thanks Ryan for explanation. Yes, I got it wrong and it’s manifest columns
rather than data columns. I’ll try your suggestions and get back.

Manu

Ryan Blue <bl...@tabular.io>于2022年9月8日 周四03:39写道:

> Manu,
>
> The check that you linked to where stats aren’t dropped is when someone is
> asking for all columns from a manifest file, not when your data query is
> requesting all columns. In the case of your query, Spark is not asking for
> stats columns. They will be used for filtering, but will be dropped before
> passing the DataFile to the scan as a matching result file.
>
> I’ll post a more detailed reply on the issue, but when we’ve seen this
> issue in the past the problem is usually that your planning parallelism is
> high (based on the environment) and the parallel planning is adding them to
> a queue. You can avoid that by setting iceberg.worker.num-threads=2 (or
> something small) or disabling parallel planning by setting
> iceberg.scan.plan-in-worker-pool=false. Both of those are Java system
> properties.
>
> Ryan
>
> On Tue, Sep 6, 2022 at 11:06 PM Manu Zhang <ow...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> It looks scanning all columns of an iceberg table in Spark could cause
>> memory issue in the driver by keeping all the stats.
>>
>> *select * from iceberg_table limit 10;*
>>
>> I also created https://github.com/apache/iceberg/issues/5706 with more
>> details.
>> Is there any reason not to drop stats
>> <https://github.com/apache/iceberg/blob/apache-iceberg-0.13.1/core/src/main/java/org/apache/iceberg/ManifestReader.java#L292>
>> when columns contain ALL_COLUMNS(*)?
>>
>> Thanks,
>> Manu
>>
>
>
> --
> Ryan Blue
> Tabular
>

Re: Spark driver memory issue when scanning all columns of a iceberg table

Posted by Ryan Blue <bl...@tabular.io>.
Manu,

The check that you linked to where stats aren’t dropped is when someone is
asking for all columns from a manifest file, not when your data query is
requesting all columns. In the case of your query, Spark is not asking for
stats columns. They will be used for filtering, but will be dropped before
passing the DataFile to the scan as a matching result file.

I’ll post a more detailed reply on the issue, but when we’ve seen this
issue in the past the problem is usually that your planning parallelism is
high (based on the environment) and the parallel planning is adding them to
a queue. You can avoid that by setting iceberg.worker.num-threads=2 (or
something small) or disabling parallel planning by setting
iceberg.scan.plan-in-worker-pool=false. Both of those are Java system
properties.

Ryan

On Tue, Sep 6, 2022 at 11:06 PM Manu Zhang <ow...@gmail.com> wrote:

> Hi all,
>
> It looks scanning all columns of an iceberg table in Spark could cause
> memory issue in the driver by keeping all the stats.
>
> *select * from iceberg_table limit 10;*
>
> I also created https://github.com/apache/iceberg/issues/5706 with more
> details.
> Is there any reason not to drop stats
> <https://github.com/apache/iceberg/blob/apache-iceberg-0.13.1/core/src/main/java/org/apache/iceberg/ManifestReader.java#L292>
> when columns contain ALL_COLUMNS(*)?
>
> Thanks,
> Manu
>


-- 
Ryan Blue
Tabular