Posted to user@flink.apache.org by Jack Kolokasis <ko...@ics.forth.gr> on 2020/11/09 20:28:47 UTC

Caching Mechanism in Flink

Hello all,

I am new to Flink and would like to ask whether Flink supports a caching
mechanism that stores intermediate results in memory for machine learning
workloads.

If so, how can I enable and use it?

Thank you,
Iacovos


Re: Caching Mechanism in Flink

Posted by Andrey Zagrebin <az...@apache.org>.
Hi Iacovos,

As Matthias mentioned, the tasks' off-heap memory has nothing to do with the
memory segments. This memory component is reserved only for user code.

The memory segments are managed by Flink and used for batch workloads, such
as in-memory joins.
They are part of managed memory (taskmanager.memory.managed.size),
which is also off-heap but is neither the tasks' off-heap
(taskmanager.memory.task.off-heap.size) nor JVM direct memory.

The memory segments are also used to wrap network buffers. Those live in JVM
direct memory (which is also off-heap), but again, this is not the tasks'
off-heap.

Maybe the confusion comes from the fact that 'off-heap' generally refers
to everything that is not JVM heap: direct or native memory.
The tasks' off-heap is the part of the general 'off-heap' (of the direct
memory limit, to be precise) that is reserved only for user code and is not
intended to be used by Flink.
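
As a rough illustration of that last point, here is a minimal sketch that
assumes the breakdown given in the Flink 1.11 memory setup documentation,
where the JVM direct memory limit is derived from framework off-heap, task
off-heap, and network memory. Managed memory is deliberately left out of the
sum. The class name, the method name, and all sizes are hypothetical; this is
plain arithmetic, not Flink code.

```java
// Hypothetical helper for illustration only; not part of Flink.
final class TaskManagerMemorySketch {

    // Sums the components assumed to make up the JVM direct memory limit.
    // All sizes are in MiB.
    static long directMemoryLimitMiB(long frameworkOffHeap, long taskOffHeap, long network) {
        return frameworkOffHeap + taskOffHeap + network;
    }

    public static void main(String[] args) {
        long frameworkOffHeap = 128; // made-up value
        long taskOffHeap = 256;      // e.g. taskmanager.memory.task.off-heap.size: 256m
        long network = 128;          // made-up value
        long managed = 512;          // taskmanager.memory.managed.size, NOT part of the sum

        long limit = directMemoryLimitMiB(frameworkOffHeap, taskOffHeap, network);
        System.out.println("direct memory limit (MiB): " + limit);  // prints 512
        System.out.println("managed memory (MiB), separate: " + managed);
    }
}
```

Raising the task off-heap value in this sketch raises the direct memory
limit, while the managed memory size stays a separate budget.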

Best,
Andrey

On Wed, Nov 11, 2020 at 3:06 PM Jack Kolokasis <ko...@ics.forth.gr>
wrote:

> Hi Matthias,
>
> Yes, I am referring to the tasks' off-heap configuration value.
>
> Best,
> Iacovos

Re: Caching Mechanism in Flink

Posted by Jack Kolokasis <ko...@ics.forth.gr>.
Hi Matthias,

Yes, I am referring to the tasks' off-heap configuration value.

Best,
Iacovos

On 11/11/20 1:37 PM, Matthias Pohl wrote:
> When talking about the "off-heap" in your most recent message, are you 
> still referring to the task's off-heap configuration value?
> AFAIK, the HybridMemorySegment shouldn't be directly related to the 
> off-heap parameter.
>
> The HybridMemorySegment can be used as a wrapper around any kind of
> memory, e.g. byte[]. It can wrap heap memory, DirectByteBuffers
> (located in the JVM's direct memory pool, which is not part of the
> JVM's heap), or memory allocated through Unsafe's allocation methods
> (so-called native memory, which is also not part of the JVM's heap).
> The HybridMemorySegments are utilized within the MemoryManager class. 
> The MemoryManager instances are responsible for maintaining the 
> managed memory used in each of the TaskSlots. Managed Memory is used 
> in different settings (e.g. for the RocksDB state backend in streaming 
> applications). It can be configured using 
> taskmanager.memory.managed.size (or the corresponding *.fraction 
> parameter) [1]. See more details on that in [2].
>
> I'm going to pull in Andrey as he has worked on that topic recently.
>
> Best,
> Matthias
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-memory-managed-size
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#managed-memory

Re: Caching Mechanism in Flink

Posted by Matthias Pohl <ma...@ververica.com>.
When talking about "off-heap" in your most recent message, are you still
referring to the task's off-heap configuration value? AFAIK, the
HybridMemorySegment shouldn't be directly related to the off-heap
parameter.

The HybridMemorySegment can be used as a wrapper around any kind of
memory, e.g. byte[]. It can wrap heap memory, DirectByteBuffers (located in
the JVM's direct memory pool, which is not part of the JVM's heap), or memory
allocated through Unsafe's allocation methods (so-called native memory, which
is also not part of the JVM's heap).

The HybridMemorySegments are used within the MemoryManager class. The
MemoryManager instances are responsible for maintaining the managed memory
used in each of the TaskSlots. Managed memory is used in different settings
(e.g. for the RocksDB state backend in streaming applications). It can be
configured using taskmanager.memory.managed.size (or the corresponding
*.fraction parameter) [1]. See [2] for more details.
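
To make the "wrapper around any kind of memory" idea concrete, here is a
minimal sketch. It is NOT Flink's HybridMemorySegment: the class SegmentSketch
and its methods are hypothetical, it relies only on the JDK's
java.nio.ByteBuffer, and it leaves out the Unsafe-based native allocation
path entirely.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch for illustration; not Flink's HybridMemorySegment.
final class SegmentSketch {
    private final ByteBuffer buffer;

    private SegmentSketch(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    // Wraps an existing byte[] that lives on the JVM heap.
    static SegmentSketch wrapHeap(byte[] bytes) {
        return new SegmentSketch(ByteBuffer.wrap(bytes));
    }

    // Allocates a direct buffer outside the JVM heap.
    static SegmentSketch allocateDirect(int size) {
        return new SegmentSketch(ByteBuffer.allocateDirect(size));
    }

    // Absolute reads and writes work the same way for both kinds of memory.
    void putInt(int index, int value) {
        buffer.putInt(index, value);
    }

    int getInt(int index) {
        return buffer.getInt(index);
    }

    boolean isOffHeap() {
        return buffer.isDirect();
    }

    public static void main(String[] args) {
        SegmentSketch heap = wrapHeap(new byte[16]);
        SegmentSketch direct = allocateDirect(16);
        heap.putInt(0, 42);
        direct.putInt(0, 42);
        System.out.println(heap.getInt(0) + " offHeap=" + heap.isOffHeap());
        System.out.println(direct.getInt(0) + " offHeap=" + direct.isOffHeap());
    }
}
```

The caller uses the same putInt/getInt calls regardless of where the bytes
live, which is the point of hiding the memory type behind one wrapper.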

I'm going to pull in Andrey as he has worked on that topic recently.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-memory-managed-size
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#managed-memory

On Wed, Nov 11, 2020 at 12:00 PM Jack Kolokasis <ko...@ics.forth.gr>
wrote:

> Hi Matthias,
>
> Thank you for your reply and the useful information. I found that off-heap
> memory is used when Flink uses HybridMemorySegments. How does Flink know
> when to use these HybridMemorySegments, and in which operations does this
> happen?
>
> Best,
> Iacovos

Re: Caching Mechanism in Flink

Posted by Jack Kolokasis <ko...@ics.forth.gr>.
Hi Matthias,

Thank you for your reply and the useful information. I found that off-heap
memory is used when Flink uses HybridMemorySegments. How does Flink know
when to use these HybridMemorySegments, and in which operations does this
happen?

Best,
Iacovos

On 11/11/20 11:41 AM, Matthias Pohl wrote:
> Hi Iacovos,
> The task's off-heap configuration value is used when spinning up 
> TaskManager containers in a clustered environment. It will contribute 
> to the overall memory reserved for a TaskManager container during 
> deployment. This parameter can be used to influence the amount of 
> memory allocated if the user code relies on DirectByteBuffers and/or 
> native memory allocation. There is no active memory pool management 
> beyond that from Flink's side. The configuration parameter is ignored 
> if you run a Flink cluster locally.
>
> Besides this, Flink itself also uses DirectByteBuffers (for network
> buffers) and native memory (for its managed memory) internally.
>
> You can find a more detailed description of Flink's memory model in 
> [1]. I hope that helps.
>
> Best,
> Matthias
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model

Re: Caching Mechanism in Flink

Posted by Matthias Pohl <ma...@ververica.com>.
Hi Iacovos,
The task's off-heap configuration value is used when spinning up
TaskManager containers in a clustered environment. It will contribute to
the overall memory reserved for a TaskManager container during deployment.
This parameter can be used to influence the amount of memory allocated if
the user code relies on DirectByteBuffers and/or native memory allocation.
There is no active memory pool management beyond that from Flink's side.
The configuration parameter is ignored if you run a Flink cluster locally.
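
For context, "user code that relies on DirectByteBuffers" can be as simple as
the following sketch. It uses only the JDK's java.nio.ByteBuffer and is not
Flink code; the class and method names are hypothetical. The direct
allocation here is the kind of memory the task off-heap setting is meant to
leave room for.

```java
import java.nio.ByteBuffer;

// Illustration only: plain JDK calls, not Flink internals.
final class BufferKinds {

    // Describes where a buffer's backing memory lives.
    static String describe(ByteBuffer buffer) {
        return buffer.isDirect() ? "direct (off-heap)" : "heap";
    }

    public static void main(String[] args) {
        // Backed by a byte[] on the JVM heap.
        ByteBuffer onHeap = ByteBuffer.allocate(1024);
        // Allocated outside the JVM heap; bounded by -XX:MaxDirectMemorySize.
        ByteBuffer offHeap = ByteBuffer.allocateDirect(1024);

        System.out.println(describe(onHeap));
        System.out.println(describe(offHeap));
    }
}
```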

Besides this, Flink itself also uses DirectByteBuffers (for network buffers)
and native memory (for its managed memory) internally.

You can find a more detailed description of Flink's memory model in [1]. I
hope that helps.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model

On Tue, Nov 10, 2020 at 3:57 AM Jack Kolokasis <ko...@ics.forth.gr>
wrote:

> Thank you Xuannan for the reply.
>
> I would also like to ask how Flink uses off-heap memory. If I set
> taskmanager.memory.task.off-heap.size, which data does Flink allocate
> off-heap? Is this handled by the programmer?
>
> Best,
> Iacovos

Re: Caching Mechanism in Flink

Posted by Jack Kolokasis <ko...@ics.forth.gr>.
Thank you Xuannan for the reply.

I would also like to ask how Flink uses off-heap memory. If I set
taskmanager.memory.task.off-heap.size, which data does Flink allocate
off-heap? Is this handled by the programmer?

Best,
Iacovos

On 10/11/20 4:42 AM, Xuannan Su wrote:
> Hi Jack,
>
> At the moment, Flink doesn't support caching intermediate results.
> However, there is an ongoing effort to support caching in Flink.
> FLIP-36 [1] proposes adding a caching mechanism to the Table API, and
> it is planned for 1.13.
>
> Best,
> Xuannan

Re: Caching Mechanism in Flink

Posted by Xuannan Su <su...@gmail.com>.
Hi Jack,

At the moment, Flink doesn't support caching intermediate results.
However, there is an ongoing effort to support caching in Flink.
FLIP-36 [1] proposes adding a caching mechanism to the Table API, and it is
planned for 1.13.

Best,
Xuannan

On Nov 10, 2020, 4:29 AM +0800, Jack Kolokasis <ko...@ics.forth.gr>,
wrote:

> Hello all,
>
> I am new to Flink and I want to ask if the Flink supports a caching
> mechanism to store intermediate results in memory for machine learning
> workloads.
>
> If yes, how can I enable it and how can I use it?
>
> Thank you,
> Iacovos