You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@yunikorn.apache.org by GitBox <gi...@apache.org> on 2021/11/10 14:39:08 UTC

[GitHub] [incubator-yunikorn-core] kingamarton commented on a change in pull request #336: [YUNIKORN-940] Periodic & on-demand state dump in Yunikorn

kingamarton commented on a change in pull request #336:
URL: https://github.com/apache/incubator-yunikorn-core/pull/336#discussion_r746564854



##########
File path: pkg/webservice/handlers.go
##########
@@ -744,3 +746,140 @@ func getLogLevel(w http.ResponseWriter, r *http.Request) {
 		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
 	}
 }
+
+func getPartitionInfoDao(lists map[string]*scheduler.PartitionContext) []*dao.PartitionInfo {

Review comment:
       We already have a getPartition() method. Let's reuse that one.

##########
File path: pkg/webservice/handlers.go
##########
@@ -744,3 +746,140 @@ func getLogLevel(w http.ResponseWriter, r *http.Request) {
 		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
 	}
 }
+
+func getPartitionInfoDao(lists map[string]*scheduler.PartitionContext) []*dao.PartitionInfo {
+	result := make([]*dao.PartitionInfo, 0, len(lists))
+
+	for _, partitionContext := range lists {
+		partitionInfo := &dao.PartitionInfo{}
+		partitionInfo.ClusterID = partitionContext.RmID
+		partitionInfo.Name = common.GetPartitionNameWithoutClusterID(partitionContext.Name)
+		partitionInfo.State = partitionContext.GetCurrentState()
+		partitionInfo.LastStateTransitionTime = partitionContext.GetStateTime().String()
+
+		capacityInfo := dao.PartitionCapacity{}
+		capacityInfo.Capacity = partitionContext.GetTotalPartitionResource().DAOString()
+		capacityInfo.UsedCapacity = partitionContext.GetAllocatedResource().DAOString()
+		partitionInfo.Capacity = capacityInfo
+		partitionInfo.NodeSortingPolicy = dao.NodeSortingPolicy{
+			Type:            partitionContext.GetNodeSortingPolicyType().String(),
+			ResourceWeights: partitionContext.GetNodeSortingResourceWeights(),
+		}
+
+		appList := partitionContext.GetApplications()
+		appList = append(appList, partitionContext.GetCompletedApplications()...)
+		applicationsState := make(map[string]int)
+		totalApplications := 0
+		for _, app := range appList {
+			applicationsState[app.CurrentState()]++
+			totalApplications++
+		}
+		applicationsState["total"] = totalApplications
+		partitionInfo.Applications = applicationsState
+		result = append(result, partitionInfo)
+	}
+
+	return result
+}
+
+func getAppHistoryDao(records []*history.MetricsRecord) []*dao.ApplicationHistoryDAOInfo {
+	result := make([]*dao.ApplicationHistoryDAOInfo, 0, len(records))
+	for _, record := range records {
+		if record == nil {
+			continue
+		}
+		element := &dao.ApplicationHistoryDAOInfo{
+			Timestamp:         record.Timestamp.UnixNano(),
+			TotalApplications: strconv.Itoa(record.TotalApplications),
+		}
+		result = append(result, element)
+	}
+
+	return result
+}
+
+func getNodesDao(lists map[string]*scheduler.PartitionContext) []*dao.NodesDAOInfo {

Review comment:
       Please reuse `func getPartitionNodes(w http.ResponseWriter, r *http.Request)` instead of defining a new one

##########
File path: pkg/webservice/handlers.go
##########
@@ -744,3 +746,140 @@ func getLogLevel(w http.ResponseWriter, r *http.Request) {
 		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
 	}
 }
+
+func getPartitionInfoDao(lists map[string]*scheduler.PartitionContext) []*dao.PartitionInfo {
+	result := make([]*dao.PartitionInfo, 0, len(lists))
+
+	for _, partitionContext := range lists {
+		partitionInfo := &dao.PartitionInfo{}
+		partitionInfo.ClusterID = partitionContext.RmID
+		partitionInfo.Name = common.GetPartitionNameWithoutClusterID(partitionContext.Name)
+		partitionInfo.State = partitionContext.GetCurrentState()
+		partitionInfo.LastStateTransitionTime = partitionContext.GetStateTime().String()
+
+		capacityInfo := dao.PartitionCapacity{}
+		capacityInfo.Capacity = partitionContext.GetTotalPartitionResource().DAOString()
+		capacityInfo.UsedCapacity = partitionContext.GetAllocatedResource().DAOString()
+		partitionInfo.Capacity = capacityInfo
+		partitionInfo.NodeSortingPolicy = dao.NodeSortingPolicy{
+			Type:            partitionContext.GetNodeSortingPolicyType().String(),
+			ResourceWeights: partitionContext.GetNodeSortingResourceWeights(),
+		}
+
+		appList := partitionContext.GetApplications()
+		appList = append(appList, partitionContext.GetCompletedApplications()...)
+		applicationsState := make(map[string]int)
+		totalApplications := 0
+		for _, app := range appList {
+			applicationsState[app.CurrentState()]++
+			totalApplications++
+		}
+		applicationsState["total"] = totalApplications
+		partitionInfo.Applications = applicationsState
+		result = append(result, partitionInfo)
+	}
+
+	return result
+}
+
+func getAppHistoryDao(records []*history.MetricsRecord) []*dao.ApplicationHistoryDAOInfo {
+	result := make([]*dao.ApplicationHistoryDAOInfo, 0, len(records))
+	for _, record := range records {
+		if record == nil {
+			continue
+		}
+		element := &dao.ApplicationHistoryDAOInfo{
+			Timestamp:         record.Timestamp.UnixNano(),
+			TotalApplications: strconv.Itoa(record.TotalApplications),
+		}
+		result = append(result, element)
+	}
+
+	return result
+}
+
+func getNodesDao(lists map[string]*scheduler.PartitionContext) []*dao.NodesDAOInfo {
+	result := make([]*dao.NodesDAOInfo, 0, len(lists))
+	for _, partition := range lists {
+		ns := partition.GetNodes()
+		nodesDao := make([]*dao.NodeDAOInfo, 0, len(ns))
+		for _, node := range ns {
+			nodeDao := getNodeJSON(node)
+			nodesDao = append(nodesDao, nodeDao)
+		}
+		result = append(result, &dao.NodesDAOInfo{
+			PartitionName: common.GetPartitionNameWithoutClusterID(partition.Name),
+			Nodes:         nodesDao,
+		})
+	}
+
+	return result
+}
+
+func getContainerHistoryDao(records []*history.MetricsRecord) []*dao.ContainerHistoryDAOInfo {

Review comment:
       Please use the existing `func getContainerHistory(w http.ResponseWriter, r *http.Request)` instead.

##########
File path: pkg/webservice/handlers.go
##########
@@ -744,3 +746,140 @@ func getLogLevel(w http.ResponseWriter, r *http.Request) {
 		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
 	}
 }
+
+func getPartitionInfoDao(lists map[string]*scheduler.PartitionContext) []*dao.PartitionInfo {
+	result := make([]*dao.PartitionInfo, 0, len(lists))
+
+	for _, partitionContext := range lists {
+		partitionInfo := &dao.PartitionInfo{}
+		partitionInfo.ClusterID = partitionContext.RmID
+		partitionInfo.Name = common.GetPartitionNameWithoutClusterID(partitionContext.Name)
+		partitionInfo.State = partitionContext.GetCurrentState()
+		partitionInfo.LastStateTransitionTime = partitionContext.GetStateTime().String()
+
+		capacityInfo := dao.PartitionCapacity{}
+		capacityInfo.Capacity = partitionContext.GetTotalPartitionResource().DAOString()
+		capacityInfo.UsedCapacity = partitionContext.GetAllocatedResource().DAOString()
+		partitionInfo.Capacity = capacityInfo
+		partitionInfo.NodeSortingPolicy = dao.NodeSortingPolicy{
+			Type:            partitionContext.GetNodeSortingPolicyType().String(),
+			ResourceWeights: partitionContext.GetNodeSortingResourceWeights(),
+		}
+
+		appList := partitionContext.GetApplications()
+		appList = append(appList, partitionContext.GetCompletedApplications()...)
+		applicationsState := make(map[string]int)
+		totalApplications := 0
+		for _, app := range appList {
+			applicationsState[app.CurrentState()]++
+			totalApplications++
+		}
+		applicationsState["total"] = totalApplications
+		partitionInfo.Applications = applicationsState
+		result = append(result, partitionInfo)
+	}
+
+	return result
+}
+
+func getAppHistoryDao(records []*history.MetricsRecord) []*dao.ApplicationHistoryDAOInfo {

Review comment:
       Please reuse the `func getApplicationHistory(w http.ResponseWriter, r *http.Request)` method.

##########
File path: pkg/webservice/handlers.go
##########
@@ -744,3 +746,140 @@ func getLogLevel(w http.ResponseWriter, r *http.Request) {
 		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
 	}
 }
+
+func getPartitionInfoDao(lists map[string]*scheduler.PartitionContext) []*dao.PartitionInfo {
+	result := make([]*dao.PartitionInfo, 0, len(lists))
+
+	for _, partitionContext := range lists {
+		partitionInfo := &dao.PartitionInfo{}
+		partitionInfo.ClusterID = partitionContext.RmID
+		partitionInfo.Name = common.GetPartitionNameWithoutClusterID(partitionContext.Name)
+		partitionInfo.State = partitionContext.GetCurrentState()
+		partitionInfo.LastStateTransitionTime = partitionContext.GetStateTime().String()
+
+		capacityInfo := dao.PartitionCapacity{}
+		capacityInfo.Capacity = partitionContext.GetTotalPartitionResource().DAOString()
+		capacityInfo.UsedCapacity = partitionContext.GetAllocatedResource().DAOString()
+		partitionInfo.Capacity = capacityInfo
+		partitionInfo.NodeSortingPolicy = dao.NodeSortingPolicy{
+			Type:            partitionContext.GetNodeSortingPolicyType().String(),
+			ResourceWeights: partitionContext.GetNodeSortingResourceWeights(),
+		}
+
+		appList := partitionContext.GetApplications()
+		appList = append(appList, partitionContext.GetCompletedApplications()...)
+		applicationsState := make(map[string]int)
+		totalApplications := 0
+		for _, app := range appList {
+			applicationsState[app.CurrentState()]++
+			totalApplications++
+		}
+		applicationsState["total"] = totalApplications
+		partitionInfo.Applications = applicationsState
+		result = append(result, partitionInfo)
+	}
+
+	return result
+}
+
+func getAppHistoryDao(records []*history.MetricsRecord) []*dao.ApplicationHistoryDAOInfo {
+	result := make([]*dao.ApplicationHistoryDAOInfo, 0, len(records))
+	for _, record := range records {
+		if record == nil {
+			continue
+		}
+		element := &dao.ApplicationHistoryDAOInfo{
+			Timestamp:         record.Timestamp.UnixNano(),
+			TotalApplications: strconv.Itoa(record.TotalApplications),
+		}
+		result = append(result, element)
+	}
+
+	return result
+}
+
+func getNodesDao(lists map[string]*scheduler.PartitionContext) []*dao.NodesDAOInfo {
+	result := make([]*dao.NodesDAOInfo, 0, len(lists))
+	for _, partition := range lists {
+		ns := partition.GetNodes()
+		nodesDao := make([]*dao.NodeDAOInfo, 0, len(ns))
+		for _, node := range ns {
+			nodeDao := getNodeJSON(node)
+			nodesDao = append(nodesDao, nodeDao)
+		}
+		result = append(result, &dao.NodesDAOInfo{
+			PartitionName: common.GetPartitionNameWithoutClusterID(partition.Name),
+			Nodes:         nodesDao,
+		})
+	}
+
+	return result
+}
+
+func getContainerHistoryDao(records []*history.MetricsRecord) []*dao.ContainerHistoryDAOInfo {
+	result := make([]*dao.ContainerHistoryDAOInfo, 0, len(records))
+	for _, record := range records {
+		if record == nil {
+			continue
+		}
+		element := &dao.ContainerHistoryDAOInfo{
+			Timestamp:       record.Timestamp.UnixNano(),
+			TotalContainers: strconv.Itoa(record.TotalContainers),
+		}
+		result = append(result, element)
+	}
+
+	return result
+}
+
+func getNodesUtilizationDao(lists map[string]*scheduler.PartitionContext) []*dao.NodesUtilDAOInfo {
+	var result []*dao.NodesUtilDAOInfo
+
+	for _, partition := range lists {
+		partitionResource := partition.GetTotalPartitionResource()
+		// partitionResource can be null if the partition has no node
+		if partitionResource != nil {
+			for name := range partitionResource.Resources {
+				result = append(result, getNodesUtilJSON(partition, name))
+			}
+		}
+	}
+
+	return result
+}
+
+func getApplicationsDao(lists map[string]*scheduler.PartitionContext) []*dao.ApplicationDAOInfo {
+	result := make([]*dao.ApplicationDAOInfo, 0, 32)
+
+	for _, partition := range lists {
+		size := partition.GetTotalCompletedApplicationCount() + partition.GetTotalApplicationCount()
+		appList := make([]*objects.Application, size)
+		appList = append(appList, partition.GetApplications()...)
+		appList = append(appList, partition.GetCompletedApplications()...)
+
+		for _, app := range appList {
+			result = append(result, getApplicationJSON(app))
+		}
+	}
+
+	return result
+}
+
+func getPartitionDAOInfo(lists map[string]*scheduler.PartitionContext) []*dao.PartitionDAOInfo {

Review comment:
       Please reuse `func getApplicationJSON(app *objects.Application) *dao.ApplicationDAOInfo {`

##########
File path: pkg/webservice/handlers.go
##########
@@ -744,3 +746,140 @@ func getLogLevel(w http.ResponseWriter, r *http.Request) {
 		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
 	}
 }
+
+func getPartitionInfoDao(lists map[string]*scheduler.PartitionContext) []*dao.PartitionInfo {
+	result := make([]*dao.PartitionInfo, 0, len(lists))
+
+	for _, partitionContext := range lists {
+		partitionInfo := &dao.PartitionInfo{}
+		partitionInfo.ClusterID = partitionContext.RmID
+		partitionInfo.Name = common.GetPartitionNameWithoutClusterID(partitionContext.Name)
+		partitionInfo.State = partitionContext.GetCurrentState()
+		partitionInfo.LastStateTransitionTime = partitionContext.GetStateTime().String()
+
+		capacityInfo := dao.PartitionCapacity{}
+		capacityInfo.Capacity = partitionContext.GetTotalPartitionResource().DAOString()
+		capacityInfo.UsedCapacity = partitionContext.GetAllocatedResource().DAOString()
+		partitionInfo.Capacity = capacityInfo
+		partitionInfo.NodeSortingPolicy = dao.NodeSortingPolicy{
+			Type:            partitionContext.GetNodeSortingPolicyType().String(),
+			ResourceWeights: partitionContext.GetNodeSortingResourceWeights(),
+		}
+
+		appList := partitionContext.GetApplications()
+		appList = append(appList, partitionContext.GetCompletedApplications()...)
+		applicationsState := make(map[string]int)
+		totalApplications := 0
+		for _, app := range appList {
+			applicationsState[app.CurrentState()]++
+			totalApplications++
+		}
+		applicationsState["total"] = totalApplications
+		partitionInfo.Applications = applicationsState
+		result = append(result, partitionInfo)
+	}
+
+	return result
+}
+
+func getAppHistoryDao(records []*history.MetricsRecord) []*dao.ApplicationHistoryDAOInfo {
+	result := make([]*dao.ApplicationHistoryDAOInfo, 0, len(records))
+	for _, record := range records {
+		if record == nil {
+			continue
+		}
+		element := &dao.ApplicationHistoryDAOInfo{
+			Timestamp:         record.Timestamp.UnixNano(),
+			TotalApplications: strconv.Itoa(record.TotalApplications),
+		}
+		result = append(result, element)
+	}
+
+	return result
+}
+
+func getNodesDao(lists map[string]*scheduler.PartitionContext) []*dao.NodesDAOInfo {
+	result := make([]*dao.NodesDAOInfo, 0, len(lists))
+	for _, partition := range lists {
+		ns := partition.GetNodes()
+		nodesDao := make([]*dao.NodeDAOInfo, 0, len(ns))
+		for _, node := range ns {
+			nodeDao := getNodeJSON(node)
+			nodesDao = append(nodesDao, nodeDao)
+		}
+		result = append(result, &dao.NodesDAOInfo{
+			PartitionName: common.GetPartitionNameWithoutClusterID(partition.Name),
+			Nodes:         nodesDao,
+		})
+	}
+
+	return result
+}
+
+func getContainerHistoryDao(records []*history.MetricsRecord) []*dao.ContainerHistoryDAOInfo {
+	result := make([]*dao.ContainerHistoryDAOInfo, 0, len(records))
+	for _, record := range records {
+		if record == nil {
+			continue
+		}
+		element := &dao.ContainerHistoryDAOInfo{
+			Timestamp:       record.Timestamp.UnixNano(),
+			TotalContainers: strconv.Itoa(record.TotalContainers),
+		}
+		result = append(result, element)
+	}
+
+	return result
+}
+
+func getNodesUtilizationDao(lists map[string]*scheduler.PartitionContext) []*dao.NodesUtilDAOInfo {

Review comment:
       Please reuse `func getNodesUtilization(w http.ResponseWriter, r *http.Request)`.

##########
File path: pkg/webservice/handlers.go
##########
@@ -744,3 +746,140 @@ func getLogLevel(w http.ResponseWriter, r *http.Request) {
 		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
 	}
 }
+
+func getPartitionInfoDao(lists map[string]*scheduler.PartitionContext) []*dao.PartitionInfo {
+	result := make([]*dao.PartitionInfo, 0, len(lists))
+
+	for _, partitionContext := range lists {
+		partitionInfo := &dao.PartitionInfo{}
+		partitionInfo.ClusterID = partitionContext.RmID
+		partitionInfo.Name = common.GetPartitionNameWithoutClusterID(partitionContext.Name)
+		partitionInfo.State = partitionContext.GetCurrentState()
+		partitionInfo.LastStateTransitionTime = partitionContext.GetStateTime().String()
+
+		capacityInfo := dao.PartitionCapacity{}
+		capacityInfo.Capacity = partitionContext.GetTotalPartitionResource().DAOString()
+		capacityInfo.UsedCapacity = partitionContext.GetAllocatedResource().DAOString()
+		partitionInfo.Capacity = capacityInfo
+		partitionInfo.NodeSortingPolicy = dao.NodeSortingPolicy{
+			Type:            partitionContext.GetNodeSortingPolicyType().String(),
+			ResourceWeights: partitionContext.GetNodeSortingResourceWeights(),
+		}
+
+		appList := partitionContext.GetApplications()
+		appList = append(appList, partitionContext.GetCompletedApplications()...)
+		applicationsState := make(map[string]int)
+		totalApplications := 0
+		for _, app := range appList {
+			applicationsState[app.CurrentState()]++
+			totalApplications++
+		}
+		applicationsState["total"] = totalApplications
+		partitionInfo.Applications = applicationsState
+		result = append(result, partitionInfo)
+	}
+
+	return result
+}
+
+func getAppHistoryDao(records []*history.MetricsRecord) []*dao.ApplicationHistoryDAOInfo {
+	result := make([]*dao.ApplicationHistoryDAOInfo, 0, len(records))
+	for _, record := range records {
+		if record == nil {
+			continue
+		}
+		element := &dao.ApplicationHistoryDAOInfo{
+			Timestamp:         record.Timestamp.UnixNano(),
+			TotalApplications: strconv.Itoa(record.TotalApplications),
+		}
+		result = append(result, element)
+	}
+
+	return result
+}
+
+func getNodesDao(lists map[string]*scheduler.PartitionContext) []*dao.NodesDAOInfo {
+	result := make([]*dao.NodesDAOInfo, 0, len(lists))
+	for _, partition := range lists {
+		ns := partition.GetNodes()
+		nodesDao := make([]*dao.NodeDAOInfo, 0, len(ns))
+		for _, node := range ns {
+			nodeDao := getNodeJSON(node)
+			nodesDao = append(nodesDao, nodeDao)
+		}
+		result = append(result, &dao.NodesDAOInfo{
+			PartitionName: common.GetPartitionNameWithoutClusterID(partition.Name),
+			Nodes:         nodesDao,
+		})
+	}
+
+	return result
+}
+
+func getContainerHistoryDao(records []*history.MetricsRecord) []*dao.ContainerHistoryDAOInfo {
+	result := make([]*dao.ContainerHistoryDAOInfo, 0, len(records))
+	for _, record := range records {
+		if record == nil {
+			continue
+		}
+		element := &dao.ContainerHistoryDAOInfo{
+			Timestamp:       record.Timestamp.UnixNano(),
+			TotalContainers: strconv.Itoa(record.TotalContainers),
+		}
+		result = append(result, element)
+	}
+
+	return result
+}
+
+func getNodesUtilizationDao(lists map[string]*scheduler.PartitionContext) []*dao.NodesUtilDAOInfo {
+	var result []*dao.NodesUtilDAOInfo
+
+	for _, partition := range lists {
+		partitionResource := partition.GetTotalPartitionResource()
+		// partitionResource can be null if the partition has no node
+		if partitionResource != nil {
+			for name := range partitionResource.Resources {
+				result = append(result, getNodesUtilJSON(partition, name))
+			}
+		}
+	}
+
+	return result
+}
+
+func getApplicationsDao(lists map[string]*scheduler.PartitionContext) []*dao.ApplicationDAOInfo {
+	result := make([]*dao.ApplicationDAOInfo, 0, 32)
+
+	for _, partition := range lists {
+		size := partition.GetTotalCompletedApplicationCount() + partition.GetTotalApplicationCount()
+		appList := make([]*objects.Application, size)
+		appList = append(appList, partition.GetApplications()...)
+		appList = append(appList, partition.GetCompletedApplications()...)
+
+		for _, app := range appList {
+			result = append(result, getApplicationJSON(app))
+		}
+	}
+
+	return result
+}
+
+func getPartitionDAOInfo(lists map[string]*scheduler.PartitionContext) []*dao.PartitionDAOInfo {
+	queues := make([]*dao.PartitionDAOInfo, len(lists))
+
+	for _, partition := range lists {
+		queues = append(queues, getPartitionJSON(partition))
+	}
+
+	return queues
+}
+
+func getClusterInfoDao(lists map[string]*scheduler.PartitionContext) []*dao.ClusterDAOInfo {

Review comment:
       Please reuse `func getClusterInfo(w http.ResponseWriter, r *http.Request) {`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@yunikorn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org