You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@yunikorn.apache.org by GitBox <gi...@apache.org> on 2021/10/20 00:13:37 UTC

[GitHub] [incubator-yunikorn-k8shim] wilfred-s commented on a change in pull request #313: [YUNIKORN-883] Merge YUNIKORN-872 into master.

wilfred-s commented on a change in pull request #313:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/313#discussion_r732320163



##########
File path: pkg/cache/external/scheduler_cache_test.go
##########
@@ -169,7 +170,7 @@ func TestAddUnassignedPod(t *testing.T) {
 			assert.Check(t, cachedNode.Node() != nil, "host0001 exists in cache but the ref to v1.Node doesn't exist")
 			assert.Equal(t, cachedNode.Node().Name, node.Name)
 			assert.Equal(t, cachedNode.Node().UID, node.UID)
-			assert.Equal(t, len(cachedNode.Pods()), 0)
+			assert.Assert(t, cachedNode != nil && cachedNode.Pods == nil, "pods was not null")

Review comment:
       `cachedNode` cannot be nil here: an assert a couple of lines earlier already checks it, so the nil check is redundant.

##########
File path: pkg/cache/context_recovery.go
##########
@@ -87,7 +88,7 @@ func (ctx *Context) recover(mgr []interfaces.Recoverable, due time.Duration) err
 		var podList *corev1.PodList
 		podList, err = ctx.apiProvider.GetAPIs().KubeClient.GetClientSet().
 			CoreV1().Pods("").
-			List(metav1.ListOptions{})
+			List(context.TODO(), metav1.ListOptions{})

Review comment:
       Use a background context here too. I will not repeat this comment for every occurrence, so please check all non-test files for `context.TODO()`.

##########
File path: pkg/cache/context.go
##########
@@ -686,7 +716,7 @@ func (ctx *Context) updatePodCondition(task *Task, podCondition *v1.PodCondition
 			if podutil.UpdatePodCondition(&task.pod.Status, podCondition) {
 				if !ctx.apiProvider.IsTestingMode() {
 					_, err := ctx.apiProvider.GetAPIs().KubeClient.GetClientSet().CoreV1().
-						Pods(task.pod.Namespace).UpdateStatus(task.pod)
+						Pods(task.pod.Namespace).UpdateStatus(context.TODO(), task.pod, metav1.UpdateOptions{})

Review comment:
       We should not use `context.TODO()` in production code; please use `context.Background()` instead.

##########
File path: pkg/plugin/support/framework_handle.go
##########
@@ -0,0 +1,77 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package support
+
+import (
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/events"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+)
+
+type frameworkHandle struct {
+	sharedLister          framework.SharedLister
+	sharedInformerFactory informers.SharedInformerFactory
+	clientSet             kubernetes.Interface
+}
+
+func (p frameworkHandle) SnapshotSharedLister() framework.SharedLister {
+	return p.sharedLister
+}
+
+func (p frameworkHandle) SharedInformerFactory() informers.SharedInformerFactory {
+	return p.sharedInformerFactory
+}
+
+func (p frameworkHandle) ClientSet() kubernetes.Interface {
+	return p.clientSet
+}
+
+// stubbed out to fulfill framework.Handle contract; these are all currently unused by upstream K8S predicates
+
+func (p frameworkHandle) IterateOverWaitingPods(callback func(framework.WaitingPod)) {
+	panic("BUG: Not used by plugins")

Review comment:
       See the earlier comment on panic: log the message at Fatal level through the logger instead of calling panic directly.

##########
File path: pkg/cache/external/scheduler_cache_test.go
##########
@@ -91,7 +91,8 @@ func TestAssignedPod(t *testing.T) {
 			assert.Check(t, cachedNode.Node() != nil, "host0001 exists in cache but the ref to v1.Node doesn't exist")
 			assert.Equal(t, cachedNode.Node().Name, node.Name)
 			assert.Equal(t, cachedNode.Node().UID, node.UID)
-			assert.Equal(t, len(cachedNode.Pods()), 1)
+			assert.Assert(t, cachedNode != nil && cachedNode.Pods != nil, "pods was nil")
+			assert.Equal(t, len(cachedNode.Pods), 1)

Review comment:
       `cachedNode` cannot be nil here: an assert a couple of lines earlier already checks it, so the nil check is redundant.

##########
File path: pkg/plugin/predicates/predicate_manager.go
##########
@@ -0,0 +1,392 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package predicates
+
+import (
+	"context"
+	"fmt"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/kube-scheduler/config/v1beta1"
+	"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
+	apiConfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
+	"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
+	fwruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
+
+	"github.com/apache/incubator-yunikorn-core/pkg/log"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/events"
+)
+
+type PredicateManager interface {
+	Predicates(pod *v1.Pod, node *framework.NodeInfo, allocate bool) (plugin string, error error)
+}
+
+var _ PredicateManager = &predicateManagerImpl{}
+
+var configDecoder = scheme.Codecs.UniversalDecoder()
+
+type predicateManagerImpl struct {
+	reservationPreFilters *[]framework.PreFilterPlugin
+	allocationPreFilters  *[]framework.PreFilterPlugin
+	reservationFilters    *[]framework.FilterPlugin
+	allocationFilters     *[]framework.FilterPlugin
+}
+
+func (p *predicateManagerImpl) Predicates(pod *v1.Pod, node *framework.NodeInfo, allocate bool) (plugin string, error error) {
+	if allocate {
+		return p.predicatesAllocate(pod, node)
+	}
+	return p.predicatesReserve(pod, node)
+}
+
+func (p *predicateManagerImpl) predicatesReserve(pod *v1.Pod, node *framework.NodeInfo) (plugin string, error error) {
+	ctx := context.TODO()
+	state := framework.NewCycleState()
+	return p.podFitsNode(ctx, state, *p.reservationPreFilters, *p.reservationFilters, pod, node)
+}
+
+func (p *predicateManagerImpl) predicatesAllocate(pod *v1.Pod, node *framework.NodeInfo) (plugin string, error error) {
+	ctx := context.TODO()
+	state := framework.NewCycleState()
+	plugin, err := p.podFitsNode(ctx, state, *p.allocationPreFilters, *p.allocationFilters, pod, node)
+	if err != nil {
+		events.GetRecorder().Eventf(pod, v1.EventTypeWarning,
+			"FailedScheduling", "predicate is not satisfied, error: %s", err.Error())
+	}
+	return plugin, err
+}
+
+func (p *predicateManagerImpl) podFitsNode(ctx context.Context, state *framework.CycleState, preFilters []framework.PreFilterPlugin, filters []framework.FilterPlugin, pod *v1.Pod, node *framework.NodeInfo) (plugin string, error error) {
+	// Run "prefilter" plugins.
+	s, plugin := p.runPreFilterPlugins(ctx, state, preFilters, pod)
+	if !s.IsSuccess() {
+		return plugin, s.AsError()
+	}
+
+	// Run "filter" plugins on node
+	statuses, plugin := p.runFilterPlugins(ctx, filters, state, pod, node)
+	s = statuses.Merge()
+	if !s.IsSuccess() {
+		return plugin, s.AsError()
+	}
+	return "", nil
+}
+
+func (p *predicateManagerImpl) runPreFilterPlugins(ctx context.Context, state *framework.CycleState, plugins []framework.PreFilterPlugin, pod *v1.Pod) (status *framework.Status, plugin string) {
+	for _, pl := range plugins {
+		status = p.runPreFilterPlugin(ctx, pl, state, pod)
+		if !status.IsSuccess() {
+			if status.IsUnschedulable() {
+				return status, plugin
+			}
+			err := status.AsError()
+			log.Logger().Error("failed running PreFilter plugin",
+				zap.String("pluginName", pl.Name()),
+				zap.String("pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)),
+				zap.Error(err))
+			return framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), err)), plugin
+		}
+	}
+
+	return nil, ""
+}
+
+func (p *predicateManagerImpl) runPreFilterPlugin(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, pod *v1.Pod) *framework.Status {
+	return pl.PreFilter(ctx, state, pod)
+}
+
+func (p *predicateManagerImpl) runFilterPlugins(ctx context.Context, plugins []framework.FilterPlugin, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) (status framework.PluginToStatus, plugin string) {
+	statuses := make(framework.PluginToStatus)
+	plugin = ""
+	for _, pl := range plugins {
+		pluginStatus := p.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
+		if !pluginStatus.IsSuccess() {
+			if plugin == "" {
+				plugin = pl.Name()
+			}
+			if !pluginStatus.IsUnschedulable() {
+				// Filter plugins are not supposed to return any status other than
+				// Success or Unschedulable.
+				errStatus := framework.NewStatus(framework.Error, fmt.Sprintf("running %q filter plugin for pod %q: %v", pl.Name(), pod.Name, pluginStatus.Message()))
+				log.Logger().Error("failed running Filter plugin",
+					zap.String("pluginName", pl.Name()),
+					zap.String("pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)),
+					zap.String("message", pluginStatus.Message()))
+				return map[string]*framework.Status{pl.Name(): errStatus}, pl.Name()
+			}
+			statuses[pl.Name()] = pluginStatus
+		}
+	}
+	return statuses, plugin
+}
+
+func (p *predicateManagerImpl) runFilterPlugin(ctx context.Context, pl framework.FilterPlugin, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
+	return pl.Filter(ctx, state, pod, nodeInfo)
+}
+
+func NewPredicateManager(handle framework.Handle) PredicateManager {
+	/*
+		Default K8S plugins as of 1.20 that implement PreFilter:
+		   NodeResourcesFit
+		   NodePorts
+		   PodTopologySpread
+		   InterPodAffinity
+		   VolumeBinding
+	*/
+
+	// run only the simpler PreFilter plugins during reservation phase
+	reservationPreFilters := map[string]bool{
+		// NodeResourcesFit : skip because during reservation, node resources are not enough
+		nodeports.Name:         true,
+		podtopologyspread.Name: true,
+		interpodaffinity.Name:  true,
+		// VolumeBinding
+	}
+
+	// run all PreFilter plugins during allocation phase
+	allocationPreFilters := map[string]bool{
+		"*": true,
+	}
+
+	/*
+		Default K8S plugins as of 1.20 that implement Filter:
+		    NodeUnschedulable
+			NodeName
+			TaintToleration
+			NodeAffinity
+			NodePorts
+			NodeResourcesFit
+			VolumeRestrictions
+			EBSLimits
+			GCEPDLimits
+			NodeVolumeLimits
+			AzureDiskLimits
+			VolumeBinding
+			VolumeZone
+			PodTopologySpread
+			InterPodAffinity
+	*/
+
+	// run only the simpler Filter plugins during reservation phase
+	reservationFilters := map[string]bool{
+		nodeunschedulable.Name: true,
+		nodename.Name:          true,
+		tainttoleration.Name:   true,
+		nodeaffinity.Name:      true,
+		nodeports.Name:         true,
+		// NodeResourcesFit : skip because during reservation, node resources are not enough
+		// VolumeRestrictions
+		// EBSLimits
+		// GCEPDLimits
+		// NodeVolumeLimits
+		// AzureDiskLimits
+		// VolumeBinding
+		// VolumeZone
+		podtopologyspread.Name: true,
+		interpodaffinity.Name:  true,
+	}
+
+	// run all Filter plugins during allocation phase
+	allocationFilters := map[string]bool{
+		"*": true,
+	}
+
+	return newPredicateManagerInternal(handle, reservationPreFilters, allocationPreFilters, reservationFilters, allocationFilters)
+}
+
+func newPredicateManagerInternal(
+	handle framework.Handle,
+	reservationPreFilters map[string]bool,
+	allocationPreFilters map[string]bool,
+	reservationFilters map[string]bool,
+	allocationFilters map[string]bool) *predicateManagerImpl {
+	pluginRegistry := plugins.NewInTreeRegistry()
+	algRegistry := algorithmprovider.NewRegistry()
+	registeredPlugins, exist := algRegistry[apiConfig.SchedulerDefaultProviderName]
+
+	if !exist {
+		// this is unrecoverable
+		panic(fmt.Sprintf("BUG: Can't get default scheduler provider: %s", apiConfig.SchedulerDefaultProviderName))

Review comment:
       Instead of calling panic directly, use `log.Logger().Fatal()`. It also terminates the process, but it first writes a proper entry to the log. That might not happen with `panic(fmt.Sprintf(...))`, whose message bypasses the logger.

##########
File path: pkg/cache/context.go
##########
@@ -839,7 +869,7 @@ func (ctx *Context) SaveConfigmap(request *si.UpdateConfigurationRequest) *si.Up
 	newConf := ykconf.DeepCopy()
 	oldConfData := ykconf.Data["queues.yaml"]
 	newConf.Data = newConfData
-	_, err = ctx.apiProvider.GetAPIs().KubeClient.GetClientSet().CoreV1().ConfigMaps(ykconf.Namespace).Update(newConf)
+	_, err = ctx.apiProvider.GetAPIs().KubeClient.GetClientSet().CoreV1().ConfigMaps(ykconf.Namespace).Update(context.TODO(), newConf, metav1.UpdateOptions{})

Review comment:
       Use a background context (`context.Background()`) here too.

##########
File path: pkg/plugin/predicates/predicate_manager.go
##########
@@ -0,0 +1,392 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package predicates
+
+import (
+	"context"
+	"fmt"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/kube-scheduler/config/v1beta1"
+	"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
+	apiConfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
+	"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
+	fwruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
+
+	"github.com/apache/incubator-yunikorn-core/pkg/log"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/events"
+)
+
+type PredicateManager interface {
+	Predicates(pod *v1.Pod, node *framework.NodeInfo, allocate bool) (plugin string, error error)
+}
+
+var _ PredicateManager = &predicateManagerImpl{}
+
+var configDecoder = scheme.Codecs.UniversalDecoder()
+
+type predicateManagerImpl struct {
+	reservationPreFilters *[]framework.PreFilterPlugin
+	allocationPreFilters  *[]framework.PreFilterPlugin
+	reservationFilters    *[]framework.FilterPlugin
+	allocationFilters     *[]framework.FilterPlugin
+}
+
+func (p *predicateManagerImpl) Predicates(pod *v1.Pod, node *framework.NodeInfo, allocate bool) (plugin string, error error) {
+	if allocate {
+		return p.predicatesAllocate(pod, node)
+	}
+	return p.predicatesReserve(pod, node)
+}
+
+func (p *predicateManagerImpl) predicatesReserve(pod *v1.Pod, node *framework.NodeInfo) (plugin string, error error) {
+	ctx := context.TODO()

Review comment:
       Use a background context (`context.Background()`) throughout this file.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@yunikorn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org