You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@yunikorn.apache.org by GitBox <gi...@apache.org> on 2020/11/17 23:59:19 UTC

[GitHub] [incubator-yunikorn-core] wilfred-s commented on a change in pull request #221: [YUNIKORN-317] Cache removal and refactoring

wilfred-s commented on a change in pull request #221:
URL: https://github.com/apache/incubator-yunikorn-core/pull/221#discussion_r525604808



##########
File path: pkg/scheduler/context.go
##########
@@ -0,0 +1,761 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package scheduler
+
+import (
+	"fmt"
+	"math"
+	"sync"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/incubator-yunikorn-core/pkg/common"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/configs"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/resources"
+	"github.com/apache/incubator-yunikorn-core/pkg/handler"
+	"github.com/apache/incubator-yunikorn-core/pkg/log"
+	"github.com/apache/incubator-yunikorn-core/pkg/metrics"
+	"github.com/apache/incubator-yunikorn-core/pkg/plugins"
+	"github.com/apache/incubator-yunikorn-core/pkg/rmproxy/rmevent"
+	"github.com/apache/incubator-yunikorn-core/pkg/scheduler/objects"
+	siCommon "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/common"
+	"github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+)
+
+// disableReservation is the name of the environment variable that switches reservations off.
+// It is read as a boolean via common.GetBoolEnvVar with a default of false; when it is true the
+// cluster context constructors raise the reservation delay to its maximum value.
+const disableReservation = "DISABLE_RESERVATION"
+
+// ClusterContext groups the partitions that the scheduler walks in schedule(), the policy
+// group the scheduler configuration was loaded for, and the handler used to pass events
+// (for example new allocations) to the RM.
+type ClusterContext struct {
+	// partitions scheduled by this context; the key is presumably the partition name (TODO confirm)
+	partitions     map[string]*PartitionContext
+	// policy group the scheduler configuration is loaded for (set by NewClusterContext)
+	policyGroup    string
+	// handler used to send events, such as RMNewAllocationsEvent, to the RM
+	rmEventHandler handler.EventHandler
+
+	// config values that change scheduling behaviour
+	// NOTE(review): needPreemption is not set in the code visible here — confirm where it is assigned
+	needPreemption      bool
+	// value of the DISABLE_RESERVATION environment variable (default false); when true the
+	// constructors also raise the reservation delay to its maximum
+	reservationDisabled bool
+
+	// NOTE(review): presumably guards partitions and the fields above — confirm against the accessors
+	sync.RWMutex
+}
+
+// NewClusterContext creates a new cluster context to be used outside of the event system.
+// It loads the scheduler configuration for policyGroup (the loader returns a validated
+// configuration), applies it to the new context for the RM identified by rmID and, only
+// when that succeeds, stores it as the global config for the policy group.
+// Any error from loading or applying the configuration is returned unchanged.
+// test only
+func NewClusterContext(rmID, policyGroup string) (*ClusterContext, error) {
+	// load the config, this returns a validated configuration
+	conf, err := configs.SchedulerConfigLoader(policyGroup)
+	if err != nil {
+		return nil, err
+	}
+	// reuse the shared constructor so the partition map and the reservation handling
+	// (DISABLE_RESERVATION -> maximum reservation delay) are initialised in one place only
+	cc := newClusterContext()
+	cc.policyGroup = policyGroup
+	if err = cc.updateSchedulerConfig(conf, rmID); err != nil {
+		return nil, err
+	}
+	// update the global config only after the context accepted it
+	configs.ConfigContext.Set(policyGroup, conf)
+	return cc, nil
+}
+
+// newClusterContext returns a cluster context with an empty partition map and no policy
+// group set. When the DISABLE_RESERVATION environment variable is true, reservations are
+// effectively switched off by pushing the reservation delay to the largest possible
+// duration: the time package does not export maxDuration, so math.MaxInt64 stands in for it.
+func newClusterContext() *ClusterContext {
+	noReservations := common.GetBoolEnvVar(disableReservation, false)
+	if noReservations {
+		objects.SetReservationDelay(math.MaxInt64)
+	}
+	return &ClusterContext{
+		partitions:          make(map[string]*PartitionContext),
+		reservationDisabled: noReservations,
+	}
+}
+
+// setEventHandler stores the handler that schedule() uses to pass new allocations to the RM.
+// The assignment is done without taking the context lock.
+func (cc *ClusterContext) setEventHandler(eh handler.EventHandler) {
+	cc.rmEventHandler = eh
+}
+
+// The main scheduling routine.
+// Process each partition in the scheduler, walk over each queue and app to check if anything can be scheduled.
+// This can be forked into a go routine per partition if needed to increase parallel allocations
+func (cc *ClusterContext) schedule() {
+	// schedule each partition defined in the cluster
+	for _, psc := range cc.GetPartitionMapClone() {
+		// if there are no resources in the partition just skip
+		if psc.root.GetMaxResource() == nil {
+			continue
+		}
+		// try reservations first
+		alloc := psc.tryReservedAllocate()
+		// nothing reserved that can be allocated try normal allocate
+		if alloc == nil {
+			alloc = psc.tryAllocate()
+		}
+		if alloc != nil {
+			// communicate the allocation to the RM
+			cc.rmEventHandler.HandleEvent(&rmevent.RMNewAllocationsEvent{
+				Allocations: []*si.Allocation{alloc.NewSIFromAllocation()},
+				RmID:        psc.RmID,
+			})
+			// TODO: The alloc is just passed to the RM why do we need a callback?
+			// TODO: The comments are from before the cache and scheduler merge
+			// if reconcile plugin is enabled, re-sync the cache now.
+			// before deciding on an allocation, call the reconcile plugin to sync scheduler cache
+			// between core and shim if necessary. This is useful when running multiple allocations

Review comment:
       Not in the current code, but the whole scheduling cycle is written to allow us to schedule in parallel. We could, for instance, decide to run separate goroutines for parts of the queue structure if we wanted to. The scheduler supports that from both a locking and an updating perspective.
   BTW: this comment has always been there; it is not a new comment. The TODO line is the only line added to it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org