Posted to issues@flink.apache.org by "Andrey Zagrebin (Jira)" <ji...@apache.org> on 2020/01/22 17:42:00 UTC

[jira] [Commented] (FLINK-14431) Update TaskManager's memory information to match its memory composition

    [ https://issues.apache.org/jira/browse/FLINK-14431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17021315#comment-17021315 ] 

Andrey Zagrebin commented on FLINK-14431:
-----------------------------------------

[~lining] [~xintongsong]

What is the plan here? Can we update the UI to better reflect the FLIP-49 memory model, e.g. as part of the 1.10.1 minor release?

> Update TaskManager's memory information to match its memory composition
> -----------------------------------------------------------------------
>
>                 Key: FLINK-14431
>                 URL: https://issues.apache.org/jira/browse/FLINK-14431
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / REST, Runtime / Task, Runtime / Web Frontend
>            Reporter: lining
>            Priority: Major
>         Attachments: image-2019-10-17-17-58-50-342.png, image-2019-10-17-18-01-09-353.png, image-2019-10-17-18-29-53-329.png, image-2019-10-24-16-19-15-499.png, image-2019-10-24-16-20-23-210.png, image-2019-10-24-16-22-27-360.png, image-2019-12-19-18-09-05-542.png, image-2019-12-19-18-27-38-589.png, image-2019-12-19-18-28-01-447.png
>
>
> h3. Motivation
> There are several shortcomings in the TaskManager memory information that Flink currently (1.10) shows in the REST API.
> h4. (1) The information from HardwareDescription is difficult to map to the memory composition of the TaskManager in FLIP-49, as the picture below shows:
> !image-2019-12-19-18-09-05-542.png|width=444,height=389!
>  * What is the meaning of HardwareDescription.sizeOfJvmHeap?
>  * The user cannot get the resource configuration of the TaskManager.
> h4. (2) There is no information about managed memory.
>  * There is no metric for managed memory.
> h4. (3) There is no information about shuffle memory.
>  * From TaskManagerMetricsInfo's memorySegmentsTotal (the shuffle segment total) alone, the user cannot derive the shuffle memory.
> h4. (4) The metrics on the TaskManager's metrics page do not correspond to the TaskManager's resource configuration.
>  * It is difficult for users to adjust the TaskManager's resource configuration based on metrics, because they cannot find the configuration items that relate to those metrics.
> h3. Proposed Changes
> h4. Add TaskManagerResourceInfo, which matches the memory composition
>  * Take the information from TaskExecutorResourceSpec in FLIP-49 and add it to TaskExecutorRegistration.
> {code:java}
> public class TaskManagerResourceInfo {
>     private final double cpuCores;
>     private final long frameworkHeap;
>     private final long frameworkOffHeap;
>     private final long taskHeap;
>     private final long taskOffHeap;
>     private final long shuffleMemory;
>     private final long managedMemory;
>     private final long jvmMetaSpace;
>     private final long jvmOverhead;
>     private final long totalProcessMemory;
> }
> {code}
>  * url: /taskmanagers/:taskmanagerid
>  * response: add
> {code:json}
> {
>   "resource": {
>     "cpuCores": 4,
>     "frameworkHeap": 134217728,
>     "frameworkOffHeap": 134217728,
>     "taskHeap": 181193928,
>     "taskOffHeap": 0,
>     "shuffleMemory": 33554432,
>     "managedMemory": 322122552,
>     "jvmMetaSpace": 134217728,
>     "jvmOverhead": 134217728,
>     "totalProcessMemory": 1073741824
>   }
> }
> {code}
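
In the example response above, the eight memory components add up exactly to totalProcessMemory, which is consistent with the FLIP-49 model where the total process memory is the sum of its parts. A minimal sketch of that check, assuming the values from the example; the class and method names are hypothetical and not part of Flink:

```java
// Hypothetical helper, not Flink API: sums the memory components of the
// proposed "resource" object so the result can be compared to totalProcessMemory.
final class ResourceSumCheck {

    /** Returns the sum of the eight memory components, in bytes. */
    static long sumOfComponents(
            long frameworkHeap,
            long frameworkOffHeap,
            long taskHeap,
            long taskOffHeap,
            long shuffleMemory,
            long managedMemory,
            long jvmMetaSpace,
            long jvmOverhead) {
        return frameworkHeap + frameworkOffHeap + taskHeap + taskOffHeap
                + shuffleMemory + managedMemory + jvmMetaSpace + jvmOverhead;
    }

    public static void main(String[] args) {
        // Values copied from the example response in the issue description.
        long sum = sumOfComponents(
                134217728L, 134217728L, 181193928L, 0L,
                33554432L, 322122552L, 134217728L, 134217728L);
        System.out.println(sum); // prints 1073741824, i.e. 1 GiB
    }
}
```

A UI could use the same relationship to render each component as a share of the total.
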
> h4. Add shuffle memory metric
>  * add getTotalMemorySize and getAvaliableMemorySize in NetworkBufferPool
> {code:java}
> public long getTotalMemorySize() {
>     return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize;
> }
> public long getAvaliableMemorySize() {
>     return 1L * getNumberOfAvailableMemorySegments() * memorySegmentSize;
> }{code}
>  * update NettyShuffleMetricFactory#registerShuffleMetrics
> {code:java}
> private static final String METRIC_TOTAL_MEMORY_SEGMENT_TOTALCAPACITY = "TotalMemoryCapacity";
> private static final String METRIC_TOTAL_MEMORY_SEGMENT_AVALIABLEMEMORY = "AvaliableMemory";
> private static void registerShuffleMetrics(
>     String groupName,
>     MetricGroup metricGroup,
>     NetworkBufferPool networkBufferPool) {
>     MetricGroup networkGroup = metricGroup.addGroup(groupName);
>     networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_TOTAL_MEMORY_SEGMENT,
>                                                 networkBufferPool::getTotalNumberOfMemorySegments);
>     networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT,
>                                                 networkBufferPool::getNumberOfAvailableMemorySegments);
>     networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_SEGMENT_TOTALCAPACITY,
>                                           networkBufferPool::getTotalMemorySize);
>     networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_SEGMENT_AVALIABLEMEMORY,
>                                           networkBufferPool::getAvaliableMemorySize);
> }
> {code}
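
The leading `1L *` in getTotalMemorySize and getAvaliableMemorySize matters if both operands are ints: without it the product is computed in 32-bit arithmetic and can wrap around for large pools. A minimal sketch of the difference, assuming int operands and made-up pool dimensions; the class and method names are hypothetical:

```java
// Hypothetical demo, not Flink code: shows why the multiplication is widened to long.
final class SegmentSizeOverflow {

    /** Widened to long before multiplying, as in the proposed getters. */
    static long totalBytes(int segments, int segmentSize) {
        return 1L * segments * segmentSize;
    }

    /** Plain int multiplication: wraps around once the product exceeds Integer.MAX_VALUE. */
    static int totalBytesNarrow(int segments, int segmentSize) {
        return segments * segmentSize;
    }

    public static void main(String[] args) {
        // Assumed values for illustration: 100,000 segments of 32 KiB each.
        int segments = 100_000;
        int segmentSize = 32_768;

        System.out.println(totalBytes(segments, segmentSize));       // 3276800000
        System.out.println(totalBytesNarrow(segments, segmentSize)); // negative, wrapped value
    }
}
```
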
> h4. Add managed memory metric
>  * add default memory type in MemoryManager
> {code:java}
> public static final MemoryType DEFAULT_MEMORY_TYPE = MemoryType.OFF_HEAP;
> {code}
>  * add getManagedMemoryTotal in TaskExecutor:
> {code:java}
> public long getManagedMemoryTotal() {
>     return this.taskSlotTable.getAllocatedSlots().stream().mapToLong(
>         slot -> slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE)
>     ).sum();
> }{code}
>  * add getManagedMemoryUsed in TaskExecutor:
> {code:java}
> public long getManagedMemoryUsed() {
>     return this.taskSlotTable.getAllocatedSlots().stream().mapToLong(
>         slot -> slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE)
>             - slot.getMemoryManager().availableMemory(MemoryManager.DEFAULT_MEMORY_TYPE)
>     ).sum();
> }{code}
>  * add instantiateMemoryManagerMetrics in MetricUtils
> {code:java}
> public static void instantiateMemoryManagerMetrics(MetricGroup statusMetricGroup, TaskExecutor taskExecutor) {
>     checkNotNull(statusMetricGroup);
>     MetricGroup memoryManagerGroup = statusMetricGroup.addGroup("Managed").addGroup("Memory");
>     memoryManagerGroup.<Long, Gauge<Long>>gauge("TotalCapacity", taskExecutor::getManagedMemoryTotal);
>     memoryManagerGroup.<Long, Gauge<Long>>gauge("MemoryUsed", taskExecutor::getManagedMemoryUsed);
> }{code}
>  * register it in TaskManagerRunner#startTaskManager 
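
The two managed memory getters above boil down to simple per-slot arithmetic: the total is the sum of each slot's managed memory size, and the used amount is the sum of (size - available) per slot. A minimal sketch of that aggregation, using a hypothetical SlotMemory stand-in instead of Flink's MemoryManager and made-up byte counts:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in types, not Flink API: illustrates the per-slot aggregation only.
final class ManagedMemoryUsage {

    /** Simplified view of one slot's managed memory: total size and currently free bytes. */
    static final class SlotMemory {
        final long totalBytes;
        final long availableBytes;

        SlotMemory(long totalBytes, long availableBytes) {
            this.totalBytes = totalBytes;
            this.availableBytes = availableBytes;
        }
    }

    /** Sum of the managed memory size over all allocated slots. */
    static long managedMemoryTotal(List<SlotMemory> slots) {
        return slots.stream().mapToLong(slot -> slot.totalBytes).sum();
    }

    /** Sum of (size - available) over all allocated slots. */
    static long managedMemoryUsed(List<SlotMemory> slots) {
        return slots.stream().mapToLong(slot -> slot.totalBytes - slot.availableBytes).sum();
    }

    public static void main(String[] args) {
        // Assumed values: one slot partly used, one slot completely free.
        List<SlotMemory> slots = Arrays.asList(
                new SlotMemory(100L, 40L),
                new SlotMemory(200L, 200L));

        System.out.println(managedMemoryTotal(slots)); // 300
        System.out.println(managedMemoryUsed(slots));  // 60
    }
}
```

Because both values are derived from the allocated slots only, they would change as slots are allocated and freed, unlike the configured managedMemory value in the resource information.
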
> h4. Change the TaskManager's metrics page
>  * Lay out the page according to the resource configuration in FLIP-49 and the memory metrics, as the picture below shows:
> !image-2019-12-19-18-28-01-447.png|width=671,height=282!
>  * Status.JVM.Memory.Heap.Used as the usage of Flink Heap
>  * Status.JVM.Memory.Direct.MemoryUsed - (shuffle total) as the usage of Flink offHeap
>  * shuffle used as the usage of shuffle
>  * managed used as the usage of managed memory
>  * Status.JVM.Memory.NonHeap.Used as the usage of overhead



--
This message was sent by Atlassian Jira
(v8.3.4#803005)