You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@yunikorn.apache.org by GitBox <gi...@apache.org> on 2020/03/24 17:35:40 UTC

[GitHub] [incubator-yunikorn-k8shim] yangwwei opened a new pull request #86: [YUNIKORN-16] co-exist with default scheduler

yangwwei opened a new pull request #86: [YUNIKORN-16] co-exist with default scheduler
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/86
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@yunikorn.apache.org
For additional commands, e-mail: dev-help@yunikorn.apache.org


[GitHub] [incubator-yunikorn-k8shim] wilfred-s commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core

Posted by GitBox <gi...@apache.org>.
wilfred-s commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/86#discussion_r398997167
 
 

 ##########
 File path: pkg/cache/node_coordinator.go
 ##########
 @@ -0,0 +1,136 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+	k8sCache "k8s.io/client-go/tools/cache"
+)
+
+// nodeResourceCoordinator looks at the resources that not allocated by yunikorn,
+// and refresh scheduler cache to keep nodes' capacity in-sync.
+// this coordinator only looks after the pods that not scheduled by yunikorn,
+// and it registers update/delete handler to the pod informer. It ensures that
+// following operations are done
+//  1) when a pod is becoming Running, add occupied node resource
+//  2) when a pod is terminated, sub the occupied node resource
+//  3) when a pod is deleted, sub the occupied node resource
+// each of these updates will trigger a node UPDATE action to update the occupied
+// resource in the scheduler-core.
+type nodeResourceCoordinator struct {
+	nodes *schedulerNodes
+}
+
+func newNodeResourceCoordinator(nodes *schedulerNodes) *nodeResourceCoordinator {
+	return &nodeResourceCoordinator{nodes}
+}
+
+// filter pods that not scheduled by us
+func (c *nodeResourceCoordinator) filterPods(obj interface{}) bool {
+	switch obj.(type) {
+	case *v1.Pod:
+		pod := obj.(*v1.Pod)
+		return !utils.GeneralPodFilter(pod)
+	default:
+		return false
+	}
+}
+
+func (c *nodeResourceCoordinator) updatePod(old, new interface{}) {
+	oldPod, err := utils.Convert2Pod(old)
+	if err != nil {
+		log.Logger.Error("expecting a pod object", zap.Error(err))
+		return
+	}
+
+	newPod, err := utils.Convert2Pod(new)
+	if err != nil {
+		log.Logger.Error("expecting a pod object", zap.Error(err))
+		return
+	}
+
+	// triggered when pod status phase changes
+	if oldPod.Status.Phase != newPod.Status.Phase {
+		if utils.IsAssignedPod(newPod) {
+			log.Logger.Debug("pod phase changes",
+				zap.String("namespace", newPod.Namespace),
+				zap.String("podName", newPod.Name),
+				zap.String("podStatusBefore", string(oldPod.Status.Phase)),
+				zap.String("podStatusCurrent", string(newPod.Status.Phase)))
+			if utils.IsPodRunning(newPod) {
+				// if pod is running but not scheduled by us,
+				// we need to notify scheduler-core to re-sync the node resource
+				podResource := common.GetPodResource(newPod)
+				c.nodes.updateNodeOccupiedResources(newPod.Spec.NodeName, podResource, AddOccupiedResource)
+				if err := c.nodes.cache.AddPod(newPod); err != nil {
+					log.Logger.Warn("failed to update scheduler-cache",
+						zap.Error(err))
+				}
+			} else if utils.IsPodTerminated(newPod) {
+				// this means pod is terminated
+				// we need sub the occupied resource and re-sync with the scheduler-core
+				podResource := common.GetPodResource(newPod)
+				c.nodes.updateNodeOccupiedResources(newPod.Spec.NodeName, podResource, SubOccupiedResource)
+				if err := c.nodes.cache.RemovePod(newPod); err != nil {
+					log.Logger.Warn("failed to update scheduler-cache",
+						zap.Error(err))
+				}
+			}
+		}
+	}
+}
+
+func (c *nodeResourceCoordinator) deletePod(obj interface{}) {
+	var pod *v1.Pod
+	switch t := obj.(type) {
+	case *v1.Pod:
+		pod = t
+	case k8sCache.DeletedFinalStateUnknown:
+		var err error
+		pod, err = utils.Convert2Pod(t.Obj)
+		if err != nil {
+			log.Logger.Error(err.Error())
+			return
+		}
+	default:
+		log.Logger.Error("cannot convert to pod")
+		return
+	}
+
+	// if pod is already terminated, that means the updates have already done
+	if utils.IsPodTerminated(pod) {
+		log.Logger.Debug("pod is already terminated, occupied resource updated should have already done")
+		return
+	}
+
+	log.Logger.Info("delete pod that scheduled by other schedulers",
 
 Review comment:
   nit:  delete -> deleting

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-yunikorn-k8shim] wilfred-s commented on issue #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core

Posted by GitBox <gi...@apache.org>.
wilfred-s commented on issue #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/86#issuecomment-604948470
 
 
   thank you, changes committed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-yunikorn-k8shim] wilfred-s commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core

Posted by GitBox <gi...@apache.org>.
wilfred-s commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/86#discussion_r398996429
 
 

 ##########
 File path: pkg/cache/node_coordinator.go
 ##########
 @@ -0,0 +1,136 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+	k8sCache "k8s.io/client-go/tools/cache"
+)
+
+// nodeResourceCoordinator looks at the resources that not allocated by yunikorn,
+// and refresh scheduler cache to keep nodes' capacity in-sync.
+// this coordinator only looks after the pods that not scheduled by yunikorn,
 
 Review comment:
   nit: the word *are* is missing before _not_ (twice)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on issue #86: [YUNIKORN-16] co-exist with default scheduler

Posted by GitBox <gi...@apache.org>.
yangwwei commented on issue #86: [YUNIKORN-16] co-exist with default scheduler
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/86#issuecomment-603662646
 
 
   This one is not ready for review yet..

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@yunikorn.apache.org
For additional commands, e-mail: dev-help@yunikorn.apache.org


[GitHub] [incubator-yunikorn-k8shim] wilfred-s commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core

Posted by GitBox <gi...@apache.org>.
wilfred-s commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/86#discussion_r399006562
 
 

 ##########
 File path: pkg/common/resource.go
 ##########
 @@ -244,3 +268,35 @@ func Add(left *si.Resource, right *si.Resource) *si.Resource {
 	}
 	return result
 }
+
+func Sub(left *si.Resource, right *si.Resource) *si.Resource {
+	if left == nil {
+		left = &si.Resource{}
+	}
+	if right == nil {
+		return &si.Resource{}
 
 Review comment:
   this is incorrect: if right is nil we need to return left.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/86#discussion_r399034298
 
 

 ##########
 File path: pkg/cache/node_coordinator.go
 ##########
 @@ -0,0 +1,136 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+	k8sCache "k8s.io/client-go/tools/cache"
+)
+
+// nodeResourceCoordinator looks at the resources that not allocated by yunikorn,
+// and refresh scheduler cache to keep nodes' capacity in-sync.
+// this coordinator only looks after the pods that not scheduled by yunikorn,
 
 Review comment:
   done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/86#discussion_r399034441
 
 

 ##########
 File path: pkg/cache/node_coordinator.go
 ##########
 @@ -0,0 +1,136 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+	k8sCache "k8s.io/client-go/tools/cache"
+)
+
+// nodeResourceCoordinator looks at the resources that not allocated by yunikorn,
+// and refresh scheduler cache to keep nodes' capacity in-sync.
+// this coordinator only looks after the pods that not scheduled by yunikorn,
+// and it registers update/delete handler to the pod informer. It ensures that
+// following operations are done
+//  1) when a pod is becoming Running, add occupied node resource
+//  2) when a pod is terminated, sub the occupied node resource
+//  3) when a pod is deleted, sub the occupied node resource
+// each of these updates will trigger a node UPDATE action to update the occupied
+// resource in the scheduler-core.
+type nodeResourceCoordinator struct {
+	nodes *schedulerNodes
+}
+
+func newNodeResourceCoordinator(nodes *schedulerNodes) *nodeResourceCoordinator {
+	return &nodeResourceCoordinator{nodes}
+}
+
+// filter pods that not scheduled by us
+func (c *nodeResourceCoordinator) filterPods(obj interface{}) bool {
+	switch obj.(type) {
+	case *v1.Pod:
+		pod := obj.(*v1.Pod)
+		return !utils.GeneralPodFilter(pod)
+	default:
+		return false
+	}
+}
+
+func (c *nodeResourceCoordinator) updatePod(old, new interface{}) {
+	oldPod, err := utils.Convert2Pod(old)
+	if err != nil {
+		log.Logger.Error("expecting a pod object", zap.Error(err))
+		return
+	}
+
+	newPod, err := utils.Convert2Pod(new)
+	if err != nil {
+		log.Logger.Error("expecting a pod object", zap.Error(err))
+		return
+	}
+
+	// triggered when pod status phase changes
+	if oldPod.Status.Phase != newPod.Status.Phase {
+		if utils.IsAssignedPod(newPod) {
+			log.Logger.Debug("pod phase changes",
+				zap.String("namespace", newPod.Namespace),
+				zap.String("podName", newPod.Name),
+				zap.String("podStatusBefore", string(oldPod.Status.Phase)),
+				zap.String("podStatusCurrent", string(newPod.Status.Phase)))
+			if utils.IsPodRunning(newPod) {
+				// if pod is running but not scheduled by us,
+				// we need to notify scheduler-core to re-sync the node resource
+				podResource := common.GetPodResource(newPod)
+				c.nodes.updateNodeOccupiedResources(newPod.Spec.NodeName, podResource, AddOccupiedResource)
+				if err := c.nodes.cache.AddPod(newPod); err != nil {
+					log.Logger.Warn("failed to update scheduler-cache",
+						zap.Error(err))
+				}
+			} else if utils.IsPodTerminated(newPod) {
+				// this means pod is terminated
+				// we need sub the occupied resource and re-sync with the scheduler-core
+				podResource := common.GetPodResource(newPod)
+				c.nodes.updateNodeOccupiedResources(newPod.Spec.NodeName, podResource, SubOccupiedResource)
+				if err := c.nodes.cache.RemovePod(newPod); err != nil {
+					log.Logger.Warn("failed to update scheduler-cache",
+						zap.Error(err))
+				}
+			}
+		}
+	}
+}
+
+func (c *nodeResourceCoordinator) deletePod(obj interface{}) {
+	var pod *v1.Pod
+	switch t := obj.(type) {
+	case *v1.Pod:
+		pod = t
+	case k8sCache.DeletedFinalStateUnknown:
+		var err error
+		pod, err = utils.Convert2Pod(t.Obj)
+		if err != nil {
+			log.Logger.Error(err.Error())
+			return
+		}
+	default:
+		log.Logger.Error("cannot convert to pod")
+		return
+	}
+
+	// if pod is already terminated, that means the updates have already done
+	if utils.IsPodTerminated(pod) {
+		log.Logger.Debug("pod is already terminated, occupied resource updated should have already done")
 
 Review comment:
   done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-yunikorn-k8shim] wilfred-s commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core

Posted by GitBox <gi...@apache.org>.
wilfred-s commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/86#discussion_r399001317
 
 

 ##########
 File path: pkg/cache/task.go
 ##########
 @@ -342,19 +345,36 @@ func (task *Task) postTaskCompleted(event *fsm.Event) {
 
 func (task *Task) releaseAllocation() {
 	// when task is completed, we notify the scheduler to release allocations
-	go func() {
-		// scheduler api might be nil in some tests
-		if task.context.apiProvider.GetAPIs().SchedulerAPI != nil {
-			releaseRequest := common.CreateReleaseAllocationRequestForTask(
+	// scheduler api might be nil in some tests
+	if task.context.apiProvider.GetAPIs().SchedulerAPI != nil {
+		// if allocated, sending release
+
 
 Review comment:
   Not quite clear on this comment: the switch has a comment that explains the flow, but this one seems to be disconnected from everything.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-yunikorn-k8shim] wilfred-s commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core

Posted by GitBox <gi...@apache.org>.
wilfred-s commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/86#discussion_r399010380
 
 

 ##########
 File path: pkg/common/resource_test.go
 ##########
 @@ -243,3 +244,76 @@ func TestNodeResource(t *testing.T) {
 
 	assert.Equal(t, result.Resources[CPU].GetValue(), int64(14500))
 }
+
+func TestIsZero(t *testing.T) {
+	r := NewResourceBuilder().
+		AddResource(Memory, 1).
+		AddResource(CPU, 1).
+		Build()
+	assert.Equal(t, IsZero(r), false)
+
+	r = NewResourceBuilder().
+		AddResource(CPU, 0).
+		Build()
+	assert.Equal(t, IsZero(r), true)
+
+	r = NewResourceBuilder().
+		AddResource(Memory, 0).
+		AddResource(CPU, 0).
+		Build()
+	assert.Equal(t, IsZero(r), true)
+
+	r = NewResourceBuilder().
+		AddResource(Memory, 0).
+		AddResource(CPU, 1).
+		Build()
+	assert.Equal(t, IsZero(r), false)
+
+	assert.Equal(t, IsZero(nil), true)
+
+	r = &si.Resource{}
+	assert.Equal(t, IsZero(r), true)
+}
+
+func TestSub(t *testing.T) {
+	// simple case (nil checks)
+	result := Sub(nil, nil)
+	if result == nil || len(result.Resources) != 0 {
+		t.Errorf("sub nil resources did not return zero resource: %v", result)
+	}
+
+	// empty resources
+	left := NewResourceBuilder().Build()
+	result = Sub(left, nil)
+	if result == nil || len(result.Resources) != 0 || result == left {
+		t.Errorf("sub Zero resource (right) did not return cloned resource: %v", result)
+	}
+
+	// simple empty resources
+	res1 := NewResourceBuilder().
+		AddResource("a", 5).
+		Build()
+	result = Sub(left, res1)
+	assert.Equal(t, IsZero(result), true)
+
+	// complex case: just checking the resource merge, values check is secondary
+	res1 = NewResourceBuilder().
+		AddResource("a", 0).
+		AddResource("b", 1).
+		Build()
+	res2 := NewResourceBuilder().
+		AddResource("a", 1).
+		AddResource("c", 0).
+		AddResource("d", -1).
+		Build()
+	res3 := Sub(res1, res2)
+
+	expected := NewResourceBuilder().
+		AddResource("a", -1).
+		AddResource("b", 1).
+		Build()
 
 Review comment:
   That is not what I would expect based on the way we do *add*; see the comment in the implementation code.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-yunikorn-k8shim] wilfred-s commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core

Posted by GitBox <gi...@apache.org>.
wilfred-s commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/86#discussion_r399010066
 
 

 ##########
 File path: pkg/common/resource.go
 ##########
 @@ -244,3 +268,35 @@ func Add(left *si.Resource, right *si.Resource) *si.Resource {
 	}
 	return result
 }
+
+func Sub(left *si.Resource, right *si.Resource) *si.Resource {
+	if left == nil {
+		left = &si.Resource{}
+	}
+	if right == nil {
+		return &si.Resource{}
+	}
+
+	// neither are nil, clone one and sub the other
+	rb := NewResourceBuilder()
+	for k, v := range left.Resources {
+		if rightValue, ok := right.Resources[k]; ok {
+			rb.AddResource(k, v.Value - rightValue.Value)
+		} else {
+			rb.AddResource(k, v.Value)
+		}
+	}
+	return rb.Build()
 
 Review comment:
   If *right* contains resources that *left* does not have, we do not subtract them.
   This is contrary to what we do in *add*, where we properly merge the two.
   
   We should behave the same way in add and sub

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/86#discussion_r399034496
 
 

 ##########
 File path: pkg/cache/node_coordinator.go
 ##########
 @@ -0,0 +1,136 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+	k8sCache "k8s.io/client-go/tools/cache"
+)
+
+// nodeResourceCoordinator looks at the resources that not allocated by yunikorn,
+// and refresh scheduler cache to keep nodes' capacity in-sync.
+// this coordinator only looks after the pods that not scheduled by yunikorn,
+// and it registers update/delete handler to the pod informer. It ensures that
+// following operations are done
+//  1) when a pod is becoming Running, add occupied node resource
+//  2) when a pod is terminated, sub the occupied node resource
+//  3) when a pod is deleted, sub the occupied node resource
+// each of these updates will trigger a node UPDATE action to update the occupied
+// resource in the scheduler-core.
+type nodeResourceCoordinator struct {
+	nodes *schedulerNodes
+}
+
+func newNodeResourceCoordinator(nodes *schedulerNodes) *nodeResourceCoordinator {
+	return &nodeResourceCoordinator{nodes}
+}
+
+// filter pods that not scheduled by us
+func (c *nodeResourceCoordinator) filterPods(obj interface{}) bool {
+	switch obj.(type) {
+	case *v1.Pod:
+		pod := obj.(*v1.Pod)
+		return !utils.GeneralPodFilter(pod)
+	default:
+		return false
+	}
+}
+
+func (c *nodeResourceCoordinator) updatePod(old, new interface{}) {
+	oldPod, err := utils.Convert2Pod(old)
+	if err != nil {
+		log.Logger.Error("expecting a pod object", zap.Error(err))
+		return
+	}
+
+	newPod, err := utils.Convert2Pod(new)
+	if err != nil {
+		log.Logger.Error("expecting a pod object", zap.Error(err))
+		return
+	}
+
+	// triggered when pod status phase changes
+	if oldPod.Status.Phase != newPod.Status.Phase {
+		if utils.IsAssignedPod(newPod) {
+			log.Logger.Debug("pod phase changes",
+				zap.String("namespace", newPod.Namespace),
+				zap.String("podName", newPod.Name),
+				zap.String("podStatusBefore", string(oldPod.Status.Phase)),
+				zap.String("podStatusCurrent", string(newPod.Status.Phase)))
+			if utils.IsPodRunning(newPod) {
+				// if pod is running but not scheduled by us,
+				// we need to notify scheduler-core to re-sync the node resource
+				podResource := common.GetPodResource(newPod)
+				c.nodes.updateNodeOccupiedResources(newPod.Spec.NodeName, podResource, AddOccupiedResource)
+				if err := c.nodes.cache.AddPod(newPod); err != nil {
+					log.Logger.Warn("failed to update scheduler-cache",
+						zap.Error(err))
+				}
+			} else if utils.IsPodTerminated(newPod) {
+				// this means pod is terminated
+				// we need sub the occupied resource and re-sync with the scheduler-core
+				podResource := common.GetPodResource(newPod)
+				c.nodes.updateNodeOccupiedResources(newPod.Spec.NodeName, podResource, SubOccupiedResource)
+				if err := c.nodes.cache.RemovePod(newPod); err != nil {
+					log.Logger.Warn("failed to update scheduler-cache",
+						zap.Error(err))
+				}
+			}
+		}
+	}
+}
+
+func (c *nodeResourceCoordinator) deletePod(obj interface{}) {
+	var pod *v1.Pod
+	switch t := obj.(type) {
+	case *v1.Pod:
+		pod = t
+	case k8sCache.DeletedFinalStateUnknown:
+		var err error
+		pod, err = utils.Convert2Pod(t.Obj)
+		if err != nil {
+			log.Logger.Error(err.Error())
+			return
+		}
+	default:
+		log.Logger.Error("cannot convert to pod")
+		return
+	}
+
+	// if pod is already terminated, that means the updates have already done
+	if utils.IsPodTerminated(pod) {
+		log.Logger.Debug("pod is already terminated, occupied resource updated should have already done")
+		return
+	}
+
+	log.Logger.Info("delete pod that scheduled by other schedulers",
 
 Review comment:
   done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/86#discussion_r399035865
 
 

 ##########
 File path: pkg/common/resource.go
 ##########
 @@ -244,3 +268,35 @@ func Add(left *si.Resource, right *si.Resource) *si.Resource {
 	}
 	return result
 }
+
+func Sub(left *si.Resource, right *si.Resource) *si.Resource {
+	if left == nil {
+		left = &si.Resource{}
+	}
+	if right == nil {
+		return &si.Resource{}
 
 Review comment:
   fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-yunikorn-k8shim] yangwwei removed a comment on issue #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core

Posted by GitBox <gi...@apache.org>.
yangwwei removed a comment on issue #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/86#issuecomment-603662646
 
 
   This one is not ready for review yet..

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/86#discussion_r399034699
 
 

 ##########
 File path: pkg/cache/task.go
 ##########
 @@ -342,19 +345,36 @@ func (task *Task) postTaskCompleted(event *fsm.Event) {
 
 func (task *Task) releaseAllocation() {
 	// when task is completed, we notify the scheduler to release allocations
-	go func() {
-		// scheduler api might be nil in some tests
-		if task.context.apiProvider.GetAPIs().SchedulerAPI != nil {
-			releaseRequest := common.CreateReleaseAllocationRequestForTask(
+	// scheduler api might be nil in some tests
+	if task.context.apiProvider.GetAPIs().SchedulerAPI != nil {
+		// if allocated, sending release
+
 
 Review comment:
   Yeah, removed it. It was the old comment.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-yunikorn-k8shim] wilfred-s closed pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core

Posted by GitBox <gi...@apache.org>.
wilfred-s closed pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/86
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-yunikorn-k8shim] wilfred-s commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core

Posted by GitBox <gi...@apache.org>.
wilfred-s commented on a change in pull request #86: [YUNIKORN-49] K8shim should report occupied resources to scheduler core
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/86#discussion_r398996826
 
 

 ##########
 File path: pkg/cache/node_coordinator.go
 ##########
 @@ -0,0 +1,136 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+	k8sCache "k8s.io/client-go/tools/cache"
+)
+
+// nodeResourceCoordinator looks at the resources that not allocated by yunikorn,
+// and refresh scheduler cache to keep nodes' capacity in-sync.
+// this coordinator only looks after the pods that not scheduled by yunikorn,
+// and it registers update/delete handler to the pod informer. It ensures that
+// following operations are done
+//  1) when a pod is becoming Running, add occupied node resource
+//  2) when a pod is terminated, sub the occupied node resource
+//  3) when a pod is deleted, sub the occupied node resource
+// each of these updates will trigger a node UPDATE action to update the occupied
+// resource in the scheduler-core.
+type nodeResourceCoordinator struct {
+	nodes *schedulerNodes
+}
+
+func newNodeResourceCoordinator(nodes *schedulerNodes) *nodeResourceCoordinator {
+	return &nodeResourceCoordinator{nodes}
+}
+
+// filter pods that not scheduled by us
+func (c *nodeResourceCoordinator) filterPods(obj interface{}) bool {
+	switch obj.(type) {
+	case *v1.Pod:
+		pod := obj.(*v1.Pod)
+		return !utils.GeneralPodFilter(pod)
+	default:
+		return false
+	}
+}
+
+func (c *nodeResourceCoordinator) updatePod(old, new interface{}) {
+	oldPod, err := utils.Convert2Pod(old)
+	if err != nil {
+		log.Logger.Error("expecting a pod object", zap.Error(err))
+		return
+	}
+
+	newPod, err := utils.Convert2Pod(new)
+	if err != nil {
+		log.Logger.Error("expecting a pod object", zap.Error(err))
+		return
+	}
+
+	// triggered when pod status phase changes
+	if oldPod.Status.Phase != newPod.Status.Phase {
+		if utils.IsAssignedPod(newPod) {
+			log.Logger.Debug("pod phase changes",
+				zap.String("namespace", newPod.Namespace),
+				zap.String("podName", newPod.Name),
+				zap.String("podStatusBefore", string(oldPod.Status.Phase)),
+				zap.String("podStatusCurrent", string(newPod.Status.Phase)))
+			if utils.IsPodRunning(newPod) {
+				// if pod is running but not scheduled by us,
+				// we need to notify scheduler-core to re-sync the node resource
+				podResource := common.GetPodResource(newPod)
+				c.nodes.updateNodeOccupiedResources(newPod.Spec.NodeName, podResource, AddOccupiedResource)
+				if err := c.nodes.cache.AddPod(newPod); err != nil {
+					log.Logger.Warn("failed to update scheduler-cache",
+						zap.Error(err))
+				}
+			} else if utils.IsPodTerminated(newPod) {
+				// this means pod is terminated
+				// we need sub the occupied resource and re-sync with the scheduler-core
+				podResource := common.GetPodResource(newPod)
+				c.nodes.updateNodeOccupiedResources(newPod.Spec.NodeName, podResource, SubOccupiedResource)
+				if err := c.nodes.cache.RemovePod(newPod); err != nil {
+					log.Logger.Warn("failed to update scheduler-cache",
+						zap.Error(err))
+				}
+			}
+		}
+	}
+}
+
+func (c *nodeResourceCoordinator) deletePod(obj interface{}) {
+	var pod *v1.Pod
+	switch t := obj.(type) {
+	case *v1.Pod:
+		pod = t
+	case k8sCache.DeletedFinalStateUnknown:
+		var err error
+		pod, err = utils.Convert2Pod(t.Obj)
+		if err != nil {
+			log.Logger.Error(err.Error())
+			return
+		}
+	default:
+		log.Logger.Error("cannot convert to pod")
+		return
+	}
+
+	// if pod is already terminated, that means the updates have already done
+	if utils.IsPodTerminated(pod) {
+		log.Logger.Debug("pod is already terminated, occupied resource updated should have already done")
 
 Review comment:
   > should have already done
   
   should have already *been* done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services