Posted to user@flink.apache.org by liu ze <li...@gmail.com> on 2019/09/04 11:30:52 UTC

flink sql row_number() over () OOM

hi,

I use the row_number() over() function to do top-N. The total amount of
data is only 60,000 records, but the state has grown to 12 GB.
Eventually the job fails with an OOM error. Is there any way to optimize it?

thanks

Re: flink sql row_number() over () OOM

Posted by liu ze <li...@gmail.com>.
Thank you for your reply.
This is my SQL: a self-join that calculates each customer's proportion
and then takes the top 10 customers.
"mytable" has only 60,000 records, but after the join the "records sent"
metric is already 2,869,940 records and still increasing.

SELECT * FROM (
  SELECT
    t1.id,
    t1.month,
    t1.customer,
    t1.amount,
    t1.counts,
    t1.counts / t2.counts AS countRate,
    t1.amount / t2.amount AS amountRate,
    ROW_NUMBER() OVER (
      PARTITION BY t1.id, t1.month
      ORDER BY t1.amount DESC, t1.customer
    ) AS rn
  FROM (
    SELECT
      id,
      month,
      customer,
      SUM(amount) AS amount,
      SUM(counts) AS counts
    FROM mytable
    GROUP BY id, month, customer
  ) t1
  INNER JOIN (
    SELECT
      id,
      month,
      SUM(amount) AS amount,
      SUM(counts) AS counts
    FROM mytable
    GROUP BY id, month
  ) t2
  ON t1.id = t2.id
  AND t1.month = t2.month
) t
WHERE rn <= 10;
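
In a query like this, both the join and the ROW_NUMBER rank operator keep
per-key state indefinitely, which is what lets the state reach 12 GB. One
commonly suggested way to bound that growth is idle state retention. A
minimal sketch, assuming the Java Table API of a Flink 1.9-era release;
the retention times are illustrative only:

import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class IdleStateRetentionSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Expire state for keys that have been idle for 12-24 hours so the
        // join and rank state cannot grow without bound. Results involving
        // expired keys may become inexact afterwards.
        tEnv.getConfig().setIdleStateRetentionTime(
                Time.hours(12), Time.hours(24));
    }
}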

On Wed, Sep 4, 2019 at 7:48 PM Wesley Peng <we...@googlemail.com>
wrote:

> As a rule of thumb, if you experience OOM and are still using the
> FsStateBackend or the MemoryStateBackend, then you should switch to
> the RocksDBStateBackend, because it can gracefully spill to disk if
> the state grows too big.

Re: flink sql row_number() over () OOM

Posted by Wesley Peng <we...@googlemail.com>.
Hi

on 2019/9/4 19:30, liu ze wrote:
> I use the row_number() over() function to do top-N. The total amount of
> data is only 60,000 records, but the state has grown to 12 GB.
> Eventually the job fails with an OOM error. Is there any way to optimize it?

ref: 
https://stackoverflow.com/questions/50812837/flink-taskmanager-out-of-memory-and-memory-configuration

The total amount of required physical and heap memory is quite difficult 
to compute since it strongly depends on your user code, your job's 
topology and which state backend you use.

As a rule of thumb, if you experience OOM and are still using the 
FsStateBackend or the MemoryStateBackend, then you should switch to the 
RocksDBStateBackend, because it can gracefully spill to disk if the 
state grows too big.
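
For reference, a minimal sketch of that switch in the job code, assuming
the flink-statebackend-rocksdb dependency is on the classpath; the
checkpoint URI is a placeholder:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // RocksDB keeps working state on local disk instead of the JVM
        // heap, so state larger than memory does not by itself cause an
        // OOM. "hdfs:///flink/checkpoints" is only a placeholder location.
        env.setStateBackend(
                new RocksDBStateBackend("hdfs:///flink/checkpoints"));
    }
}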

If you are still experiencing OOM exceptions as described, then you 
should check whether your user code keeps references to state objects or 
in some other way generates large objects that cannot be garbage 
collected. If that is the case, try to refactor your code to rely on 
Flink's state abstraction, because with RocksDB the state can go out of 
core, that is, live on disk rather than on the heap.
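
As an illustration of relying on Flink's state abstraction, here is a
hypothetical per-key counter kept in managed ValueState rather than in a
plain object field, so the state backend (e.g. RocksDB) owns the data:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Must run on a keyed stream, e.g.
// stream.keyBy(...).flatMap(new CountPerKey()).
public class CountPerKey extends RichFlatMapFunction<String, Long> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(String value, Collector<Long> out) throws Exception {
        Long current = count.value();           // null on first access per key
        long next = (current == null ? 0L : current) + 1;
        count.update(next);                     // stored by the state backend
        out.collect(next);
    }
}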

RocksDB itself needs native memory, which adds to Flink's memory 
footprint. How much depends on the block cache size, indexes, bloom 
filters, and memtables. You can find out more about these things and how 
to configure them in the corresponding documentation.
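
For example, the memtable and block cache sizes can be bounded through an
OptionsFactory on the RocksDB backend. A rough sketch against Flink
1.9-era APIs; all sizes are illustrative, not recommendations:

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// Register with: rocksDbBackend.setOptions(new BoundedRocksDBOptions());
public class BoundedRocksDBOptions implements OptionsFactory {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions; // keep Flink's DB-level defaults
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions) {
        return currentOptions
                .setWriteBufferSize(64 * 1024 * 1024)  // 64 MB memtables
                .setMaxWriteBufferNumber(2)            // at most two per state
                .setTableFormatConfig(
                        new BlockBasedTableConfig()
                                .setBlockCacheSize(256 * 1024 * 1024) // 256 MB
                                .setBlockSize(16 * 1024));            // 16 KB
    }
}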

Last but not least, you should not activate 
taskmanager.memory.preallocate when running streaming jobs, because 
streaming jobs currently don't use managed memory. Thus, by activating 
preallocation, you would allocate memory for Flink's managed memory, 
which reduces the available heap space.