Posted to user@flink.apache.org by Nishant Gupta <ni...@gmail.com> on 2019/09/26 14:14:23 UTC

Flink- Heap Space running out

I am running a query that joins a stream and a table, as shown below. It
runs out of heap space even though the Flink cluster has plenty of heap
available (60 GB * 3).

Is an eviction strategy needed for this query?

SELECT sourceKafka.* FROM sourceKafka INNER JOIN DefaulterTable ON
sourceKafka.CC = DefaulterTable.CC;

Thanks

Nishant

Re: Flink- Heap Space running out

Posted by Nishant Gupta <ni...@gmail.com>.
Apologies, I have made a correction to my previous email.

Hi Fabian and Mike

flink-conf.yaml (3-node cluster, each node with 120 GB of memory and a 3 TB
hard disk):
jobmanager.heap.size: 50120m
taskmanager.heap.size: 50120m

With idle state retention configured as below (same heap space issue):
execution:
  planner: old
  type: streaming
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 1000000
  parallelism: 3
  max-parallelism: 128

  min-idle-state-retention: 300000
  max-idle-state-retention: 600000

With a time-windowed join (records are missed or duplicated, depending on
the interval at which I push bad IPs):
SELECT K.* FROM KafkaSource AS K, BadIP AS B WHERE K.sourceip = B.bad_ip
AND B.b_proctime BETWEEN K.k_proctime - INTERVAL '20' MINUTE AND
K.k_proctime + INTERVAL '5' MINUTE
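
To see why rows can be missed or duplicated with this predicate, it may help
to evaluate the window condition by hand. The sketch below is plain Python,
not Flink code; the function name in_join_window and the sample timestamps
are made up for illustration. It checks the same condition as the query: a
BadIP row matches a Kafka row only if its processing time lies between 20
minutes before and 5 minutes after the Kafka row's processing time.

```python
from datetime import datetime, timedelta

# Bounds copied from the query above.
LOWER = timedelta(minutes=20)
UPPER = timedelta(minutes=5)


def in_join_window(k_proctime: datetime, b_proctime: datetime) -> bool:
    """True if b_proctime BETWEEN k_proctime - 20 min AND k_proctime + 5 min."""
    return k_proctime - LOWER <= b_proctime <= k_proctime + UPPER


# Hypothetical example: a bad IP was pushed at 12:00.
bad_ip_pushed = datetime(2019, 9, 27, 12, 0)

# A Kafka row seen 10 minutes later still finds the bad IP in its window.
print(in_join_window(datetime(2019, 9, 27, 12, 10), bad_ip_pushed))  # True

# A Kafka row seen 30 minutes later does not: the bad IP row is too old.
print(in_join_window(datetime(2019, 9, 27, 12, 30), bad_ip_pushed))  # False
```

Under this reading, a bad IP that is pushed only once stops matching Kafka
rows about 20 minutes later, while a bad IP that is re-pushed more often
than that falls into several overlapping windows and can match the same
Kafka row more than once.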

I have also tried temporal table functions, and that works fine.

I was really hoping to make it work with idle state retention and the
time-windowed join. Could you please check the configuration and the query?
Please let me know if any other details are required.
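
One possible reason the temporal table function variant behaves better is
that, for processing time, only the latest version of each table row needs
to be kept, and stream rows are matched on arrival rather than stored. The
following is a toy model of that idea in plain Python, not Flink's
implementation; LatestVersionTable and join_stream are hypothetical names
introduced for this sketch.

```python
class LatestVersionTable:
    """Keeps one row per key: the most recent version seen so far."""

    def __init__(self):
        self.latest = {}

    def upsert(self, key, row):
        # A newer version replaces the older one, so the state is bounded
        # by the number of distinct keys, not by the number of updates.
        self.latest[key] = row

    def lookup(self, key):
        return self.latest.get(key)


def join_stream(events, table, key_field):
    """Emit each event that has a matching table row; events are not stored."""
    for event in events:
        match = table.lookup(event[key_field])
        if match is not None:
            yield {**event, **match}


bad_ips = LatestVersionTable()
bad_ips.upsert("10.0.0.1", {"bad_ip": "10.0.0.1"})
bad_ips.upsert("10.0.0.1", {"bad_ip": "10.0.0.1"})  # re-push, state does not grow

events = [{"sourceip": "10.0.0.1"}, {"sourceip": "10.0.0.2"}]
matches = list(join_stream(events, bad_ips, "sourceip"))
print(len(bad_ips.latest), len(matches))  # 1 1
```

In this model, re-pushing the same bad IP neither grows the state nor
duplicates output, which is consistent with the behaviour reported above.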

On Fri, Sep 27, 2019 at 12:41 PM Nishant Gupta <ni...@gmail.com>
wrote:

>
> Hi Fabian and Mike
>
> flink-conf.yaml (3-node cluster, each node with 120 GB of memory and a 3 TB
> hard disk):
> jobmanager.heap.size: 50120m
> taskmanager.heap.size: 50120m
>
> With idle state retention configured as below (same heap space issue):
> execution:
>   planner: old
>   type: streaming
>   time-characteristic: event-time
>   periodic-watermarks-interval: 200
>   result-mode: table
>   max-table-result-rows: 1000000
>   parallelism: 3
>   max-parallelism: 128
>
>   min-idle-state-retention: 300000
>   max-idle-state-retention: 600000
>
> With a time-windowed join (same heap space issue):
> SELECT K.* FROM KafkaSource AS K, BadIP AS B WHERE K.sourceip = B.bad_ip
> AND B.b_proctime BETWEEN K.k_proctime - INTERVAL '20' MINUTE AND
> K.k_proctime + INTERVAL '5' MINUTE
>
> I have also tried temporal table functions, and that works fine.
>
> I was really hoping to make it work with idle state retention and the
> time-windowed join. Could you please check the configuration and the query?
> Please let me know if any other details are required.
>
>
> On Thu, Sep 26, 2019 at 8:46 PM Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi,
>>
>> I don't think that the memory configuration is the issue.
>> The problem is the join query. The join does not have any temporal
>> boundaries.
>> Therefore, both tables are completely stored in memory and never released.
>>
>> You can configure a memory eviction strategy via idle state retention [1]
>> but you should make sure that this is really what you want.
>> Alternatively, try a time-windowed join or a join with a temporal table
>> function.
>>
>> Best, Fabian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/query_configuration.html#idle-state-retention-time
>>
>> On Thu, Sep 26, 2019 at 17:08, miki haiat <mi...@gmail.com> wrote:
>>
>>> You can configure the task manager memory in the flink-conf.yaml file.
>>> What is the current configuration?
>>>
>>> On Thu, Sep 26, 2019, 17:14 Nishant Gupta <ni...@gmail.com>
>>> wrote:
>>>
>>>> I am running a query that joins a stream and a table, as shown below. It
>>>> runs out of heap space even though the Flink cluster has plenty of heap
>>>> available (60 GB * 3).
>>>>
>>>> Is an eviction strategy needed for this query?
>>>>
>>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN DefaulterTable ON
>>>> sourceKafka.CC = DefaulterTable.CC;
>>>>
>>>> Thanks
>>>>
>>>> Nishant
>>>>
>>>

Re: Flink- Heap Space running out

Posted by Nishant Gupta <ni...@gmail.com>.
Hi Fabian and Mike

flink-conf.yaml (3-node cluster, each node with 120 GB of memory and a 3 TB
hard disk):
jobmanager.heap.size: 50120m
taskmanager.heap.size: 50120m

With idle state retention configured as below (same heap space issue):
execution:
  planner: old
  type: streaming
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 1000000
  parallelism: 3
  max-parallelism: 128

  min-idle-state-retention: 300000
  max-idle-state-retention: 600000

With a time-windowed join (same heap space issue):
SELECT K.* FROM KafkaSource AS K, BadIP AS B WHERE K.sourceip = B.bad_ip
AND B.b_proctime BETWEEN K.k_proctime - INTERVAL '20' MINUTE AND
K.k_proctime + INTERVAL '5' MINUTE

I have also tried temporal table functions, and that works fine.

I was really hoping to make it work with idle state retention and the
time-windowed join. Could you please check the configuration and the query?
Please let me know if any other details are required.


Re: Flink- Heap Space running out

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

I don't think that the memory configuration is the issue.
The problem is the join query. The join does not have any temporal
boundaries.
Therefore, both tables are completely stored in memory and never released.
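
As a rough illustration of the point about missing temporal boundaries, the
following toy model keeps every row from both inputs, because without a time
bound any stored row might still match a future row. It is plain Python, not
Flink internals; the class RegularJoinState and its methods are invented for
this sketch.

```python
from collections import defaultdict


class RegularJoinState:
    """Toy model of an unbounded inner join: every row from both inputs is kept."""

    def __init__(self):
        self.left = defaultdict(list)
        self.right = defaultdict(list)

    def on_left(self, key, row):
        self.left[key].append(row)  # stored forever, it may match later rows
        return [(row, r) for r in self.right[key]]

    def on_right(self, key, row):
        self.right[key].append(row)
        return [(l, row) for l in self.left[key]]

    def size(self):
        left_rows = sum(len(rows) for rows in self.left.values())
        right_rows = sum(len(rows) for rows in self.right.values())
        return left_rows + right_rows


state = RegularJoinState()
state.on_right("CC1", {"CC": "CC1"})
for i in range(1000):
    state.on_left("CC1", {"CC": "CC1", "seq": i})
print(state.size())  # 1001
```

The state size tracks the total number of rows ever received, so on an
endless stream no fixed heap size is enough.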

You can configure a memory eviction strategy via idle state retention [1]
but you should make sure that this is really what you want.
Alternatively, try a time-windowed join or a join with a temporal table
function.
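
The idea behind idle state retention can be sketched as state that is
dropped once a key has not been touched for a configured time. The model
below is a simplification in plain Python: it uses a single threshold
instead of Flink's minimum and maximum retention times, and RetainedState,
add and evict_idle are hypothetical names, not Flink APIs.

```python
class RetainedState:
    """Toy keyed state that drops keys not accessed within `retention_ms`."""

    def __init__(self, retention_ms):
        self.retention_ms = retention_ms
        self.rows = {}         # key -> list of rows
        self.last_access = {}  # key -> timestamp in ms

    def add(self, key, row, now_ms):
        self.rows.setdefault(key, []).append(row)
        self.last_access[key] = now_ms

    def evict_idle(self, now_ms):
        # Collect the keys first so the dicts are not modified while iterating.
        idle = [k for k, t in self.last_access.items()
                if now_ms - t >= self.retention_ms]
        for k in idle:
            del self.rows[k]
            del self.last_access[k]
        return idle


state = RetainedState(retention_ms=300_000)
state.add("CC1", {"CC": "CC1"}, now_ms=0)
state.add("CC2", {"CC": "CC2"}, now_ms=200_000)
print(state.evict_idle(now_ms=300_000))  # ['CC1']
print(sorted(state.rows))                # ['CC2']
```

After the eviction, a later stream row with key CC1 would find nothing to
join with, which is the kind of trade-off to check before relying on this
setting.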

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/query_configuration.html#idle-state-retention-time

Re: Flink- Heap Space running out

Posted by miki haiat <mi...@gmail.com>.
You can configure the task manager memory in the flink-conf.yaml file.
What is the current configuration?
