Posted to reviews@yunikorn.apache.org by GitBox <gi...@apache.org> on 2021/01/14 08:31:53 UTC

[GitHub] [incubator-yunikorn-k8shim] yangwwei opened a new pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

yangwwei opened a new pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219


   This is a PR opened for extra review before merging into the master branch.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-yunikorn-k8shim] codecov[bot] edited a comment on pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#issuecomment-760395169


   # [Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=h1) Report
   > Merging [#219](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=desc) (d0a5e7b) into [master](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/commit/eb888dfcf3b74ce248f2ed3b92ae5bddaf6caf5a?el=desc) (eb888df) will **increase** coverage by `1.94%`.
   > The diff coverage is `76.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/graphs/tree.svg?width=650&height=150&src=pr&token=LZImIuvleR)](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #219      +/-   ##
   ==========================================
   + Coverage   57.78%   59.72%   +1.94%     
   ==========================================
     Files          32       35       +3     
     Lines        2859     3136     +277     
   ==========================================
   + Hits         1652     1873     +221     
   - Misses       1138     1181      +43     
   - Partials       69       82      +13     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [pkg/cache/application\_events.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2FwcGxpY2F0aW9uX2V2ZW50cy5nbw==) | `53.06% <0.00%> (-7.41%)` | :arrow_down: |
   | [pkg/common/events/states.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi9ldmVudHMvc3RhdGVzLmdv) | `0.00% <0.00%> (ø)` | |
   | [pkg/common/utils/utils.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi91dGlscy91dGlscy5nbw==) | `25.00% <ø> (ø)` | |
   | [pkg/cache/application.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2FwcGxpY2F0aW9uLmdv) | `76.74% <59.25%> (-2.31%)` | :arrow_down: |
   | [pkg/cache/placeholder\_manager.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL3BsYWNlaG9sZGVyX21hbmFnZXIuZ28=) | `77.94% <77.94%> (ø)` | |
   | [pkg/common/utils/gang\_utils.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi91dGlscy9nYW5nX3V0aWxzLmdv) | `81.53% <81.53%> (ø)` | |
   | [pkg/cache/task.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL3Rhc2suZ28=) | `74.40% <82.22%> (+2.77%)` | :arrow_up: |
   | [pkg/appmgmt/general/general.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2FwcG1nbXQvZ2VuZXJhbC9nZW5lcmFsLmdv) | `59.47% <83.33%> (+2.03%)` | :arrow_up: |
   | [pkg/common/si\_helper.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi9zaV9oZWxwZXIuZ28=) | `63.15% <90.00%> (+1.36%)` | :arrow_up: |
   | [pkg/cache/context.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2NvbnRleHQuZ28=) | `41.08% <100.00%> (+0.13%)` | :arrow_up: |
   | ... and [5 more](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=footer). Last update [eb888df...d0a5e7b](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [incubator-yunikorn-k8shim] kingamarton commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
kingamarton commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r559631824



##########
File path: pkg/shim/scheduler.go
##########
@@ -267,6 +267,10 @@ func (ss *KubernetesShim) run() {
 	}
 
 	ss.apiFactory.Start()
+
+	// run the placeholder manager
+	placeholderMgr := cache.NewPlaceholderManager(ss.apiFactory.GetAPIs())
+	placeholderMgr.Start()

Review comment:
       Since the placeholderManager is never nil, this can be simplified into a one-line call: `cache.NewPlaceholderManager(ss.apiFactory.GetAPIs()).Start()`

##########
File path: pkg/common/utils/gang_utils.go
##########
@@ -0,0 +1,183 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package utils
+
+import (
+	"encoding/json"
+	"fmt"
+	"strconv"
+	"sync"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants"
+)
+
+func FindAppTaskGroup(appTaskGroups []*v1alpha1.TaskGroup, groupName string) (*v1alpha1.TaskGroup, error) {
+	if groupName == "" {
+		// task has no group defined
+		return nil, nil
+	}
+
+	// app has no taskGroups associated
+	if len(appTaskGroups) == 0 {
+		return nil, nil
+	}
+
+	// task group defined in app, return the corresponding taskGroup
+	for _, tg := range appTaskGroups {
+		if tg.Name == groupName {
+			return tg, nil
+		}
+	}
+
+	// task group name specified, but could not find a mapping value in app taskGroups
+	return nil, fmt.Errorf("taskGroup %s is not defined in the application", groupName)
+}
+
+// the placeholder name is the pod name, pod name can not be longer than 63 chars
+// taskGroup name and appID will be truncated if they go over 20/28 chars respectively
+func GeneratePlaceholderName(taskGroupName, appID string, index int32) string {

Review comment:
       What is the index here? It may be worth a comment.

##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,172 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients *client.Clients
+	// when the placeholder manager is unable to delete a pod,
+	// this pod becomes to be an "orphan" pod. We add them to a map
+	// and keep retrying deleting them in order to avoid wasting resources.
+	orphanPod map[string]*v1.Pod
+	stopChan  chan struct{}
+	running   atomic.Value
+	sync.RWMutex
+}
+
+var placeholderMgr *PlaceholderManager
+var once sync.Once
+
+func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
+	var r atomic.Value
+	r.Store(false)
+	placeholderMgr = &PlaceholderManager{
+		clients: clients,
+		running: r,
+	}
+	return placeholderMgr
+}
+
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")
+		}
+	})
+	return placeholderMgr
+}
+
+func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
+	mgr.Lock()
+	defer mgr.Unlock()
+
+	// iterate all task groups, create placeholders for all the min members
+	for _, tg := range app.getTaskGroups() {
+		for i := int32(0); i < tg.MinMember; i++ {
+			placeholderName := utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID(), i)
+			placeholder := newPlaceholder(placeholderName, app, tg)
+			// create the placeholder on K8s
+			_, err := mgr.clients.KubeClient.Create(placeholder.pod)
+			if err != nil {
+				// if failed to create the place holder pod
+				// caller should handle this error
+				log.Logger().Error("failed to create placeholder pod",
+					zap.Error(err))
+				return err
+			}
+			log.Logger().Info("placeholder created",
+				zap.String("placeholder", placeholder.String()))
+		}
+	}
+
+	return nil
+}
+
+// clean up all the placeholders for an application
+func (mgr *PlaceholderManager) CleanUp(app *Application) {
+	mgr.Lock()
+	defer mgr.Unlock()
+	log.Logger().Info("start to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+	for taskID, task := range app.taskMap {
+		if task.GetTaskPlaceholder() {
+			// remove pod
+			err := mgr.clients.KubeClient.Delete(task.pod)
+			if err != nil {
+				log.Logger().Error("failed to clean up placeholder pod",
+					zap.Error(err))
+				mgr.orphanPod[taskID] = task.pod
+			}
+		}
+	}
+	log.Logger().Info("finish to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+}
+
+func (mgr *PlaceholderManager) cleanOrphanPlaceholders() {
+	mgr.Lock()
+	defer mgr.Unlock()
+	for taskID, pod := range mgr.orphanPod {
+		log.Logger().Debug("start to clean up orphan pod",
+			zap.String("taskID", taskID),
+			zap.String("podName", pod.Name))
+		err := mgr.clients.KubeClient.Delete(pod)
+		if err != nil {
+			log.Logger().Warn("failed to clean up orphan pod", zap.Error(err))
+		} else {
+			delete(mgr.orphanPod, taskID)
+		}
+	}
+}
+
+func (mgr *PlaceholderManager) Start() {
+	log.Logger().Info("starting the Placeholder Manager")
+	mgr.stopChan = make(chan struct{})

Review comment:
       Shouldn't this line be moved after the check for whether the placeholder manager is running? If the placeholder manager is already running, the channel is overwritten here.
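
A minimal sketch of the ordering this comment asks for, not the code from the PR. The `manager` type is a hypothetical stand-in for `PlaceholderManager`, and it guards its state with a plain mutex instead of the `atomic.Value` used in the diff; the only point illustrated is that the stop channel is allocated after the running check, so a second `Start()` call leaves the existing channel untouched.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// manager is a hypothetical stand-in for PlaceholderManager; only the
// start/stop bookkeeping is modelled here.
type manager struct {
	mu       sync.Mutex
	running  bool
	stopChan chan struct{}
}

// Start reports whether a new background loop was launched. The stop
// channel is created only after the running check has passed.
func (m *manager) Start() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.running {
		return false
	}
	m.stopChan = make(chan struct{})
	m.running = true
	go func(stop <-chan struct{}) {
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-stop:
				return
			case <-ticker.C:
				// periodic clean-up work would go here
			}
		}
	}(m.stopChan)
	return true
}

// Stop closes the channel created by the Start call that launched the loop.
func (m *manager) Stop() {
	m.mu.Lock()
	defer m.mu.Unlock()
	if !m.running {
		return
	}
	close(m.stopChan)
	m.running = false
}

func main() {
	m := &manager{}
	fmt.Println("first start launched a loop:", m.Start())
	fmt.Println("second start launched a loop:", m.Start())
	m.Stop()
}
```

With the original ordering, the second call would replace `stopChan` while the goroutine still listens on the old one, so a later stop signal would never reach it.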

##########
File path: pkg/common/utils/gang_utils.go
##########
@@ -0,0 +1,183 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package utils
+
+import (
+	"encoding/json"
+	"fmt"
+	"strconv"
+	"sync"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants"
+)
+
+func FindAppTaskGroup(appTaskGroups []*v1alpha1.TaskGroup, groupName string) (*v1alpha1.TaskGroup, error) {
+	if groupName == "" {
+		// task has no group defined
+		return nil, nil
+	}
+
+	// app has no taskGroups associated
+	if len(appTaskGroups) == 0 {
+		return nil, nil
+	}
+
+	// task group defined in app, return the corresponding taskGroup
+	for _, tg := range appTaskGroups {
+		if tg.Name == groupName {
+			return tg, nil
+		}
+	}
+
+	// task group name specified, but could not find a mapping value in app taskGroups
+	return nil, fmt.Errorf("taskGroup %s is not defined in the application", groupName)
+}
+
+// the placeholder name is the pod name, pod name can not be longer than 63 chars
+// taskGroup name and appID will be truncated if they go over 20/28 chars respectively
+func GeneratePlaceholderName(taskGroupName, appID string, index int32) string {
+	// taskGroup name no longer than 20 chars
+	// appID no longer than 35 chars

Review comment:
       The comment is not in sync with the implementation. Is the app ID limited to 35 or 28 characters?
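
One way to keep the documented limits and the truncation from drifting apart is to derive both from the same constants. This is only a sketch: the constant names and the `placeholderName` helper are hypothetical, and it assumes the 28-character limit from the function's doc comment is the intended one. With the `tg-` prefix, two `-` separators, and up to 10 digits for an `int32` index, the result is at most 3 + 20 + 1 + 28 + 1 + 10 = 63 characters for ASCII input.

```go
package main

import "fmt"

const (
	// Hypothetical names; the values mirror the limits in the doc comment.
	maxTaskGroupLen = 20
	maxAppIDLen     = 28
)

// placeholderName truncates both parts through the precision argument of
// %.*s, so the limits are stated in exactly one place.
func placeholderName(taskGroupName, appID string, index int32) string {
	return fmt.Sprintf("tg-%.*s-%.*s-%d",
		maxTaskGroupLen, taskGroupName,
		maxAppIDLen, appID,
		index)
}

func main() {
	fmt.Println(placeholderName("workers", "application-0001", 0))
}
```

Note that `fmt` measures string precision in runes, so the 63-character bound holds in bytes only when both inputs are ASCII.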

##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,172 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients *client.Clients
+	// when the placeholder manager is unable to delete a pod,
+	// this pod becomes to be an "orphan" pod. We add them to a map
+	// and keep retrying deleting them in order to avoid wasting resources.
+	orphanPod map[string]*v1.Pod
+	stopChan  chan struct{}
+	running   atomic.Value
+	sync.RWMutex
+}
+
+var placeholderMgr *PlaceholderManager
+var once sync.Once
+
+func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
+	var r atomic.Value
+	r.Store(false)
+	placeholderMgr = &PlaceholderManager{
+		clients: clients,
+		running: r,
+	}
+	return placeholderMgr
+}
+
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")
+		}
+	})
+	return placeholderMgr
+}
+
+func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
+	mgr.Lock()
+	defer mgr.Unlock()
+
+	// iterate all task groups, create placeholders for all the min members
+	for _, tg := range app.getTaskGroups() {
+		for i := int32(0); i < tg.MinMember; i++ {
+			placeholderName := utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID(), i)
+			placeholder := newPlaceholder(placeholderName, app, tg)
+			// create the placeholder on K8s
+			_, err := mgr.clients.KubeClient.Create(placeholder.pod)
+			if err != nil {
+				// if failed to create the place holder pod
+				// caller should handle this error
+				log.Logger().Error("failed to create placeholder pod",
+					zap.Error(err))
+				return err
+			}
+			log.Logger().Info("placeholder created",
+				zap.String("placeholder", placeholder.String()))
+		}
+	}
+
+	return nil
+}
+
+// clean up all the placeholders for an application
+func (mgr *PlaceholderManager) CleanUp(app *Application) {
+	mgr.Lock()
+	defer mgr.Unlock()
+	log.Logger().Info("start to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+	for taskID, task := range app.taskMap {
+		if task.GetTaskPlaceholder() {
+			// remove pod
+			err := mgr.clients.KubeClient.Delete(task.pod)
+			if err != nil {
+				log.Logger().Error("failed to clean up placeholder pod",
+					zap.Error(err))
+				mgr.orphanPod[taskID] = task.pod
+			}
+		}
+	}
+	log.Logger().Info("finish to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+}
+
+func (mgr *PlaceholderManager) cleanOrphanPlaceholders() {
+	mgr.Lock()
+	defer mgr.Unlock()
+	for taskID, pod := range mgr.orphanPod {
+		log.Logger().Debug("start to clean up orphan pod",
+			zap.String("taskID", taskID),
+			zap.String("podName", pod.Name))
+		err := mgr.clients.KubeClient.Delete(pod)
+		if err != nil {
+			log.Logger().Warn("failed to clean up orphan pod", zap.Error(err))
+		} else {
+			delete(mgr.orphanPod, taskID)
+		}
+	}
+}
+
+func (mgr *PlaceholderManager) Start() {
+	log.Logger().Info("starting the Placeholder Manager")
+	mgr.stopChan = make(chan struct{})
+	if mgr.isRunning() {
+		log.Logger().Info("The placeholder manager has been started")
+		return
+	}
+	mgr.setRunning(true)
+	go func() {
+		for {
+			select {
+			case <-mgr.stopChan:
+				log.Logger().Info("PlaceholderManager has been stopped")
+				mgr.setRunning(false)
+				return
+			default:
+				// clean orphan placeholders every 5 seconds
+				log.Logger().Info("clean up orphan pod")
+				mgr.cleanOrphanPlaceholders()
+				time.Sleep(5 * time.Second)
+			}
+		}
+	}()
+}
+
+func (mgr *PlaceholderManager) Stop() {

Review comment:
       Right now the placeholder manager is never stopped: this method is called only from the tests. I think it should be stopped when the scheduler is stopped.

##########
File path: pkg/common/utils/gang_utils.go
##########
@@ -0,0 +1,183 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package utils
+
+import (
+	"encoding/json"
+	"fmt"
+	"strconv"
+	"sync"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants"
+)
+
+func FindAppTaskGroup(appTaskGroups []*v1alpha1.TaskGroup, groupName string) (*v1alpha1.TaskGroup, error) {
+	if groupName == "" {
+		// task has no group defined
+		return nil, nil
+	}
+
+	// app has no taskGroups associated
+	if len(appTaskGroups) == 0 {
+		return nil, nil
+	}
+
+	// task group defined in app, return the corresponding taskGroup
+	for _, tg := range appTaskGroups {
+		if tg.Name == groupName {
+			return tg, nil
+		}
+	}
+
+	// task group name specified, but could not find a mapping value in app taskGroups
+	return nil, fmt.Errorf("taskGroup %s is not defined in the application", groupName)
+}
+
+// the placeholder name is the pod name, pod name can not be longer than 63 chars
+// taskGroup name and appID will be truncated if they go over 20/28 chars respectively
+func GeneratePlaceholderName(taskGroupName, appID string, index int32) string {
+	// taskGroup name no longer than 20 chars
+	// appID no longer than 35 chars
+	// total length no longer than 20 + 28 + 5 + 10 = 63
+	shortTaskGroupName := fmt.Sprintf("%.20s", taskGroupName)
+	shortAppID := fmt.Sprintf("%.28s", appID)
+	return "tg-" + shortTaskGroupName + "-" + shortAppID + fmt.Sprintf("-%d", index)
+}
+
+func GetPlaceholderResourceRequest(resources map[string]resource.Quantity) v1.ResourceList {
+	resourceReq := v1.ResourceList{}
+	for k, v := range resources {
+		resourceReq[v1.ResourceName(k)] = v
+	}
+	return resourceReq
+}
+
+func GetPlaceholderFlagFromPodSpec(pod *v1.Pod) bool {
+	if value, ok := pod.Annotations[constants.AnnotationPlaceholderFlag]; ok {
+		if v, err := strconv.ParseBool(value); err == nil {
+			return v
+		}
+	}
+	return false
+}
+
+func GetTaskGroupFromPodSpec(pod *v1.Pod) string {
+	if value, ok := pod.Annotations[constants.AnnotationTaskGroupName]; ok {
+		return value
+	}
+	return ""
+}
+
+func GetTaskGroupsFromAnnotation(pod *v1.Pod) ([]v1alpha1.TaskGroup, error) {
+	taskGroupInfo, ok := pod.Annotations[constants.AnnotationTaskGroups]
+	if !ok {
+		return nil, nil
+	}
+	taskGroups := []v1alpha1.TaskGroup{}
+	err := json.Unmarshal([]byte(taskGroupInfo), &taskGroups)
+	if err != nil {
+		return nil, err
+	}
+	// json.Unmarchal won't return error if name or MinMember is empty, but will return error if MinResource is empty or error format.
+	for _, taskGroup := range taskGroups {
+		if taskGroup.Name == "" {
+			return nil, fmt.Errorf("can't get taskGroup Name from pod annotation, %s",
+				pod.Annotations[constants.AnnotationTaskGroups])
+		}
+		if taskGroup.MinMember == int32(0) {

Review comment:
       Here we should check for negative values as well.
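
A minimal sketch of the stricter check, assuming a simplified `taskGroup` type and JSON field names (`name`, `minMember`) that are stand-ins for the real `v1alpha1.TaskGroup` definition. Replacing the `== 0` comparison with `<= 0` rejects a missing, zero, or negative member count in one branch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// taskGroup is a simplified, hypothetical stand-in for v1alpha1.TaskGroup.
type taskGroup struct {
	Name      string `json:"name"`
	MinMember int32  `json:"minMember"`
}

// parseTaskGroups decodes the annotation value and validates every entry.
func parseTaskGroups(annotation string) ([]taskGroup, error) {
	var groups []taskGroup
	if err := json.Unmarshal([]byte(annotation), &groups); err != nil {
		return nil, err
	}
	for _, tg := range groups {
		if tg.Name == "" {
			return nil, fmt.Errorf("task group has no name: %s", annotation)
		}
		// A missing field decodes to 0, so <= 0 covers both the unset
		// case and explicitly negative values.
		if tg.MinMember <= 0 {
			return nil, fmt.Errorf("task group %q has invalid minMember %d",
				tg.Name, tg.MinMember)
		}
	}
	return groups, nil
}

func main() {
	_, err := parseTaskGroups(`[{"name":"workers","minMember":-1}]`)
	fmt.Println(err)
}
```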







[GitHub] [incubator-yunikorn-k8shim] kingamarton commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
kingamarton commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r560050256



##########
File path: pkg/cache/application.go
##########
@@ -81,8 +87,14 @@ func NewApplication(appID, queueName, user string, tags map[string]string, sched
 			{Name: string(events.AcceptApplication),
 				Src: []string{states.Submitted, states.Recovering},
 				Dst: states.Accepted},
+			{Name: string(events.TryReserve),
+				Src: []string{states.Accepted},
+				Dst: states.Reserving},
+			{Name: string(events.UpdateReservation),
+				Src: []string{states.Reserving},
+				Dst: states.Reserving},

Review comment:
       To me it is a bit strange that we have this event here in the state machine, but there is no state transition associated with it.

##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,172 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients *client.Clients
+	// when the placeholder manager is unable to delete a pod,
+	// this pod becomes to be an "orphan" pod. We add them to a map
+	// and keep retrying deleting them in order to avoid wasting resources.
+	orphanPod map[string]*v1.Pod
+	stopChan  chan struct{}
+	running   atomic.Value
+	sync.RWMutex
+}
+
+var placeholderMgr *PlaceholderManager
+var once sync.Once
+
+func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
+	var r atomic.Value
+	r.Store(false)
+	placeholderMgr = &PlaceholderManager{
+		clients: clients,
+		running: r,
+	}
+	return placeholderMgr
+}
+
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")
+		}
+	})
+	return placeholderMgr
+}
+
+func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
+	mgr.Lock()
+	defer mgr.Unlock()
+
+	// iterate all task groups, create placeholders for all the min members
+	for _, tg := range app.getTaskGroups() {
+		for i := int32(0); i < tg.MinMember; i++ {
+			placeholderName := utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID(), i)
+			placeholder := newPlaceholder(placeholderName, app, tg)
+			// create the placeholder on K8s
+			_, err := mgr.clients.KubeClient.Create(placeholder.pod)
+			if err != nil {
+				// if failed to create the place holder pod
+				// caller should handle this error
+				log.Logger().Error("failed to create placeholder pod",
+					zap.Error(err))
+				return err
+			}
+			log.Logger().Info("placeholder created",
+				zap.String("placeholder", placeholder.String()))
+		}
+	}
+
+	return nil
+}
+
+// clean up all the placeholders for an application
+func (mgr *PlaceholderManager) CleanUp(app *Application) {
+	mgr.Lock()
+	defer mgr.Unlock()
+	log.Logger().Info("start to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+	for taskID, task := range app.taskMap {
+		if task.GetTaskPlaceholder() {
+			// remove pod
+			err := mgr.clients.KubeClient.Delete(task.pod)
+			if err != nil {
+				log.Logger().Error("failed to clean up placeholder pod",
+					zap.Error(err))
+				mgr.orphanPod[taskID] = task.pod
+			}
+		}
+	}
+	log.Logger().Info("finish to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+}
+
+func (mgr *PlaceholderManager) cleanOrphanPlaceholders() {
+	mgr.Lock()
+	defer mgr.Unlock()
+	for taskID, pod := range mgr.orphanPod {
+		log.Logger().Debug("start to clean up orphan pod",
+			zap.String("taskID", taskID),
+			zap.String("podName", pod.Name))
+		err := mgr.clients.KubeClient.Delete(pod)
+		if err != nil {
+			log.Logger().Warn("failed to clean up orphan pod", zap.Error(err))
+		} else {
+			delete(mgr.orphanPod, taskID)
+		}
+	}
+}
+
+func (mgr *PlaceholderManager) Start() {
+	log.Logger().Info("starting the Placeholder Manager")
+	mgr.stopChan = make(chan struct{})

Review comment:
       I cannot see the change here. I was pointing to the `mgr.stopChan = make(chan struct{})` part, not the log message.







[GitHub] [incubator-yunikorn-k8shim] kingamarton commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
kingamarton commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r559605776



##########
File path: pkg/cache/application.go
##########
@@ -273,42 +312,46 @@ func (app *Application) Schedule() {
 			log.Logger().Warn("failed to handle SUBMIT app event",
 				zap.Error(err))
 		}
-	case states.Accepted:
-		ev := NewRunApplicationEvent(app.GetApplicationID())
-		if err := app.handle(ev); err != nil {
-			log.Logger().Warn("failed to handle RUN app event",
-				zap.Error(err))
-		}
+	case states.Reserving:
+		app.scheduleTasks(func(t *Task) bool {
+			return t.placeholder
+		})
 	case states.Running:
-		if len(app.GetNewTasks()) > 0 {
-			for _, task := range app.GetNewTasks() {
-				// for each new task, we do a sanity check before moving the state to Pending_Schedule
-				if err := task.sanityCheckBeforeScheduling(); err == nil {
-					// note, if we directly trigger submit task event, it may spawn too many duplicate
-					// events, because a task might be submitted multiple times before its state transits to PENDING.
-					if handleErr := task.handle(
-						NewSimpleTaskEvent(task.applicationID, task.taskID, events.InitTask)); handleErr != nil {
-						// something goes wrong when transit task to PENDING state,
-						// this should not happen because we already checked the state
-						// before calling the transition. Nowhere to go, just log the error.
-						log.Logger().Warn("init task failed", zap.Error(err))
-					}
-				} else {
-					events.GetRecorder().Event(task.GetTaskPod(), v1.EventTypeWarning, "FailedScheduling", err.Error())
-					log.Logger().Debug("task is not ready for scheduling",
-						zap.String("appID", task.applicationID),
-						zap.String("taskID", task.taskID),
-						zap.Error(err))
-				}
-			}
-		}
+		app.scheduleTasks(func(t *Task) bool {
+			return !t.placeholder
+		})
 	default:
 		log.Logger().Debug("skipping scheduling application",
 			zap.String("appID", app.GetApplicationID()),
 			zap.String("appState", app.GetApplicationState()))
 	}
 }
 
+func (app *Application) scheduleTasks(taskScheduleCondition func(t *Task) bool) {
+	for _, task := range app.GetNewTasks() {
+		if taskScheduleCondition(task) {

Review comment:
       To me it seems simpler to read, but this is something I can live with.







[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r558549795



##########
File path: deployments/examples/gang/cmd/gangDeploy.sh
##########
@@ -0,0 +1,100 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# gangDeploy.sh <job amount> <gang member> <task run time(sec)>
+set -o errexit
+set -o nounset
+set -o pipefail
+
+JOBAMOUNT=$1
+GANGMEMBER=$2
+RUNTIMESEC=$3
+
+# create service
+kubectl create -f <(cat << EOF
+apiVersion: v1
+kind: Service
+metadata:
+  name: gangservice
+  labels:
+    app: gang
+spec:
+  selector:
+    app: gang
+  type: ClusterIP
+  ports:
+  - protocol: TCP
+    port: 8863
+    targetPort: 8863
+EOF) 
+# create job counter web server
+kubectl create -f <(cat << EOF
+apiVersion: v1
+kind: Pod
+metadata:
+  name: gangweb
+  labels:
+    app: gang
+    queue: root.sandbox
+spec:
+  schedulerName: yunikorn
+  containers:
+    - name: gangweb
+      image: apache/yunikorn:simulation-gang-coordinator-latest
+      imagePullPolicy: Never
+      ports:
+        - containerPort: 8863
+EOF)
+# wait for web server to be running
+until grep 'Running' <(kubectl get pod gangweb -o=jsonpath='{.status.phase}'); do
+  sleep 1
+done

Review comment:
       Track via https://issues.apache.org/jira/browse/YUNIKORN-509.







[GitHub] [incubator-yunikorn-k8shim] kingamarton commented on pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
kingamarton commented on pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#issuecomment-762289699


   There is a test failure in Travis. Could you please check whether it is related to these changes?





[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r558553468



##########
File path: pkg/cache/application.go
##########
@@ -273,42 +312,46 @@ func (app *Application) Schedule() {
 			log.Logger().Warn("failed to handle SUBMIT app event",
 				zap.Error(err))
 		}
-	case states.Accepted:
-		ev := NewRunApplicationEvent(app.GetApplicationID())
-		if err := app.handle(ev); err != nil {
-			log.Logger().Warn("failed to handle RUN app event",
-				zap.Error(err))
-		}
+	case states.Reserving:
+		app.scheduleTasks(func(t *Task) bool {
+			return t.placeholder
+		})
 	case states.Running:
-		if len(app.GetNewTasks()) > 0 {
-			for _, task := range app.GetNewTasks() {
-				// for each new task, we do a sanity check before moving the state to Pending_Schedule
-				if err := task.sanityCheckBeforeScheduling(); err == nil {
-					// note, if we directly trigger submit task event, it may spawn too many duplicate
-					// events, because a task might be submitted multiple times before its state transits to PENDING.
-					if handleErr := task.handle(
-						NewSimpleTaskEvent(task.applicationID, task.taskID, events.InitTask)); handleErr != nil {
-						// something goes wrong when transit task to PENDING state,
-						// this should not happen because we already checked the state
-						// before calling the transition. Nowhere to go, just log the error.
-						log.Logger().Warn("init task failed", zap.Error(err))
-					}
-				} else {
-					events.GetRecorder().Event(task.GetTaskPod(), v1.EventTypeWarning, "FailedScheduling", err.Error())
-					log.Logger().Debug("task is not ready for scheduling",
-						zap.String("appID", task.applicationID),
-						zap.String("taskID", task.taskID),
-						zap.Error(err))
-				}
-			}
-		}
+		app.scheduleTasks(func(t *Task) bool {
+			return !t.placeholder
+		})
 	default:
 		log.Logger().Debug("skipping scheduling application",
 			zap.String("appID", app.GetApplicationID()),
 			zap.String("appState", app.GetApplicationState()))
 	}
 }
 
+func (app *Application) scheduleTasks(taskScheduleCondition func(t *Task) bool) {
+	for _, task := range app.GetNewTasks() {
+		if taskScheduleCondition(task) {

Review comment:
       I am hesitant to make that change.
   It may be simpler, but I do not think it is easier to read.
   I find the current condition function easier to follow, because the condition explicitly states which tasks can be scheduled.
   I hope that is OK with you. Please let me know, thanks!
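
To illustrate the point about the condition function, here is a minimal, self-contained sketch of the pattern. It is not the code from this PR: `Task` is reduced to two fields, the state names are plain strings, and `selectTasks` and `schedulable` are hypothetical helpers that only return the matching tasks instead of firing task events.

```go
package main

import "fmt"

// Task is a simplified stand-in for the shim's task type (assumption).
type Task struct {
	taskID      string
	placeholder bool
}

// selectTasks returns the tasks for which cond reports true.
// Hypothetical helper: the PR's scheduleTasks triggers events instead.
func selectTasks(tasks []*Task, cond func(t *Task) bool) []*Task {
	var selected []*Task
	for _, t := range tasks {
		if cond(t) {
			selected = append(selected, t)
		}
	}
	return selected
}

// schedulable picks the tasks that may be scheduled in a given app state.
// Each case spells out its own condition, which is the readability argument.
func schedulable(state string, tasks []*Task) []*Task {
	switch state {
	case "Reserving":
		// while reserving, only placeholder tasks are scheduled
		return selectTasks(tasks, func(t *Task) bool { return t.placeholder })
	case "Running":
		// once running, only the real (non-placeholder) tasks are scheduled
		return selectTasks(tasks, func(t *Task) bool { return !t.placeholder })
	default:
		// any other state: nothing is scheduled
		return nil
	}
}

func main() {
	tasks := []*Task{{"ph-0", true}, {"ph-1", true}, {"real-0", false}}
	for _, state := range []string{"Reserving", "Running", "Accepted"} {
		fmt.Println(state, len(schedulable(state, tasks)))
	}
}
```

The alternative discussed here would pass a single boolean instead of a function. That is shorter, but the call site no longer says which tasks are being selected.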







[GitHub] [incubator-yunikorn-k8shim] codecov[bot] edited a comment on pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#issuecomment-760395169


   # [Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=h1) Report
   > Merging [#219](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=desc) (12ecf11) into [master](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/commit/eb888dfcf3b74ce248f2ed3b92ae5bddaf6caf5a?el=desc) (eb888df) will **increase** coverage by `1.94%`.
   > The diff coverage is `76.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/graphs/tree.svg?width=650&height=150&src=pr&token=LZImIuvleR)](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #219      +/-   ##
   ==========================================
   + Coverage   57.78%   59.72%   +1.94%     
   ==========================================
     Files          32       35       +3     
     Lines        2859     3136     +277     
   ==========================================
   + Hits         1652     1873     +221     
   - Misses       1138     1181      +43     
   - Partials       69       82      +13     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [pkg/cache/application\_events.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2FwcGxpY2F0aW9uX2V2ZW50cy5nbw==) | `53.06% <0.00%> (-7.41%)` | :arrow_down: |
   | [pkg/common/events/states.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi9ldmVudHMvc3RhdGVzLmdv) | `0.00% <0.00%> (ø)` | |
   | [pkg/common/utils/utils.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi91dGlscy91dGlscy5nbw==) | `25.00% <ø> (ø)` | |
   | [pkg/cache/application.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2FwcGxpY2F0aW9uLmdv) | `76.74% <59.25%> (-2.31%)` | :arrow_down: |
   | [pkg/cache/placeholder\_manager.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL3BsYWNlaG9sZGVyX21hbmFnZXIuZ28=) | `77.94% <77.94%> (ø)` | |
   | [pkg/common/utils/gang\_utils.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi91dGlscy9nYW5nX3V0aWxzLmdv) | `81.53% <81.53%> (ø)` | |
   | [pkg/cache/task.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL3Rhc2suZ28=) | `74.40% <82.22%> (+2.77%)` | :arrow_up: |
   | [pkg/appmgmt/general/general.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2FwcG1nbXQvZ2VuZXJhbC9nZW5lcmFsLmdv) | `59.47% <83.33%> (+2.03%)` | :arrow_up: |
   | [pkg/common/si\_helper.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi9zaV9oZWxwZXIuZ28=) | `63.15% <90.00%> (+1.36%)` | :arrow_up: |
   | [pkg/cache/context.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2NvbnRleHQuZ28=) | `41.08% <100.00%> (+0.13%)` | :arrow_up: |
   | ... and [5 more](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=footer). Last update [eb888df...12ecf11](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [incubator-yunikorn-k8shim] codecov[bot] edited a comment on pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#issuecomment-760395169


   # [Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=h1) Report
   > Merging [#219](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=desc) (ec73314) into [master](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/commit/eb888dfcf3b74ce248f2ed3b92ae5bddaf6caf5a?el=desc) (eb888df) will **increase** coverage by `1.68%`.
   > The diff coverage is `76.70%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/graphs/tree.svg?width=650&height=150&src=pr&token=LZImIuvleR)](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #219      +/-   ##
   ==========================================
   + Coverage   57.78%   59.46%   +1.68%     
   ==========================================
     Files          32       35       +3     
     Lines        2859     3133     +274     
   ==========================================
   + Hits         1652     1863     +211     
   - Misses       1138     1188      +50     
   - Partials       69       82      +13     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [pkg/cache/application\_events.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2FwcGxpY2F0aW9uX2V2ZW50cy5nbw==) | `53.06% <0.00%> (-7.41%)` | :arrow_down: |
   | [pkg/common/events/states.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi9ldmVudHMvc3RhdGVzLmdv) | `0.00% <0.00%> (ø)` | |
   | [pkg/common/utils/utils.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi91dGlscy91dGlscy5nbw==) | `25.00% <ø> (ø)` | |
   | [pkg/cache/application.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2FwcGxpY2F0aW9uLmdv) | `76.74% <59.25%> (-2.31%)` | :arrow_down: |
   | [pkg/cache/placeholder\_manager.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL3BsYWNlaG9sZGVyX21hbmFnZXIuZ28=) | `77.94% <77.94%> (ø)` | |
   | [pkg/cache/task.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL3Rhc2suZ28=) | `70.40% <82.22%> (-1.23%)` | :arrow_down: |
   | [pkg/appmgmt/general/general.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2FwcG1nbXQvZ2VuZXJhbC9nZW5lcmFsLmdv) | `59.47% <83.33%> (+2.03%)` | :arrow_up: |
   | [pkg/common/utils/gang\_utils.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi91dGlscy9nYW5nX3V0aWxzLmdv) | `85.48% <85.48%> (ø)` | |
   | [pkg/common/si\_helper.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi9zaV9oZWxwZXIuZ28=) | `63.15% <90.00%> (+1.36%)` | :arrow_up: |
   | [pkg/cache/context.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2NvbnRleHQuZ28=) | `41.08% <100.00%> (+0.13%)` | :arrow_up: |
   | ... and [6 more](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=footer). Last update [eb888df...ec73314](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [incubator-yunikorn-k8shim] codecov[bot] commented on pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#issuecomment-760395169


   # [Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=h1) Report
   > Merging [#219](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=desc) (368300d) into [master](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/commit/eb888dfcf3b74ce248f2ed3b92ae5bddaf6caf5a?el=desc) (eb888df) will **increase** coverage by `2.29%`.
   > The diff coverage is `76.41%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/graphs/tree.svg?width=650&height=150&src=pr&token=LZImIuvleR)](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #219      +/-   ##
   ==========================================
   + Coverage   57.43%   59.73%   +2.29%     
   ==========================================
     Files          32       35       +3     
     Lines        2859     3129     +270     
   ==========================================
   + Hits         1642     1869     +227     
   - Misses       1147     1179      +32     
   - Partials       70       81      +11     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [pkg/cache/application\_events.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2FwcGxpY2F0aW9uX2V2ZW50cy5nbw==) | `53.06% <0.00%> (-7.41%)` | :arrow_down: |
   | [pkg/common/events/states.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi9ldmVudHMvc3RhdGVzLmdv) | `0.00% <0.00%> (ø)` | |
   | [pkg/common/utils/utils.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi91dGlscy91dGlscy5nbw==) | `25.00% <ø> (ø)` | |
   | [pkg/cache/application.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2FwcGxpY2F0aW9uLmdv) | `76.74% <59.25%> (-2.31%)` | :arrow_down: |
   | [pkg/cache/placeholder\_manager.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL3BsYWNlaG9sZGVyX21hbmFnZXIuZ28=) | `77.27% <77.27%> (ø)` | |
   | [pkg/cache/task.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL3Rhc2suZ28=) | `74.40% <82.22%> (+7.42%)` | :arrow_up: |
   | [pkg/appmgmt/general/general.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2FwcG1nbXQvZ2VuZXJhbC9nZW5lcmFsLmdv) | `59.47% <83.33%> (+2.03%)` | :arrow_up: |
   | [pkg/common/utils/gang\_utils.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi91dGlscy9nYW5nX3V0aWxzLmdv) | `85.00% <85.00%> (ø)` | |
   | [pkg/common/si\_helper.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi9zaV9oZWxwZXIuZ28=) | `63.15% <90.00%> (+1.36%)` | :arrow_up: |
   | [pkg/cache/context.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2NvbnRleHQuZ28=) | `41.08% <100.00%> (+0.13%)` | :arrow_up: |
   | ... and [6 more](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=footer). Last update [eb888df...368300d](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r559813147



##########
File path: pkg/shim/scheduler.go
##########
@@ -267,6 +267,10 @@ func (ss *KubernetesShim) run() {
 	}
 
 	ss.apiFactory.Start()
+
+	// run the placeholder manager
+	placeholderMgr := cache.NewPlaceholderManager(ss.apiFactory.GetAPIs())
+	placeholderMgr.Start()

Review comment:
       Agreed, addressed.










[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r558517589



##########
File path: pkg/cache/placeholder.go
##########
@@ -0,0 +1,79 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"fmt"
+
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+)
+
+type Placeholder struct {
+	appID         string
+	taskGroupName string
+	pod           *v1.Pod
+}
+
+func newPlaceholder(placeholderName string, app *Application, taskGroup v1alpha1.TaskGroup) *Placeholder {
+	placeholderPod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      placeholderName,
+			Namespace: app.tags[constants.AppTagNamespace],
+			Labels: map[string]string{
+				constants.LabelApplicationID: app.GetApplicationID(),
+				constants.LabelQueueName:     app.GetQueue(),
+			},
+			Annotations: map[string]string{
+				constants.AnnotationPlaceholderFlag: "true",
+				constants.AnnotationTaskGroupName:   taskGroup.Name,
+			},

Review comment:
       I do not think the extra labels or annotations will have any impact on the scheduler.
   Per the Kubernetes documentation (https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/): "Labels are intended to be used to specify identifying attributes of objects that are meaningful and relevant to users, but do not directly imply semantics to the core system." The same applies to annotations.
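
As a rough illustration of that argument: labels and annotations are plain string maps, and they only carry meaning for code that explicitly reads them. The sketch below assumes a simplified `PodMeta` stand-in for the real Kubernetes object metadata, and the annotation keys and the `isPlaceholder` / `taskGroupOf` helpers are hypothetical, not the constants or functions used in this PR.

```go
package main

import "fmt"

// PodMeta is a simplified stand-in for Kubernetes object metadata (assumption).
type PodMeta struct {
	Name        string
	Labels      map[string]string
	Annotations map[string]string
}

// Hypothetical annotation keys, chosen for this example only.
const (
	annotationPlaceholderFlag = "example.io/placeholder"
	annotationTaskGroupName   = "example.io/task-group-name"
)

// isPlaceholder reports whether the metadata carries the placeholder flag.
// Reading from a nil map is safe in Go and yields the zero value.
func isPlaceholder(m PodMeta) bool {
	return m.Annotations[annotationPlaceholderFlag] == "true"
}

// taskGroupOf returns the task group name, if the annotation is present.
func taskGroupOf(m PodMeta) (string, bool) {
	name, ok := m.Annotations[annotationTaskGroupName]
	return name, ok
}

func main() {
	ph := PodMeta{
		Name: "tg-workers-app-0001-0",
		Annotations: map[string]string{
			annotationPlaceholderFlag: "true",
			annotationTaskGroupName:   "workers",
		},
	}
	plain := PodMeta{Name: "ordinary-pod"}

	group, _ := taskGroupOf(ph)
	fmt.Println(ph.Name, isPlaceholder(ph), group)
	fmt.Println(plain.Name, isPlaceholder(plain))
}
```

A component that never looks up these keys treats both pods the same way, which is why the extra metadata should be harmless to anything that does not opt in.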
   







[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r560380501



##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,172 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients *client.Clients
+	// when the placeholder manager is unable to delete a pod,
+	// the pod becomes an "orphan" pod. We add such pods to a map
+	// and keep retrying their deletion in order to avoid wasting resources.
+	orphanPod map[string]*v1.Pod
+	stopChan  chan struct{}
+	running   atomic.Value
+	sync.RWMutex
+}
+
+var placeholderMgr *PlaceholderManager
+var once sync.Once
+
+func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
+	var r atomic.Value
+	r.Store(false)
+	placeholderMgr = &PlaceholderManager{
+		clients:   clients,
+		orphanPod: make(map[string]*v1.Pod), // must be non-nil: CleanUp writes to this map
+		running:   r,
+	}
+	return placeholderMgr
+}
+
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")
+		}
+	})
+	return placeholderMgr
+}
+
+func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
+	mgr.Lock()
+	defer mgr.Unlock()
+
+	// iterate all task groups, create placeholders for all the min members
+	for _, tg := range app.getTaskGroups() {
+		for i := int32(0); i < tg.MinMember; i++ {
+			placeholderName := utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID(), i)
+			placeholder := newPlaceholder(placeholderName, app, tg)
+			// create the placeholder on K8s
+			_, err := mgr.clients.KubeClient.Create(placeholder.pod)
+			if err != nil {
+				// if failed to create the place holder pod
+				// caller should handle this error
+				log.Logger().Error("failed to create placeholder pod",
+					zap.Error(err))
+				return err
+			}
+			log.Logger().Info("placeholder created",
+				zap.String("placeholder", placeholder.String()))
+		}
+	}
+
+	return nil
+}
+
+// clean up all the placeholders for an application
+func (mgr *PlaceholderManager) CleanUp(app *Application) {
+	mgr.Lock()
+	defer mgr.Unlock()
+	log.Logger().Info("start to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+	for taskID, task := range app.taskMap {
+		if task.GetTaskPlaceholder() {
+			// remove pod
+			err := mgr.clients.KubeClient.Delete(task.pod)
+			if err != nil {
+				log.Logger().Error("failed to clean up placeholder pod",
+					zap.Error(err))
+				mgr.orphanPod[taskID] = task.pod
+			}
+		}
+	}
+	log.Logger().Info("finish to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+}
+
+func (mgr *PlaceholderManager) cleanOrphanPlaceholders() {
+	mgr.Lock()
+	defer mgr.Unlock()
+	for taskID, pod := range mgr.orphanPod {
+		log.Logger().Debug("start to clean up orphan pod",
+			zap.String("taskID", taskID),
+			zap.String("podName", pod.Name))
+		err := mgr.clients.KubeClient.Delete(pod)
+		if err != nil {
+			log.Logger().Warn("failed to clean up orphan pod", zap.Error(err))
+		} else {
+			delete(mgr.orphanPod, taskID)
+		}
+	}
+}
+
+func (mgr *PlaceholderManager) Start() {
+	log.Logger().Info("starting the Placeholder Manager")
+	mgr.stopChan = make(chan struct{})

Review comment:
       Sorry, I misunderstood the previous comment. I have just fixed it.







[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r559814706



##########
File path: pkg/common/utils/gang_utils.go
##########
@@ -0,0 +1,183 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package utils
+
+import (
+	"encoding/json"
+	"fmt"
+	"strconv"
+	"sync"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants"
+)
+
+func FindAppTaskGroup(appTaskGroups []*v1alpha1.TaskGroup, groupName string) (*v1alpha1.TaskGroup, error) {
+	if groupName == "" {
+		// task has no group defined
+		return nil, nil
+	}
+
+	// app has no taskGroups associated
+	if len(appTaskGroups) == 0 {
+		return nil, nil
+	}
+
+	// task group defined in app, return the corresponding taskGroup
+	for _, tg := range appTaskGroups {
+		if tg.Name == groupName {
+			return tg, nil
+		}
+	}
+
+	// task group name specified, but could not find a mapping value in app taskGroups
+	return nil, fmt.Errorf("taskGroup %s is not defined in the application", groupName)
+}
+
+// the placeholder name is the pod name, pod name can not be longer than 63 chars
+// taskGroup name and appID will be truncated if they go over 20/28 chars respectively
+func GeneratePlaceholderName(taskGroupName, appID string, index int32) string {

Review comment:
       Makes sense. I added a comment in the code.
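
The length budget behind that comment can be checked with a small sketch. The function below copies the truncation logic of `GeneratePlaceholderName` as quoted later in this thread; the lower-case name and the sample inputs are made up for illustration.

```go
package main

import "fmt"

// generatePlaceholderName copies the truncation logic from the diff:
// the task group name is cut to 20 chars and the appID to 28 chars.
func generatePlaceholderName(taskGroupName, appID string, index int32) string {
	shortTaskGroupName := fmt.Sprintf("%.20s", taskGroupName)
	shortAppID := fmt.Sprintf("%.28s", appID)
	return "tg-" + shortTaskGroupName + "-" + shortAppID + fmt.Sprintf("-%d", index)
}

func main() {
	// Hypothetical inputs, both longer than their limits.
	longGroup := "a-very-long-task-group-name-that-exceeds-the-limit"
	longAppID := "application-0123456789-0123456789-0123456789"
	// 2147483647 is the largest int32 and has 10 digits, the widest index possible.
	name := generatePlaceholderName(longGroup, longAppID, 2147483647)
	// "tg-" (3) + 20 + "-" (1) + 28 + "-" (1) + 10 digits = 63
	fmt.Println(name, len(name))
}
```

With ASCII inputs the worst case is exactly 63 characters, which matches the limit stated in the code comment.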







[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r558523956



##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,167 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients   *client.Clients
+	orphanPod map[string]*v1.Pod
+	stopChan  chan struct{}
+	running   atomic.Value
+	sync.RWMutex
+}
+
+var placeholderMgr *PlaceholderManager
+var once sync.Once
+
+func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
+	var r atomic.Value
+	r.Store(false)
+	placeholderMgr = &PlaceholderManager{
+		clients: clients,
+		running: r,
+	}
+	return placeholderMgr
+}
+
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")

Review comment:
       That is true, but the init function `func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager` requires a `client.Clients` parameter, so we can only initialize the placeholder manager when the scheduler starts. `GetPlaceholderManager()` gives easy access to the singleton `placeholderMgr` object, so we can call its functions without passing the placeholder manager instance to many objects. If `placeholderMgr` is not initialized, something went wrong during init, and we fail immediately with a fatal error. Does this make sense to you?
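
To make the pattern concrete, here is a minimal sketch of a singleton with explicit initialization and a fail-fast accessor. The types `clients` and `manager` are hypothetical stand-ins for `client.Clients` and `PlaceholderManager`, and the sketch panics where the PR logs a fatal error, so that the failure can be observed in a test.

```go
package main

import "fmt"

// clients is a hypothetical stand-in for client.Clients.
type clients struct{ name string }

// manager is a hypothetical stand-in for PlaceholderManager.
type manager struct{ clients *clients }

// mgr holds the singleton instance; it stays nil until newManager is called.
var mgr *manager

// newManager stores the singleton. It needs the clients, so it can only
// run once the scheduler has created them.
func newManager(c *clients) *manager {
	mgr = &manager{clients: c}
	return mgr
}

// getManager returns the singleton and fails fast if it was never initialized.
// Assumption: a panic replaces the fatal log used in the PR.
func getManager() *manager {
	if mgr == nil {
		panic("manager is not initialized")
	}
	return mgr
}

func main() {
	newManager(&clients{name: "kube"})
	// Callers reach the instance without having it passed in.
	fmt.Println(getManager().clients.name)
}
```

The trade-off is the usual one for package-level singletons: call sites stay simple, but the initialization order becomes an implicit contract.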







[GitHub] [incubator-yunikorn-k8shim] codecov[bot] edited a comment on pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#issuecomment-760395169


   # [Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=h1) Report
   > Merging [#219](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=desc) (368300d) into [master](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/commit/eb888dfcf3b74ce248f2ed3b92ae5bddaf6caf5a?el=desc) (eb888df) will **increase** coverage by `2.29%`.
   > The diff coverage is `76.41%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/graphs/tree.svg?width=650&height=150&src=pr&token=LZImIuvleR)](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #219      +/-   ##
   ==========================================
   + Coverage   57.43%   59.73%   +2.29%     
   ==========================================
     Files          32       35       +3     
     Lines        2859     3129     +270     
   ==========================================
   + Hits         1642     1869     +227     
   - Misses       1147     1179      +32     
   - Partials       70       81      +11     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [pkg/cache/application\_events.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2FwcGxpY2F0aW9uX2V2ZW50cy5nbw==) | `53.06% <0.00%> (-7.41%)` | :arrow_down: |
   | [pkg/common/events/states.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi9ldmVudHMvc3RhdGVzLmdv) | `0.00% <0.00%> (ø)` | |
   | [pkg/common/utils/utils.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi91dGlscy91dGlscy5nbw==) | `25.00% <ø> (ø)` | |
   | [pkg/cache/application.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2FwcGxpY2F0aW9uLmdv) | `76.74% <59.25%> (-2.31%)` | :arrow_down: |
   | [pkg/cache/placeholder\_manager.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL3BsYWNlaG9sZGVyX21hbmFnZXIuZ28=) | `77.27% <77.27%> (ø)` | |
   | [pkg/cache/task.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL3Rhc2suZ28=) | `74.40% <82.22%> (+7.42%)` | :arrow_up: |
   | [pkg/appmgmt/general/general.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2FwcG1nbXQvZ2VuZXJhbC9nZW5lcmFsLmdv) | `59.47% <83.33%> (+2.03%)` | :arrow_up: |
   | [pkg/common/utils/gang\_utils.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi91dGlscy9nYW5nX3V0aWxzLmdv) | `85.00% <85.00%> (ø)` | |
   | [pkg/common/si\_helper.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi9zaV9oZWxwZXIuZ28=) | `63.15% <90.00%> (+1.36%)` | :arrow_up: |
   | [pkg/cache/context.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2NvbnRleHQuZ28=) | `41.08% <100.00%> (+0.13%)` | :arrow_up: |
   | ... and [6 more](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=footer). Last update [eb888df...368300d](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r559816367



##########
File path: pkg/common/utils/gang_utils.go
##########
@@ -0,0 +1,183 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package utils
+
+import (
+	"encoding/json"
+	"fmt"
+	"strconv"
+	"sync"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants"
+)
+
+func FindAppTaskGroup(appTaskGroups []*v1alpha1.TaskGroup, groupName string) (*v1alpha1.TaskGroup, error) {
+	if groupName == "" {
+		// task has no group defined
+		return nil, nil
+	}
+
+	// app has no taskGroups associated
+	if len(appTaskGroups) == 0 {
+		return nil, nil
+	}
+
+	// task group defined in app, return the corresponding taskGroup
+	for _, tg := range appTaskGroups {
+		if tg.Name == groupName {
+			return tg, nil
+		}
+	}
+
+	// task group name specified, but could not find a mapping value in app taskGroups
+	return nil, fmt.Errorf("taskGroup %s is not defined in the application", groupName)
+}
+
+// the placeholder name is the pod name, pod name can not be longer than 63 chars
+// taskGroup name and appID will be truncated if they go over 20/28 chars respectively
+func GeneratePlaceholderName(taskGroupName, appID string, index int32) string {
+	// taskGroup name no longer than 20 chars
+	// appID no longer than 28 chars
+	// total length no longer than 20 + 28 + 5 + 10 = 63
+	shortTaskGroupName := fmt.Sprintf("%.20s", taskGroupName)
+	shortAppID := fmt.Sprintf("%.28s", appID)
+	return "tg-" + shortTaskGroupName + "-" + shortAppID + fmt.Sprintf("-%d", index)
+}
+
+func GetPlaceholderResourceRequest(resources map[string]resource.Quantity) v1.ResourceList {
+	resourceReq := v1.ResourceList{}
+	for k, v := range resources {
+		resourceReq[v1.ResourceName(k)] = v
+	}
+	return resourceReq
+}
+
+func GetPlaceholderFlagFromPodSpec(pod *v1.Pod) bool {
+	if value, ok := pod.Annotations[constants.AnnotationPlaceholderFlag]; ok {
+		if v, err := strconv.ParseBool(value); err == nil {
+			return v
+		}
+	}
+	return false
+}
+
+func GetTaskGroupFromPodSpec(pod *v1.Pod) string {
+	if value, ok := pod.Annotations[constants.AnnotationTaskGroupName]; ok {
+		return value
+	}
+	return ""
+}
+
+func GetTaskGroupsFromAnnotation(pod *v1.Pod) ([]v1alpha1.TaskGroup, error) {
+	taskGroupInfo, ok := pod.Annotations[constants.AnnotationTaskGroups]
+	if !ok {
+		return nil, nil
+	}
+	taskGroups := []v1alpha1.TaskGroup{}
+	err := json.Unmarshal([]byte(taskGroupInfo), &taskGroups)
+	if err != nil {
+		return nil, err
+	}
+	// json.Unmarshal won't return an error if Name or MinMember is empty, but will return an error if MinResource is empty or malformed.
+	for _, taskGroup := range taskGroups {
+		if taskGroup.Name == "" {
+			return nil, fmt.Errorf("can't get taskGroup Name from pod annotation, %s",
+				pod.Annotations[constants.AnnotationTaskGroups])
+		}
+		if taskGroup.MinMember == int32(0) {

Review comment:
       Makes sense. I added the check as well as a unit test.
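
The thread does not show the check that was added, so the following is only a sketch of one plausible validation. It assumes the check rejects any `minMember` that is not positive. The `taskGroup` struct is a hypothetical, trimmed-down stand-in for `v1alpha1.TaskGroup`, and the JSON field names are assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// taskGroup is a hypothetical, trimmed-down stand-in for v1alpha1.TaskGroup.
type taskGroup struct {
	Name      string `json:"name"`
	MinMember int32  `json:"minMember"`
}

// parseTaskGroups decodes the annotation value and validates each group.
// json.Unmarshal leaves missing fields at their zero value, so the empty
// name and the non-positive minMember have to be rejected explicitly.
func parseTaskGroups(annotation string) ([]taskGroup, error) {
	var groups []taskGroup
	if err := json.Unmarshal([]byte(annotation), &groups); err != nil {
		return nil, err
	}
	for _, g := range groups {
		if g.Name == "" {
			return nil, fmt.Errorf("task group has no name: %s", annotation)
		}
		if g.MinMember <= 0 {
			return nil, fmt.Errorf("task group %q has invalid minMember %d", g.Name, g.MinMember)
		}
	}
	return groups, nil
}

func main() {
	inputs := []string{
		`[{"name":"workers","minMember":3}]`,
		`[{"name":"workers"}]`,
		`[{"name":"workers","minMember":-1}]`,
	}
	for _, in := range inputs {
		groups, err := parseTaskGroups(in)
		fmt.Println(groups, err)
	}
}
```

Only the first input is accepted; the other two return an error because `minMember` is missing or negative.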







[GitHub] [incubator-yunikorn-k8shim] codecov[bot] edited a comment on pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#issuecomment-760395169


   # [Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=h1) Report
   > Merging [#219](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=desc) (d0a5e7b) into [master](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/commit/eb888dfcf3b74ce248f2ed3b92ae5bddaf6caf5a?el=desc) (eb888df) will **increase** coverage by `1.94%`.
   > The diff coverage is `76.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/graphs/tree.svg?width=650&height=150&src=pr&token=LZImIuvleR)](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #219      +/-   ##
   ==========================================
   + Coverage   57.78%   59.72%   +1.94%     
   ==========================================
     Files          32       35       +3     
     Lines        2859     3136     +277     
   ==========================================
   + Hits         1652     1873     +221     
   - Misses       1138     1181      +43     
   - Partials       69       82      +13     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [pkg/cache/application\_events.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2FwcGxpY2F0aW9uX2V2ZW50cy5nbw==) | `53.06% <0.00%> (-7.41%)` | :arrow_down: |
   | [pkg/common/events/states.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi9ldmVudHMvc3RhdGVzLmdv) | `0.00% <0.00%> (ø)` | |
   | [pkg/common/utils/utils.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi91dGlscy91dGlscy5nbw==) | `25.00% <ø> (ø)` | |
   | [pkg/cache/application.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2FwcGxpY2F0aW9uLmdv) | `76.74% <59.25%> (-2.31%)` | :arrow_down: |
   | [pkg/cache/placeholder\_manager.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL3BsYWNlaG9sZGVyX21hbmFnZXIuZ28=) | `77.94% <77.94%> (ø)` | |
   | [pkg/common/utils/gang\_utils.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi91dGlscy9nYW5nX3V0aWxzLmdv) | `81.53% <81.53%> (ø)` | |
   | [pkg/cache/task.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL3Rhc2suZ28=) | `74.40% <82.22%> (+2.77%)` | :arrow_up: |
   | [pkg/appmgmt/general/general.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2FwcG1nbXQvZ2VuZXJhbC9nZW5lcmFsLmdv) | `59.47% <83.33%> (+2.03%)` | :arrow_up: |
   | [pkg/common/si\_helper.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi9zaV9oZWxwZXIuZ28=) | `63.15% <90.00%> (+1.36%)` | :arrow_up: |
   | [pkg/cache/context.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2NvbnRleHQuZ28=) | `41.08% <100.00%> (+0.13%)` | :arrow_up: |
   | ... and [5 more](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=footer). Last update [eb888df...d0a5e7b](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r558495606



##########
File path: pkg/cache/application.go
##########
@@ -363,6 +406,52 @@ func (app *Application) handleRecoverApplicationEvent(event *fsm.Event) {
 	}
 }
 
+func (app *Application) postAppAccepted(event *fsm.Event) {
+	// if app has taskGroups defined, it goes to the Reserving state before getting to Running
+	var ev events.SchedulingEvent

Review comment:
       This is a callback on the state change:
   
   ```
   fsm.Callbacks{
     ...
     events.States().Application.Accepted:  app.postAppAccepted,
     ...
   },
   ```
   
   According to the FSM documentation: "1. <NEW_STATE> - called after entering <NEW_STATE>". This callback is only called when the app enters the Accepted state. Is this enough, or do you think we need to add more checks?
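
The "called after entering <NEW_STATE>" behaviour can be illustrated with a hand-rolled state machine. This is not the FSM library used by the PR; the `machine` type and the event and state names other than Accepted are hypothetical.

```go
package main

import "fmt"

// machine is a hypothetical, minimal state machine used only for illustration.
type machine struct {
	current string
	// transitions maps event name -> source state -> destination state.
	transitions map[string]map[string]string
	// onEnter holds callbacks keyed by the state being entered.
	onEnter map[string]func(event string)
}

// fire applies an event. The enter callback runs only after the state has
// changed, and only for the state that was just entered.
func (m *machine) fire(event string) error {
	dst, ok := m.transitions[event][m.current]
	if !ok {
		return fmt.Errorf("event %s is not allowed in state %s", event, m.current)
	}
	m.current = dst
	if cb, found := m.onEnter[dst]; found {
		cb(event)
	}
	return nil
}

// newAppMachine builds a three-state machine with a callback on Accepted.
func newAppMachine(onAccepted func(event string)) *machine {
	return &machine{
		current: "New",
		transitions: map[string]map[string]string{
			"SubmitApplication": {"New": "Submitted"},
			"AcceptApplication": {"Submitted": "Accepted"},
		},
		onEnter: map[string]func(event string){"Accepted": onAccepted},
	}
}

func main() {
	calls := 0
	m := newAppMachine(func(event string) { calls++ })
	for _, ev := range []string{"SubmitApplication", "AcceptApplication", "AcceptApplication"} {
		err := m.fire(ev)
		fmt.Println(ev, m.current, calls, err)
	}
}
```

In this sketch the callback fires once, on the transition into Accepted. The repeated accept event is rejected and does not trigger it again.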







[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r558514725



##########
File path: pkg/cache/application.go
##########
@@ -363,6 +406,52 @@ func (app *Application) handleRecoverApplicationEvent(event *fsm.Event) {
 	}
 }
 
+func (app *Application) postAppAccepted(event *fsm.Event) {
+	// if app has taskGroups defined, it goes to the Reserving state before getting to Running
+	var ev events.SchedulingEvent
+	if len(app.taskGroups) != 0 {
+		ev = NewSimpleApplicationEvent(app.applicationID, events.TryReserve)
+	} else {
+		ev = NewRunApplicationEvent(app.applicationID)
+	}
+	dispatcher.Dispatch(ev)
+}
+
+func (app *Application) onReserving(event *fsm.Event) {
+	go func() {
+		// while doing reserving
+		if err := GetPlaceholderManager().createAppPlaceholders(app); err != nil {
+			// creating placeholder failed
+			// put the app into recycling queue and turn the app to running state
+			GetPlaceholderManager().CleanUp(app)
+			ev := NewRunApplicationEvent(app.applicationID)
+			dispatcher.Dispatch(ev)
+		}
+	}()
+}
+
+func (app *Application) onReservationStateChange(event *fsm.Event) {
+	// this callback is invoked whenever a placeholder is added or released
+	desireCounts := utils.NewTaskGroupInstanceCountMap()
+	for _, tg := range app.taskGroups {
+		desireCounts.Add(tg.Name, tg.MinMember)
+	}
+
+	actualCounts := utils.NewTaskGroupInstanceCountMap()
+	for _, t := range app.getTasks(events.States().Task.Bound) {
+		if t.placeholder {
+			actualCounts.AddOne(t.taskGroupName)
+		}
+	}
+
+	// min member all satisfied
+	if desireCounts.Equals(actualCounts) {
+		ev := NewRunApplicationEvent(app.applicationID)
+		dispatcher.Dispatch(ev)
+	}

Review comment:
       Yes, I think we should wait for all the taskGroups. Our gang scheduling support is defined as waiting for all the taskGroups to get their resources before scheduling them, not just one of them.
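
A minimal sketch of that "all groups or nothing" condition, using plain maps instead of the `TaskGroupInstanceCountMap` helper from the PR. The function name and the group names are made up for illustration.

```go
package main

import "fmt"

// countsEqual reports whether every task group has exactly the desired
// number of bound placeholders. A group that is missing or short makes
// the whole application wait.
func countsEqual(desired, actual map[string]int32) bool {
	if len(desired) != len(actual) {
		return false
	}
	for name, want := range desired {
		if got, ok := actual[name]; !ok || got != want {
			return false
		}
	}
	return true
}

func main() {
	// Hypothetical task groups and their minMember values.
	desired := map[string]int32{"driver": 1, "executor": 3}

	// Only one of the two groups is fully reserved: keep waiting.
	partial := map[string]int32{"driver": 1, "executor": 2}
	fmt.Println(countsEqual(desired, partial))

	// Every group has reached its minMember: the app can move to Running.
	full := map[string]int32{"driver": 1, "executor": 3}
	fmt.Println(countsEqual(desired, full))
}
```

The first call reports false and the second true, so the run event is only dispatched once every group is satisfied.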







[GitHub] [incubator-yunikorn-k8shim] codecov[bot] edited a comment on pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#issuecomment-760395169


   # [Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=h1) Report
   > Merging [#219](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=desc) (368300d) into [master](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/commit/eb888dfcf3b74ce248f2ed3b92ae5bddaf6caf5a?el=desc) (eb888df) will **increase** coverage by `1.94%`.
   > The diff coverage is `76.41%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/graphs/tree.svg?width=650&height=150&src=pr&token=LZImIuvleR)](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #219      +/-   ##
   ==========================================
   + Coverage   57.78%   59.73%   +1.94%     
   ==========================================
     Files          32       35       +3     
     Lines        2859     3129     +270     
   ==========================================
   + Hits         1652     1869     +217     
   - Misses       1138     1179      +41     
   - Partials       69       81      +12     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [pkg/cache/application\_events.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2FwcGxpY2F0aW9uX2V2ZW50cy5nbw==) | `53.06% <0.00%> (-7.41%)` | :arrow_down: |
   | [pkg/common/events/states.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi9ldmVudHMvc3RhdGVzLmdv) | `0.00% <0.00%> (ø)` | |
   | [pkg/common/utils/utils.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi91dGlscy91dGlscy5nbw==) | `25.00% <ø> (ø)` | |
   | [pkg/cache/application.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2FwcGxpY2F0aW9uLmdv) | `76.74% <59.25%> (-2.31%)` | :arrow_down: |
   | [pkg/cache/placeholder\_manager.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL3BsYWNlaG9sZGVyX21hbmFnZXIuZ28=) | `77.27% <77.27%> (ø)` | |
   | [pkg/cache/task.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL3Rhc2suZ28=) | `74.40% <82.22%> (+2.77%)` | :arrow_up: |
   | [pkg/appmgmt/general/general.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2FwcG1nbXQvZ2VuZXJhbC9nZW5lcmFsLmdv) | `59.47% <83.33%> (+2.03%)` | :arrow_up: |
   | [pkg/common/utils/gang\_utils.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi91dGlscy9nYW5nX3V0aWxzLmdv) | `85.00% <85.00%> (ø)` | |
   | [pkg/common/si\_helper.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi9zaV9oZWxwZXIuZ28=) | `63.15% <90.00%> (+1.36%)` | :arrow_up: |
   | [pkg/cache/context.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2NvbnRleHQuZ28=) | `41.08% <100.00%> (+0.13%)` | :arrow_up: |
   | ... and [5 more](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=footer). Last update [eb888df...ec73314](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#issuecomment-762492153


   > There is a test failure in the Travis, can you please check if it can be something related to the changes, or not?
   
   Hmm, yes, that was the basicSchedulingTest. It seems to have passed in the last run.
   It might be an intermittent error; let's see what happens in the next run.





[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r559813903



##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,172 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients *client.Clients
+	// when the placeholder manager is unable to delete a pod,
+	// this pod becomes to be an "orphan" pod. We add them to a map
+	// and keep retrying deleting them in order to avoid wasting resources.
+	orphanPod map[string]*v1.Pod
+	stopChan  chan struct{}
+	running   atomic.Value
+	sync.RWMutex
+}
+
+var placeholderMgr *PlaceholderManager
+var once sync.Once
+
+func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
+	var r atomic.Value
+	r.Store(false)
+	placeholderMgr = &PlaceholderManager{
+		clients: clients,
+		running: r,
+	}
+	return placeholderMgr
+}
+
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")
+		}
+	})
+	return placeholderMgr
+}
+
+func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
+	mgr.Lock()
+	defer mgr.Unlock()
+
+	// iterate all task groups, create placeholders for all the min members
+	for _, tg := range app.getTaskGroups() {
+		for i := int32(0); i < tg.MinMember; i++ {
+			placeholderName := utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID(), i)
+			placeholder := newPlaceholder(placeholderName, app, tg)
+			// create the placeholder on K8s
+			_, err := mgr.clients.KubeClient.Create(placeholder.pod)
+			if err != nil {
+				// if failed to create the place holder pod
+				// caller should handle this error
+				log.Logger().Error("failed to create placeholder pod",
+					zap.Error(err))
+				return err
+			}
+			log.Logger().Info("placeholder created",
+				zap.String("placeholder", placeholder.String()))
+		}
+	}
+
+	return nil
+}
+
+// clean up all the placeholders for an application
+func (mgr *PlaceholderManager) CleanUp(app *Application) {
+	mgr.Lock()
+	defer mgr.Unlock()
+	log.Logger().Info("start to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+	for taskID, task := range app.taskMap {
+		if task.GetTaskPlaceholder() {
+			// remove pod
+			err := mgr.clients.KubeClient.Delete(task.pod)
+			if err != nil {
+				log.Logger().Error("failed to clean up placeholder pod",
+					zap.Error(err))
+				mgr.orphanPod[taskID] = task.pod
+			}
+		}
+	}
+	log.Logger().Info("finish to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+}
+
+func (mgr *PlaceholderManager) cleanOrphanPlaceholders() {
+	mgr.Lock()
+	defer mgr.Unlock()
+	for taskID, pod := range mgr.orphanPod {
+		log.Logger().Debug("start to clean up orphan pod",
+			zap.String("taskID", taskID),
+			zap.String("podName", pod.Name))
+		err := mgr.clients.KubeClient.Delete(pod)
+		if err != nil {
+			log.Logger().Warn("failed to clean up orphan pod", zap.Error(err))
+		} else {
+			delete(mgr.orphanPod, taskID)
+		}
+	}
+}
+
+func (mgr *PlaceholderManager) Start() {
+	log.Logger().Info("starting the Placeholder Manager")
+	mgr.stopChan = make(chan struct{})
+	if mgr.isRunning() {
+		log.Logger().Info("The placeholder manager has been started")
+		return
+	}
+	mgr.setRunning(true)
+	go func() {
+		for {
+			select {
+			case <-mgr.stopChan:
+				log.Logger().Info("PlaceholderManager has been stopped")
+				mgr.setRunning(false)
+				return
+			default:
+				// clean orphan placeholders every 5 seconds
+				log.Logger().Info("clean up orphan pod")
+				mgr.cleanOrphanPlaceholders()
+				time.Sleep(5 * time.Second)
+			}
+		}
+	}()
+}
+
+func (mgr *PlaceholderManager) Stop() {

Review comment:
       Good catch, fixed.







[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r558549411



##########
File path: deployments/examples/gang/cmd/gangDeploy.sh
##########
@@ -0,0 +1,100 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+#limitations under the License.
+#
+
+# gangDeploy.sh <job amount> <pod amount> <gang member> <task run time(sec)>
+set -o errexit
+set -o nounset
+set -o pipefail
+
+JOBAMOUNT=$1
+GANGMEMBER=$2
+RUNTIMESEC=$3
+
+# create service
+kubectl create -f <(cat << EOF
+apiVersion: v1
+kind: Service
+metadata:
+  name: gangservice
+  labels:
+    app: gang
+spec:
+  selector:
+    app: gang
+  type: ClusterIP
+  ports:
+  - protocol: TCP
+    port: 8863
+    targetPort: 8863
+EOF) 
+# create job counter web server
+kubectl create -f <(cat << EOF
+apiVersion: v1
+kind: Pod
+metadata:
+  name: gangweb
+  labels:
+    app: gang
+    queue: root.sandbox
+spec:
+  schedulerName: yunikorn
+  containers:
+    - name: gangweb
+      image: apache/yunikorn:simulation-gang-coordinator-latest
+      imagePullPolicy: Never
+      ports:

Review comment:
       Makes sense. Let's track this in a separate JIRA: https://issues.apache.org/jira/browse/YUNIKORN-509.







[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#issuecomment-763343478


   Hi @kingamarton, I don't think the e2e test failed because of this PR. There seems to be an intermittent issue on the latest codebase: I've observed similar failures at https://github.com/apache/incubator-yunikorn-k8shim/pull/220 (purely an e2e test run on the latest codebase) and https://github.com/apache/incubator-yunikorn-k8shim/pull/221. I suggest getting this one merged and fixing the intermittent issue in a separate JIRA, as that may take some time. Does that make sense? Thanks.





[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r558527947



##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,167 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients   *client.Clients
+	orphanPod map[string]*v1.Pod
+	stopChan  chan struct{}
+	running   atomic.Value
+	sync.RWMutex
+}
+
+var placeholderMgr *PlaceholderManager
+var once sync.Once
+
+func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
+	var r atomic.Value
+	r.Store(false)
+	placeholderMgr = &PlaceholderManager{
+		clients: clients,
+		running: r,
+	}
+	return placeholderMgr
+}
+
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")
+		}
+	})
+	return placeholderMgr
+}
+
+func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
+	mgr.Lock()
+	defer mgr.Unlock()
+
+	// iterate all task groups, create placeholders for all the min members
+	for _, tg := range app.getTaskGroups() {
+		for i := int32(0); i < tg.MinMember; i++ {
+			placeholderName := utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID(), i)
+			placeholder := newPlaceholder(placeholderName, app, tg)
+			// create the placeholder on K8s
+			_, err := mgr.clients.KubeClient.Create(placeholder.pod)
+			if err != nil {
+				// if failed to create the place holder pod
+				// caller should handle this error
+				log.Logger().Error("failed to create placeholder pod",
+					zap.Error(err))
+				return err
+			}
+			log.Logger().Info("placeholder created",
+				zap.String("placeholder", placeholder.String()))
+		}
+	}
+
+	return nil
+}
+
+// clean up all the placeholders for an application
+func (mgr *PlaceholderManager) CleanUp(app *Application) {
+	log.Logger().Info("start to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+	for taskID, task := range app.taskMap {
+		if task.GetTaskPlaceholder() {
+			// remove pod
+			err := mgr.clients.KubeClient.Delete(task.pod)
+			if err != nil {
+				log.Logger().Error("failed to clean up placeholder pod",
+					zap.Error(err))
+				mgr.orphanPod[taskID] = task.pod
+			}
+		}
+	}
+	log.Logger().Info("finish to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+}
+
+func (mgr *PlaceholderManager) cleanOrphanPlaceholders() {
+	mgr.Lock()
+	defer mgr.Unlock()
+	for taskID, pod := range mgr.orphanPod {
+		log.Logger().Debug("start to clean up orphan pod",
+			zap.String("taskID", taskID),
+			zap.String("podName", pod.Name))
+		err := mgr.clients.KubeClient.Delete(pod)
+		if err != nil {
+			log.Logger().Warn("failed to clean up orphan pod", zap.Error(err))
+		} else {
+			delete(mgr.orphanPod, taskID)
+		}
+	}
+}
+
+func (mgr *PlaceholderManager) Start() {
+	log.Logger().Info("starting the Placeholder Manager")
+	mgr.stopChan = make(chan struct{})
+	if mgr.isRunning() {
+		log.Logger().Info("The placeholder manager has been started")
+		return
+	}
+	mgr.setRunning(true)
+	go func() {
+		for {
+			select {
+			case <-mgr.stopChan:
+				log.Logger().Info("PlaceholderManager has been stopped")
+				mgr.setRunning(false)
+				return
+			default:
+				// clean orphan placeholders every 5 seconds
+				log.Logger().Info("clean up orphan pod")
+				mgr.cleanOrphanPlaceholders()
+				time.Sleep(5 * time.Second)
+			}
+		}
+	}()
+}
+
+func (mgr *PlaceholderManager) Stop() {
+	if !mgr.isRunning() {
+		log.Logger().Info("The placeholder manager has been stopped")
+		return
+	}
+	log.Logger().Info("stopping the Placeholder Manager")
+	mgr.stopChan <- struct{}{}
+	time.Sleep(3 * time.Second)

Review comment:
       I am not quite sure. @HuangTing-Yao, could you please explain why we need to sleep for 3 seconds here?
   We can track this as a follow-up task.







[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r558525701



##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,167 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients   *client.Clients
+	orphanPod map[string]*v1.Pod
+	stopChan  chan struct{}
+	running   atomic.Value
+	sync.RWMutex
+}
+
+var placeholderMgr *PlaceholderManager
+var once sync.Once
+
+func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
+	var r atomic.Value
+	r.Store(false)
+	placeholderMgr = &PlaceholderManager{
+		clients: clients,
+		running: r,
+	}
+	return placeholderMgr
+}
+
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")
+		}
+	})
+	return placeholderMgr
+}
+
+func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
+	mgr.Lock()
+	defer mgr.Unlock()
+
+	// iterate all task groups, create placeholders for all the min members
+	for _, tg := range app.getTaskGroups() {
+		for i := int32(0); i < tg.MinMember; i++ {
+			placeholderName := utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID(), i)
+			placeholder := newPlaceholder(placeholderName, app, tg)
+			// create the placeholder on K8s
+			_, err := mgr.clients.KubeClient.Create(placeholder.pod)
+			if err != nil {
+				// if failed to create the place holder pod
+				// caller should handle this error
+				log.Logger().Error("failed to create placeholder pod",
+					zap.Error(err))
+				return err
+			}
+			log.Logger().Info("placeholder created",
+				zap.String("placeholder", placeholder.String()))
+		}
+	}
+
+	return nil
+}
+
+// clean up all the placeholders for an application
+func (mgr *PlaceholderManager) CleanUp(app *Application) {
+	log.Logger().Info("start to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+	for taskID, task := range app.taskMap {
+		if task.GetTaskPlaceholder() {
+			// remove pod
+			err := mgr.clients.KubeClient.Delete(task.pod)
+			if err != nil {
+				log.Logger().Error("failed to clean up placeholder pod",

Review comment:
       The cases I can imagine are:
   1. A network issue between the client and the API server, e.g. a connection failure
   2. Server-side request throttling
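
Both cases are transient, which is why a failed delete is parked and retried later rather than dropped. Below is a rough sketch of that park-and-retry pattern. `orphanTracker`, `deleteFunc`, and `flakyDeleter` are hypothetical names for illustration and do not use the real client API. The sketch allocates the map in its constructor, since writing to a nil map panics in Go.

```go
package main

import (
	"errors"
	"fmt"
)

// deleteFunc stands in for the Kubernetes delete call (hypothetical signature).
type deleteFunc func(name string) error

// orphanTracker remembers pods whose deletion failed so they can be retried.
type orphanTracker struct {
	orphans map[string]struct{}
	del     deleteFunc
}

func newOrphanTracker(del deleteFunc) *orphanTracker {
	// The map is allocated up front: assigning into a nil map panics.
	return &orphanTracker{orphans: make(map[string]struct{}), del: del}
}

// cleanUp tries to delete each pod once; failures are parked as orphans.
func (t *orphanTracker) cleanUp(names []string) {
	for _, n := range names {
		if err := t.del(n); err != nil {
			t.orphans[n] = struct{}{}
		}
	}
}

// sweep retries every orphan once and returns how many remain.
// Deleting map entries while ranging over the map is safe in Go.
func (t *orphanTracker) sweep() int {
	for n := range t.orphans {
		if err := t.del(n); err == nil {
			delete(t.orphans, n)
		}
	}
	return len(t.orphans)
}

// flakyDeleter simulates a transient failure: the first `failures`
// calls for each pod return an error, later calls succeed.
func flakyDeleter(failures int) deleteFunc {
	calls := map[string]int{}
	return func(name string) error {
		calls[name]++
		if calls[name] <= failures {
			return errors.New("transient: connection refused")
		}
		return nil
	}
}

func main() {
	tracker := newOrphanTracker(flakyDeleter(2))
	tracker.cleanUp([]string{"ph-0", "ph-1"})
	fmt.Println("orphans after cleanUp:", len(tracker.orphans))
	fmt.Println("orphans after sweep 1:", tracker.sweep())
	fmt.Println("orphans after sweep 2:", tracker.sweep())
}
```

A periodic caller would invoke `sweep` on a timer. With throttling in particular, a longer interval or a backoff between sweeps may be kinder to the API server than a fixed short period.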
   







[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r559813380



##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,172 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients *client.Clients
+	// when the placeholder manager is unable to delete a pod,
+	// this pod becomes to be an "orphan" pod. We add them to a map
+	// and keep retrying deleting them in order to avoid wasting resources.
+	orphanPod map[string]*v1.Pod
+	stopChan  chan struct{}
+	running   atomic.Value
+	sync.RWMutex
+}
+
+var placeholderMgr *PlaceholderManager
+var once sync.Once
+
+func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
+	var r atomic.Value
+	r.Store(false)
+	placeholderMgr = &PlaceholderManager{
+		clients: clients,
+		running: r,
+	}
+	return placeholderMgr
+}
+
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")
+		}
+	})
+	return placeholderMgr
+}
+
+func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
+	mgr.Lock()
+	defer mgr.Unlock()
+
+	// iterate all task groups, create placeholders for all the min members
+	for _, tg := range app.getTaskGroups() {
+		for i := int32(0); i < tg.MinMember; i++ {
+			placeholderName := utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID(), i)
+			placeholder := newPlaceholder(placeholderName, app, tg)
+			// create the placeholder on K8s
+			_, err := mgr.clients.KubeClient.Create(placeholder.pod)
+			if err != nil {
+				// if failed to create the place holder pod
+				// caller should handle this error
+				log.Logger().Error("failed to create placeholder pod",
+					zap.Error(err))
+				return err
+			}
+			log.Logger().Info("placeholder created",
+				zap.String("placeholder", placeholder.String()))
+		}
+	}
+
+	return nil
+}
+
+// clean up all the placeholders for an application
+func (mgr *PlaceholderManager) CleanUp(app *Application) {
+	mgr.Lock()
+	defer mgr.Unlock()
+	log.Logger().Info("start to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+	for taskID, task := range app.taskMap {
+		if task.GetTaskPlaceholder() {
+			// remove pod
+			err := mgr.clients.KubeClient.Delete(task.pod)
+			if err != nil {
+				log.Logger().Error("failed to clean up placeholder pod",
+					zap.Error(err))
+				mgr.orphanPod[taskID] = task.pod
+			}
+		}
+	}
+	log.Logger().Info("finish to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+}
+
+func (mgr *PlaceholderManager) cleanOrphanPlaceholders() {
+	mgr.Lock()
+	defer mgr.Unlock()
+	for taskID, pod := range mgr.orphanPod {
+		log.Logger().Debug("start to clean up orphan pod",
+			zap.String("taskID", taskID),
+			zap.String("podName", pod.Name))
+		err := mgr.clients.KubeClient.Delete(pod)
+		if err != nil {
+			log.Logger().Warn("failed to clean up orphan pod", zap.Error(err))
+		} else {
+			delete(mgr.orphanPod, taskID)
+		}
+	}
+}
+
+func (mgr *PlaceholderManager) Start() {
+	log.Logger().Info("starting the Placeholder Manager")
+	mgr.stopChan = make(chan struct{})

Review comment:
       Agreed, fixed.







[GitHub] [incubator-yunikorn-k8shim] codecov[bot] edited a comment on pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#issuecomment-760395169


   # [Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=h1) Report
   > Merging [#219](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=desc) (368300d) into [master](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/commit/eb888dfcf3b74ce248f2ed3b92ae5bddaf6caf5a?el=desc) (eb888df) will **increase** coverage by `2.29%`.
   > The diff coverage is `76.41%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/graphs/tree.svg?width=650&height=150&src=pr&token=LZImIuvleR)](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #219      +/-   ##
   ==========================================
   + Coverage   57.43%   59.73%   +2.29%     
   ==========================================
     Files          32       35       +3     
     Lines        2859     3129     +270     
   ==========================================
   + Hits         1642     1869     +227     
   - Misses       1147     1179      +32     
   - Partials       70       81      +11     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [pkg/cache/application\_events.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2FwcGxpY2F0aW9uX2V2ZW50cy5nbw==) | `53.06% <0.00%> (-7.41%)` | :arrow_down: |
   | [pkg/common/events/states.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi9ldmVudHMvc3RhdGVzLmdv) | `0.00% <0.00%> (ø)` | |
   | [pkg/common/utils/utils.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi91dGlscy91dGlscy5nbw==) | `25.00% <ø> (ø)` | |
   | [pkg/cache/application.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2FwcGxpY2F0aW9uLmdv) | `76.74% <59.25%> (-2.31%)` | :arrow_down: |
   | [pkg/cache/placeholder\_manager.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL3BsYWNlaG9sZGVyX21hbmFnZXIuZ28=) | `77.27% <77.27%> (ø)` | |
   | [pkg/cache/task.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL3Rhc2suZ28=) | `74.40% <82.22%> (+7.42%)` | :arrow_up: |
   | [pkg/appmgmt/general/general.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2FwcG1nbXQvZ2VuZXJhbC9nZW5lcmFsLmdv) | `59.47% <83.33%> (+2.03%)` | :arrow_up: |
   | [pkg/common/utils/gang\_utils.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi91dGlscy9nYW5nX3V0aWxzLmdv) | `85.00% <85.00%> (ø)` | |
   | [pkg/common/si\_helper.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi9zaV9oZWxwZXIuZ28=) | `63.15% <90.00%> (+1.36%)` | :arrow_up: |
   | [pkg/cache/context.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2NvbnRleHQuZ28=) | `41.08% <100.00%> (+0.13%)` | :arrow_up: |
   | ... and [6 more](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=footer). Last update [eb888df...368300d](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [incubator-yunikorn-k8shim] yangwwei merged pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei merged pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219


   





[GitHub] [incubator-yunikorn-k8shim] kingamarton commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
kingamarton commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r557336617



##########
File path: deployments/examples/gang/cmd/gangDeploy.sh
##########
@@ -0,0 +1,100 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+#limitations under the License.
+#
+
+# gangDeploy.sh <job amount> <pod amount> <gang member> <task run time(sec)>
+set -o errexit
+set -o nounset
+set -o pipefail
+
+JOBAMOUNT=$1
+GANGMEMBER=$2
+RUNTIMESEC=$3
+
+# create service
+kubectl create -f <(cat << EOF
+apiVersion: v1
+kind: Service
+metadata:
+  name: gangservice
+  labels:
+    app: gang
+spec:
+  selector:
+    app: gang
+  type: ClusterIP
+  ports:
+  - protocol: TCP
+    port: 8863
+    targetPort: 8863
+EOF) 
+# create job counter web server
+kubectl create -f <(cat << EOF
+apiVersion: v1
+kind: Pod
+metadata:
+  name: gangweb
+  labels:
+    app: gang
+    queue: root.sandbox
+spec:
+  schedulerName: yunikorn
+  containers:
+    - name: gangweb
+      image: apache/yunikorn:simulation-gang-coordinator-latest
+      imagePullPolicy: Never
+      ports:

Review comment:
       Please move this part into a YAML file, or, even better, reuse the gang-coordinator.yaml file.

##########
File path: deployments/examples/gang/cmd/gangDeploy.sh
##########
@@ -0,0 +1,100 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+#limitations under the License.
+#
+
+# gangDeploy.sh <job amount> <pod amount> <gang member> <task run time(sec)>
+set -o errexit
+set -o nounset
+set -o pipefail
+
+JOBAMOUNT=$1
+GANGMEMBER=$2
+RUNTIMESEC=$3
+
+# create service
+kubectl create -f <(cat << EOF
+apiVersion: v1
+kind: Service
+metadata:
+  name: gangservice
+  labels:
+    app: gang
+spec:
+  selector:
+    app: gang
+  type: ClusterIP
+  ports:
+  - protocol: TCP
+    port: 8863
+    targetPort: 8863
+EOF) 
+# create job counter web server
+kubectl create -f <(cat << EOF
+apiVersion: v1
+kind: Pod
+metadata:
+  name: gangweb
+  labels:
+    app: gang
+    queue: root.sandbox
+spec:
+  schedulerName: yunikorn
+  containers:
+    - name: gangweb
+      image: apache/yunikorn:simulation-gang-coordinator-latest
+      imagePullPolicy: Never
+      ports:
+        - containerPort: 8863
+EOF)
+# wait for web server to be running
+until grep 'Running' <(kubectl get pod gangweb -o=jsonpath='{.status.phase}'); do
+  sleep 1
+done

Review comment:
       I think it might be worth adding a timeout here to avoid an infinite loop in case the pod never transitions to the Running state for any reason.

##########
File path: pkg/cache/application.go
##########
@@ -363,6 +406,52 @@ func (app *Application) handleRecoverApplicationEvent(event *fsm.Event) {
 	}
 }
 
+func (app *Application) postAppAccepted(event *fsm.Event) {
+	// if app has taskGroups defined, it goes to the Reserving state before getting to Running
+	var ev events.SchedulingEvent
+	if len(app.taskGroups) != 0 {
+		ev = NewSimpleApplicationEvent(app.applicationID, events.TryReserve)
+		dispatcher.Dispatch(ev)
+	} else {
+		ev = NewRunApplicationEvent(app.applicationID)
+	}
+	dispatcher.Dispatch(ev)
+}
+
+func (app *Application) onReserving(event *fsm.Event) {
+	go func() {
+		// while doing reserving
+		if err := GetPlaceholderManager().createAppPlaceholders(app); err != nil {
+			// creating placeholder failed
+			// put the app into recycling queue and turn the app to running state
+			GetPlaceholderManager().CleanUp(app)
+			ev := NewRunApplicationEvent(app.applicationID)

Review comment:
       Does this mean that if we fail to create the placeholders, the app will be handled as a normal app rather than as one with a gang?

##########
File path: pkg/cache/application.go
##########
@@ -363,6 +406,52 @@ func (app *Application) handleRecoverApplicationEvent(event *fsm.Event) {
 	}
 }
 
+func (app *Application) postAppAccepted(event *fsm.Event) {
+	// if app has taskGroups defined, it goes to the Reserving state before getting to Running
+	var ev events.SchedulingEvent

Review comment:
       Shouldn't we check if we are still in the right state?

##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,167 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients   *client.Clients
+	orphanPod map[string]*v1.Pod
+	stopChan  chan struct{}
+	running   atomic.Value
+	sync.RWMutex
+}
+
+var placeholderMgr *PlaceholderManager
+var once sync.Once
+
+func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
+	var r atomic.Value
+	r.Store(false)
+	placeholderMgr = &PlaceholderManager{
+		clients: clients,
+		running: r,
+	}
+	return placeholderMgr
+}
+
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")
+		}
+	})
+	return placeholderMgr
+}
+
+func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
+	mgr.Lock()
+	defer mgr.Unlock()
+
+	// iterate all task groups, create placeholders for all the min members
+	for _, tg := range app.getTaskGroups() {
+		for i := int32(0); i < tg.MinMember; i++ {
+			placeholderName := utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID(), i)
+			placeholder := newPlaceholder(placeholderName, app, tg)
+			// create the placeholder on K8s
+			_, err := mgr.clients.KubeClient.Create(placeholder.pod)
+			if err != nil {
+				// if failed to create the place holder pod
+				// caller should handle this error
+				log.Logger().Error("failed to create placeholder pod",
+					zap.Error(err))
+				return err
+			}
+			log.Logger().Info("placeholder created",
+				zap.String("placeholder", placeholder.String()))
+		}
+	}
+
+	return nil
+}
+
+// clean up all the placeholders for an application
+func (mgr *PlaceholderManager) CleanUp(app *Application) {
+	log.Logger().Info("start to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+	for taskID, task := range app.taskMap {
+		if task.GetTaskPlaceholder() {
+			// remove pod
+			err := mgr.clients.KubeClient.Delete(task.pod)
+			if err != nil {
+				log.Logger().Error("failed to clean up placeholder pod",
+					zap.Error(err))
+				mgr.orphanPod[taskID] = task.pod
+			}
+		}
+	}
+	log.Logger().Info("finish to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+}
+
+func (mgr *PlaceholderManager) cleanOrphanPlaceholders() {
+	mgr.Lock()
+	defer mgr.Unlock()
+	for taskID, pod := range mgr.orphanPod {
+		log.Logger().Debug("start to clean up orphan pod",
+			zap.String("taskID", taskID),
+			zap.String("podName", pod.Name))
+		err := mgr.clients.KubeClient.Delete(pod)
+		if err != nil {
+			log.Logger().Warn("failed to clean up orphan pod", zap.Error(err))
+		} else {
+			delete(mgr.orphanPod, taskID)
+		}
+	}
+}
+
+func (mgr *PlaceholderManager) Start() {
+	log.Logger().Info("starting the Placeholder Manager")
+	mgr.stopChan = make(chan struct{})
+	if mgr.isRunning() {
+		log.Logger().Info("The placeholder manager has been started")
+		return
+	}
+	mgr.setRunning(true)
+	go func() {
+		for {
+			select {
+			case <-mgr.stopChan:
+				log.Logger().Info("PlaceholderManager has been stopped")
+				mgr.setRunning(false)
+				return
+			default:
+				// clean orphan placeholders every 5 seconds
+				log.Logger().Info("clean up orphan pod")
+				mgr.cleanOrphanPlaceholders()
+				time.Sleep(5 * time.Second)
+			}
+		}
+	}()
+}
+
+func (mgr *PlaceholderManager) Stop() {
+	if !mgr.isRunning() {
+		log.Logger().Info("The placeholder manager has been stopped")
+		return
+	}
+	log.Logger().Info("stopping the Placeholder Manager")
+	mgr.stopChan <- struct{}{}
+	time.Sleep(3 * time.Second)

Review comment:
       Why do we need this sleep here?
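
One possible way to drop the fixed sleep, sketched under assumptions: the background loop closes a `done` channel when it exits, and `Stop` blocks on that channel, so the caller waits exactly as long as the loop needs. The `worker` type, its fields, and the ticker interval are hypothetical stand-ins and not the code in this PR.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker is a hypothetical stand-in for PlaceholderManager.
type worker struct {
	mu       sync.Mutex
	stopChan chan struct{} // closed by Stop to request shutdown
	done     chan struct{} // closed by the loop once it has exited
	ticks    int
}

// Start launches the periodic loop; it is a no-op if the loop is already running.
func (w *worker) Start(interval time.Duration) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.stopChan != nil {
		return
	}
	stop, done := make(chan struct{}), make(chan struct{})
	w.stopChan, w.done = stop, done
	go func() {
		defer close(done)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-stop:
				return
			case <-ticker.C:
				w.mu.Lock()
				w.ticks++ // stands in for the periodic orphan clean-up
				w.mu.Unlock()
			}
		}
	}()
}

// Stop requests shutdown and waits until the loop has really exited.
func (w *worker) Stop() {
	w.mu.Lock()
	stop, done := w.stopChan, w.done
	w.stopChan, w.done = nil, nil
	w.mu.Unlock()
	if stop == nil {
		return // not running
	}
	close(stop)
	<-done // no sleep needed: this returns as soon as the goroutine is gone
}

// Running reports whether the loop has been started and not yet stopped.
func (w *worker) Running() bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.stopChan != nil
}

func main() {
	w := &worker{}
	w.Start(10 * time.Millisecond)
	time.Sleep(35 * time.Millisecond)
	w.Stop()
	fmt.Println("running after Stop:", w.Running())
}
```

Using a ticker inside the `select` also means a stop request is seen immediately, instead of only after a `time.Sleep` in the `default` branch has finished.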

##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,167 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients   *client.Clients
+	orphanPod map[string]*v1.Pod

Review comment:
       What is an orphan pod? I think it is worth adding a comment explaining what this is.

##########
File path: deployments/examples/gang/cmd/gangDeploy.sh
##########
@@ -0,0 +1,100 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+#limitations under the License.
+#
+
+# gangDeploy.sh <job amount> <pod amount> <gang member> <task run time(sec)>
+set -o errexit
+set -o nounset
+set -o pipefail
+
+JOBAMOUNT=$1
+GANGMEMBER=$2
+RUNTIMESEC=$3
+
+# create service
+kubectl create -f <(cat << EOF
+apiVersion: v1
+kind: Service
+metadata:
+  name: gangservice
+  labels:
+    app: gang
+spec:
+  selector:
+    app: gang
+  type: ClusterIP
+  ports:
+  - protocol: TCP
+    port: 8863
+    targetPort: 8863
+EOF) 
+# create job counter web server
+kubectl create -f <(cat << EOF
+apiVersion: v1
+kind: Pod
+metadata:
+  name: gangweb
+  labels:
+    app: gang
+    queue: root.sandbox
+spec:
+  schedulerName: yunikorn
+  containers:
+    - name: gangweb
+      image: apache/yunikorn:simulation-gang-coordinator-latest
+      imagePullPolicy: Never
+      ports:
+        - containerPort: 8863
+EOF)
+# wait for web server to be running
+until grep 'Running' <(kubectl get pod gangweb -o=jsonpath='{.status.phase}'); do
+  sleep 1
+done
+# create gang jobs
+for i in $(seq "$JOBAMOUNT"); do
+  kubectl create -f <(cat << EOF
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: gang-job-$i
+  labels: 
+    app: gang
+    queue: root.sandbox
+spec:
+  completions: $GANGMEMBER
+  parallelism: $GANGMEMBER
+  template:
+    spec:
+      containers:
+      - name: gang
+        image: apache/yunikorn:simulation-gang-worker-latest
+        imagePullPolicy: Never
+        env:
+        - name: JOB_ID
+          value: gang-job-$i
+        - name: SERVICE_NAME
+          value: gangservice
+        - name: MEMBER_AMOUNT
+          value: "$GANGMEMBER"
+        - name: TASK_EXECUTION_SECONDS
+          value: "$RUNTIMESEC"
+      restartPolicy: Never
+      schedulerName: yunikorn

Review comment:
       Please extract this part into a YAML file. Alternatively, I think we can reuse the gang-job.yaml file here.

##########
File path: pkg/cache/application.go
##########
@@ -363,6 +406,52 @@ func (app *Application) handleRecoverApplicationEvent(event *fsm.Event) {
 	}
 }
 
+func (app *Application) postAppAccepted(event *fsm.Event) {
+	// if app has taskGroups defined, it goes to the Reserving state before getting to Running
+	var ev events.SchedulingEvent
+	if len(app.taskGroups) != 0 {
+		ev = NewSimpleApplicationEvent(app.applicationID, events.TryReserve)
+		dispatcher.Dispatch(ev)
+	} else {
+		ev = NewRunApplicationEvent(app.applicationID)
+	}
+	dispatcher.Dispatch(ev)
+}
+
+func (app *Application) onReserving(event *fsm.Event) {
+	go func() {
+		// while doing reserving
+		if err := GetPlaceholderManager().createAppPlaceholders(app); err != nil {
+			// creating placeholder failed
+			// put the app into recycling queue and turn the app to running state
+			GetPlaceholderManager().CleanUp(app)
+			ev := NewRunApplicationEvent(app.applicationID)
+			dispatcher.Dispatch(ev)
+		}
+	}()
+}
+
+func (app *Application) onReservationStateChange(event *fsm.Event) {
+	// this event is called when there is a add or release of placeholders
+	desireCounts := utils.NewTaskGroupInstanceCountMap()
+	for _, tg := range app.taskGroups {
+		desireCounts.Add(tg.Name, tg.MinMember)
+	}
+
+	actualCounts := utils.NewTaskGroupInstanceCountMap()
+	for _, t := range app.getTasks(events.States().Task.Bound) {
+		if t.placeholder {
+			actualCounts.AddOne(t.taskGroupName)
+		}
+	}
+
+	// min member all satisfied
+	if desireCounts.Equals(actualCounts) {
+		ev := NewRunApplicationEvent(app.applicationID)
+		dispatcher.Dispatch(ev)
+	}

Review comment:
       If multiple task groups are defined, shouldn't we start the app as soon as the min member of one task group is satisfied? Are there usually any dependencies between the task groups that make it worth waiting for all of them?
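
To make the two policies in this question concrete, here is a small sketch that compares "all task groups satisfied" with "any task group satisfied". The function names and the plain `map[string]int32` inputs are hypothetical. The sketch also uses `>=` rather than the exact equality that `desireCounts.Equals(actualCounts)` checks in the diff.

```go
package main

import "fmt"

// allSatisfied reports whether every task group has at least its desired
// number of bound placeholders (the policy the diff appears to implement).
func allSatisfied(desired, actual map[string]int32) bool {
	for name, want := range desired {
		if actual[name] < want {
			return false
		}
	}
	return true
}

// anySatisfied reports whether at least one task group has reached its
// desired number of bound placeholders (the policy the question proposes).
func anySatisfied(desired, actual map[string]int32) bool {
	for name, want := range desired {
		if actual[name] >= want {
			return true
		}
	}
	return false
}

func main() {
	desired := map[string]int32{"driver": 1, "executor": 3}
	actual := map[string]int32{"driver": 1, "executor": 2}
	fmt.Println("all:", allSatisfied(desired, actual))
	fmt.Println("any:", anySatisfied(desired, actual))
}
```

With the example values, only the `driver` group is complete, so the two policies disagree: the "any" policy would start the app while the `executor` group is still one placeholder short.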

##########
File path: pkg/common/utils/gang_utils.go
##########
@@ -0,0 +1,179 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package utils
+
+import (
+	"encoding/json"
+	"fmt"
+	"strconv"
+	"sync"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants"
+)
+
+func FindAppTaskGroup(appTaskGroups []*v1alpha1.TaskGroup, groupName string) (*v1alpha1.TaskGroup, error) {
+	if groupName == "" {
+		// task has no group defined
+		return nil, nil
+	}
+
+	// app has no taskGroups associated
+	if len(appTaskGroups) == 0 {
+		return nil, nil
+	}
+
+	// task group defined in app, return the corresponding taskGroup
+	for _, tg := range appTaskGroups {
+		if tg.Name == groupName {
+			return tg, nil
+		}
+	}
+
+	// task group name specified, but could not find a mapping value in app taskGroups
+	return nil, fmt.Errorf("taskGroup %s is not defined in the application", groupName)
+}
+
+// the placeholder name is the pod name, pod name can not be longer than 63 chars
+// taskGroup name and appID will be truncated if they go over 20/28 chars respectively
+func GeneratePlaceholderName(taskGroupName, appID string, index int32) string {
+	// taskGroup name no longer than 20 chars
+	// appID no longer than 35 chars
+	// total length no longer than 20 + 28 + 5 + 10 = 63
+	shortTaskGroupName := fmt.Sprintf("%.20s", taskGroupName)
+	shortAppID := fmt.Sprintf("%.28s", appID)
+	return "tg-" + shortTaskGroupName + "-" + shortAppID + fmt.Sprintf("-%d", index)
+}
+
+func GetPlaceholderResourceRequest(resources map[string]resource.Quantity) v1.ResourceList {
+	resourceReq := v1.ResourceList{}
+	for k, v := range resources {
+		resourceReq[v1.ResourceName(k)] = v
+	}
+	return resourceReq
+}
+
+func GetPlaceholderFlagFromPodSpec(pod *v1.Pod) bool {
+	if value, ok := pod.Annotations[constants.AnnotationPlaceholderFlag]; ok {
+		if v, err := strconv.ParseBool(value); err == nil {
+			return v
+		}
+	}
+	return false
+}
+
+func GetTaskGroupFromPodSpec(pod *v1.Pod) string {
+	if value, ok := pod.Annotations[constants.AnnotationTaskGroupName]; ok {
+		return value
+	}
+	return ""
+}
+
+func GetTaskGroupsFromAnnotation(pod *v1.Pod) ([]v1alpha1.TaskGroup, error) {
+	taskGroupInfo, ok := pod.Annotations[constants.AnnotationTaskGroups]
+	if !ok {
+		return nil, nil
+	}
+	taskGroups := []v1alpha1.TaskGroup{}
+	err := json.Unmarshal([]byte(taskGroupInfo), &taskGroups)
+	if err != nil {
+		return nil, err
+	}
+	// json.Unmarchal won't return error if name or MinMember is empty, but will return error if MinResource is empty or error format.
+	for _, taskGroup := range taskGroups {
+		if taskGroup.Name == "" {
+			return nil, fmt.Errorf("can't get taskGroup Name from pod annotation, %s",
+				pod.Annotations[constants.AnnotationTaskGroups])
+		}
+		if taskGroup.MinMember == int32(0) {
+			return nil, fmt.Errorf("can't get taskGroup MinMember from pod annotation, %s",
+				pod.Annotations[constants.AnnotationTaskGroups])
+		}
+	}
+	return taskGroups, nil
+}
+
+type TaskGroupInstanceCountMap struct {
+	counts map[string]int32
+	sync.RWMutex
+}
+
+func NewTaskGroupInstanceCountMap() *TaskGroupInstanceCountMap {
+	return &TaskGroupInstanceCountMap{
+		counts: make(map[string]int32),
+	}
+}
+
+func (t *TaskGroupInstanceCountMap) Add(taskGroupName string, num int32) {
+	t.update(taskGroupName, num)
+}
+
+func (t *TaskGroupInstanceCountMap) AddOne(taskGroupName string) {
+	t.update(taskGroupName, 1)
+}
+
+func (t *TaskGroupInstanceCountMap) DeleteOne(taskGroupName string) {
+	t.update(taskGroupName, -1)
+}
+
+func (t *TaskGroupInstanceCountMap) update(taskGroupName string, delta int32) {
+	t.Lock()
+	defer t.Unlock()
+	if v, ok := t.counts[taskGroupName]; ok {
+		t.counts[taskGroupName] = v + delta
+	} else {
+		t.counts[taskGroupName] = delta
+	}
+}
+
+func (t *TaskGroupInstanceCountMap) Size() int {
+	t.RLock()
+	defer t.RUnlock()
+	return len(t.counts)
+}
+
+func (t *TaskGroupInstanceCountMap) GetTaskGroupInstanceCount(groupName string) int32 {
+	t.RLock()
+	defer t.RUnlock()
+	return t.counts[groupName]
+}
+
+func (t *TaskGroupInstanceCountMap) Equals(target *TaskGroupInstanceCountMap) bool {
+	t.RLock()
+	defer t.RUnlock()
+
+	if target == nil {
+		return false
+	}

Review comment:
       I think we need to handle the cases where t is nil and where both t and target are nil.
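
A minimal sketch of a nil-safe comparison, assuming two nil maps should compare equal and a nil map should differ from any non-nil map (that choice is an assumption, not something the PR states). `countMap` is a hypothetical, trimmed-down stand-in for `TaskGroupInstanceCountMap`. The nil check runs before any lock is taken, because locking through a nil receiver would panic.

```go
package main

import (
	"fmt"
	"sync"
)

// countMap is a hypothetical stand-in for TaskGroupInstanceCountMap.
type countMap struct {
	sync.RWMutex
	counts map[string]int32
}

func newCountMap() *countMap {
	return &countMap{counts: make(map[string]int32)}
}

// Add increases the count of the named group by n.
func (c *countMap) Add(name string, n int32) {
	c.Lock()
	defer c.Unlock()
	c.counts[name] += n
}

// snapshot copies the counts while holding only this map's read lock.
func (c *countMap) snapshot() map[string]int32 {
	c.RLock()
	defer c.RUnlock()
	out := make(map[string]int32, len(c.counts))
	for k, v := range c.counts {
		out[k] = v
	}
	return out
}

// Equals is safe to call on a nil receiver and with a nil target.
func (c *countMap) Equals(target *countMap) bool {
	if c == nil || target == nil {
		return c == nil && target == nil
	}
	// Snapshots are taken one at a time so two locks are never held together.
	a, b := c.snapshot(), target.snapshot()
	if len(a) != len(b) {
		return false
	}
	for k, v := range a {
		if bv, ok := b[k]; !ok || bv != v {
			return false
		}
	}
	return true
}

func main() {
	var none *countMap
	a := newCountMap()
	a.Add("executor", 2)
	fmt.Println(none.Equals(nil), none.Equals(a), a.Equals(nil))
}
```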

##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,167 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients   *client.Clients
+	orphanPod map[string]*v1.Pod
+	stopChan  chan struct{}
+	running   atomic.Value
+	sync.RWMutex
+}
+
+var placeholderMgr *PlaceholderManager
+var once sync.Once
+
+func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
+	var r atomic.Value
+	r.Store(false)
+	placeholderMgr = &PlaceholderManager{
+		clients: clients,
+		running: r,
+	}
+	return placeholderMgr
+}
+
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")
+		}
+	})
+	return placeholderMgr
+}
+
+func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
+	mgr.Lock()
+	defer mgr.Unlock()
+
+	// iterate all task groups, create placeholders for all the min members
+	for _, tg := range app.getTaskGroups() {
+		for i := int32(0); i < tg.MinMember; i++ {
+			placeholderName := utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID(), i)
+			placeholder := newPlaceholder(placeholderName, app, tg)
+			// create the placeholder on K8s
+			_, err := mgr.clients.KubeClient.Create(placeholder.pod)
+			if err != nil {
+				// if failed to create the place holder pod
+				// caller should handle this error
+				log.Logger().Error("failed to create placeholder pod",
+					zap.Error(err))
+				return err
+			}
+			log.Logger().Info("placeholder created",
+				zap.String("placeholder", placeholder.String()))
+		}
+	}
+
+	return nil
+}
+
+// clean up all the placeholders for an application
+func (mgr *PlaceholderManager) CleanUp(app *Application) {
+	log.Logger().Info("start to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+	for taskID, task := range app.taskMap {
+		if task.GetTaskPlaceholder() {
+			// remove pod
+			err := mgr.clients.KubeClient.Delete(task.pod)
+			if err != nil {
+				log.Logger().Error("failed to clean up placeholder pod",

Review comment:
       When can deleting a pod fail?

##########
File path: pkg/cache/application.go
##########
@@ -273,42 +312,46 @@ func (app *Application) Schedule() {
 			log.Logger().Warn("failed to handle SUBMIT app event",
 				zap.Error(err))
 		}
-	case states.Accepted:
-		ev := NewRunApplicationEvent(app.GetApplicationID())
-		if err := app.handle(ev); err != nil {
-			log.Logger().Warn("failed to handle RUN app event",
-				zap.Error(err))
-		}
+	case states.Reserving:

Review comment:
       Why is the Accepted case removed? Will every application go through the Reserving state even if it has no gang defined?

##########
File path: pkg/cache/placeholder.go
##########
@@ -0,0 +1,79 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"fmt"
+
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+)
+
+type Placeholder struct {
+	appID         string
+	taskGroupName string
+	pod           *v1.Pod
+}
+
+func newPlaceholder(placeholderName string, app *Application, taskGroup v1alpha1.TaskGroup) *Placeholder {
+	placeholderPod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      placeholderName,
+			Namespace: app.tags[constants.AppTagNamespace],
+			Labels: map[string]string{
+				constants.LabelApplicationID: app.GetApplicationID(),
+				constants.LabelQueueName:     app.GetQueue(),
+			},
+			Annotations: map[string]string{
+				constants.AnnotationPlaceholderFlag: "true",
+				constants.AnnotationTaskGroupName:   taskGroup.Name,
+			},

Review comment:
       What happens if the real pod has more labels and annotations than just these? Won't we need them?

##########
File path: pkg/cache/application.go
##########
@@ -273,42 +312,46 @@ func (app *Application) Schedule() {
 			log.Logger().Warn("failed to handle SUBMIT app event",
 				zap.Error(err))
 		}
-	case states.Accepted:
-		ev := NewRunApplicationEvent(app.GetApplicationID())
-		if err := app.handle(ev); err != nil {
-			log.Logger().Warn("failed to handle RUN app event",
-				zap.Error(err))
-		}
+	case states.Reserving:
+		app.scheduleTasks(func(t *Task) bool {
+			return t.placeholder
+		})
 	case states.Running:
-		if len(app.GetNewTasks()) > 0 {
-			for _, task := range app.GetNewTasks() {
-				// for each new task, we do a sanity check before moving the state to Pending_Schedule
-				if err := task.sanityCheckBeforeScheduling(); err == nil {
-					// note, if we directly trigger submit task event, it may spawn too many duplicate
-					// events, because a task might be submitted multiple times before its state transits to PENDING.
-					if handleErr := task.handle(
-						NewSimpleTaskEvent(task.applicationID, task.taskID, events.InitTask)); handleErr != nil {
-						// something goes wrong when transit task to PENDING state,
-						// this should not happen because we already checked the state
-						// before calling the transition. Nowhere to go, just log the error.
-						log.Logger().Warn("init task failed", zap.Error(err))
-					}
-				} else {
-					events.GetRecorder().Event(task.GetTaskPod(), v1.EventTypeWarning, "FailedScheduling", err.Error())
-					log.Logger().Debug("task is not ready for scheduling",
-						zap.String("appID", task.applicationID),
-						zap.String("taskID", task.taskID),
-						zap.Error(err))
-				}
-			}
-		}
+		app.scheduleTasks(func(t *Task) bool {
+			return !t.placeholder
+		})
 	default:
 		log.Logger().Debug("skipping scheduling application",
 			zap.String("appID", app.GetApplicationID()),
 			zap.String("appState", app.GetApplicationState()))
 	}
 }
 
+func (app *Application) scheduleTasks(taskScheduleCondition func(t *Task) bool) {
+	for _, task := range app.GetNewTasks() {
+		if taskScheduleCondition(task) {

Review comment:
       ```
   func (app *Application) scheduleTasks(schedulePlaceholders bool) {
   	...
   	if task.placeholder == schedulePlaceholders {
   		...
   	}
   }
   ```
   would be much simpler than passing a function

##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,167 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients   *client.Clients
+	orphanPod map[string]*v1.Pod
+	stopChan  chan struct{}
+	running   atomic.Value
+	sync.RWMutex
+}
+
+var placeholderMgr *PlaceholderManager
+var once sync.Once
+
+func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
+	var r atomic.Value
+	r.Store(false)
+	placeholderMgr = &PlaceholderManager{
+		clients: clients,
+		running: r,
+	}
+	return placeholderMgr
+}
+
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")
+		}
+	})
+	return placeholderMgr
+}
+
+func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
+	mgr.Lock()
+	defer mgr.Unlock()
+
+	// iterate all task groups, create placeholders for all the min members
+	for _, tg := range app.getTaskGroups() {
+		for i := int32(0); i < tg.MinMember; i++ {
+			placeholderName := utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID(), i)
+			placeholder := newPlaceholder(placeholderName, app, tg)
+			// create the placeholder on K8s
+			_, err := mgr.clients.KubeClient.Create(placeholder.pod)
+			if err != nil {
+				// if failed to create the place holder pod
+				// caller should handle this error
+				log.Logger().Error("failed to create placeholder pod",
+					zap.Error(err))
+				return err
+			}
+			log.Logger().Info("placeholder created",
+				zap.String("placeholder", placeholder.String()))
+		}
+	}
+
+	return nil
+}
+
+// clean up all the placeholders for an application
+func (mgr *PlaceholderManager) CleanUp(app *Application) {
+	log.Logger().Info("start to clean up app placeholders",

Review comment:
       Wouldn't we need a lock here?
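
A rough sketch of what locking could look like here, assuming the lock only needs to protect the orphan map and not the delete call itself. All names (`manager`, `deleter`, `flaky`) are hypothetical, and pods are reduced to plain strings. The constructor in the sketch also allocates the map: in the diff as quoted, `NewPlaceholderManager` does not appear to initialize `orphanPod`, and writing to a nil map panics in Go.

```go
package main

import (
	"fmt"
	"sync"
)

// deleter is a hypothetical stand-in for the Kubernetes client's delete call.
type deleter func(podName string) error

// manager is a hypothetical, trimmed-down PlaceholderManager.
type manager struct {
	sync.Mutex
	orphans map[string]string // taskID -> pod name that could not be deleted
	del     deleter
}

func newManager(del deleter) *manager {
	// The map is allocated up front so that recording an orphan cannot panic.
	return &manager{orphans: make(map[string]string), del: del}
}

// cleanUp deletes every pod; failures are recorded as orphans under the lock.
func (m *manager) cleanUp(tasks map[string]string) {
	for taskID, pod := range tasks {
		if err := m.del(pod); err != nil {
			m.Lock()
			m.orphans[taskID] = pod
			m.Unlock()
		}
	}
}

// retryOrphans retries the recorded deletions and forgets those that succeed.
func (m *manager) retryOrphans() {
	m.Lock()
	defer m.Unlock()
	for taskID, pod := range m.orphans {
		if err := m.del(pod); err == nil {
			delete(m.orphans, taskID)
		}
	}
}

func (m *manager) orphanCount() int {
	m.Lock()
	defer m.Unlock()
	return len(m.orphans)
}

// flaky fails to delete one named pod until healed is set; demo helper only.
type flaky struct {
	pod    string
	healed bool
}

func (f *flaky) delete(pod string) error {
	if !f.healed && pod == f.pod {
		return fmt.Errorf("cannot delete %s", pod)
	}
	return nil
}

func main() {
	f := &flaky{pod: "pod-b"}
	m := newManager(f.delete)
	m.cleanUp(map[string]string{"task-a": "pod-a", "task-b": "pod-b"})
	fmt.Println("orphans after clean-up:", m.orphanCount())
	f.healed = true
	m.retryOrphans()
	fmt.Println("orphans after retry:", m.orphanCount())
}
```

Holding the lock only around the map write keeps a slow or failing delete call from blocking other callers, at the cost of a slightly more verbose loop.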

##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,167 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients   *client.Clients
+	orphanPod map[string]*v1.Pod
+	stopChan  chan struct{}
+	running   atomic.Value
+	sync.RWMutex
+}
+
+var placeholderMgr *PlaceholderManager
+var once sync.Once
+
+func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
+	var r atomic.Value
+	r.Store(false)
+	placeholderMgr = &PlaceholderManager{
+		clients: clients,
+		running: r,
+	}
+	return placeholderMgr
+}
+
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")

Review comment:
       Shouldn't we initialize the placeholder manager if it has not been initialized?
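
Lazy initialization inside the getter is awkward here because the constructor needs a `clients` argument that the getter does not have. One alternative, sketched under assumptions, is an idempotent initializer plus a getter that returns an error instead of calling `Fatal`. The `registry` and `manager` types and the `name` field are hypothetical and only illustrate the pattern.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// manager is a hypothetical stand-in for PlaceholderManager.
type manager struct {
	name string
}

// registry guards a single manager instance.
type registry struct {
	mu  sync.Mutex
	mgr *manager
}

// Init creates the manager on the first call and returns the existing
// instance on every later call, so repeated initialization is harmless.
func (r *registry) Init(name string) *manager {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.mgr == nil {
		r.mgr = &manager{name: name}
	}
	return r.mgr
}

// Get returns an error rather than terminating the process when Init
// has not been called yet, leaving the decision to the caller.
func (r *registry) Get() (*manager, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.mgr == nil {
		return nil, errors.New("manager is not initialized")
	}
	return r.mgr, nil
}

// defaultRegistry plays the role of the package-level singleton.
var defaultRegistry registry

func main() {
	if _, err := defaultRegistry.Get(); err != nil {
		fmt.Println("before Init:", err)
	}
	defaultRegistry.Init("placeholder-manager")
	m, err := defaultRegistry.Get()
	fmt.Println("after Init:", m.name, err)
}
```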







[GitHub] [incubator-yunikorn-k8shim] codecov[bot] edited a comment on pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#issuecomment-760395169


   # [Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=h1) Report
   > Merging [#219](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=desc) (ec73314) into [master](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/commit/eb888dfcf3b74ce248f2ed3b92ae5bddaf6caf5a?el=desc) (eb888df) will **increase** coverage by `2.00%`.
   > The diff coverage is `76.70%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/graphs/tree.svg?width=650&height=150&src=pr&token=LZImIuvleR)](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #219      +/-   ##
   ==========================================
   + Coverage   57.78%   59.78%   +2.00%     
   ==========================================
     Files          32       35       +3     
     Lines        2859     3133     +274     
   ==========================================
   + Hits         1652     1873     +221     
   - Misses       1138     1179      +41     
   - Partials       69       81      +12     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [pkg/cache/application\_events.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2FwcGxpY2F0aW9uX2V2ZW50cy5nbw==) | `53.06% <0.00%> (-7.41%)` | :arrow_down: |
   | [pkg/common/events/states.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi9ldmVudHMvc3RhdGVzLmdv) | `0.00% <0.00%> (ø)` | |
   | [pkg/common/utils/utils.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi91dGlscy91dGlscy5nbw==) | `25.00% <ø> (ø)` | |
   | [pkg/cache/application.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2FwcGxpY2F0aW9uLmdv) | `76.74% <59.25%> (-2.31%)` | :arrow_down: |
   | [pkg/cache/placeholder\_manager.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL3BsYWNlaG9sZGVyX21hbmFnZXIuZ28=) | `77.94% <77.94%> (ø)` | |
   | [pkg/cache/task.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL3Rhc2suZ28=) | `74.40% <82.22%> (+2.77%)` | :arrow_up: |
   | [pkg/appmgmt/general/general.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2FwcG1nbXQvZ2VuZXJhbC9nZW5lcmFsLmdv) | `59.47% <83.33%> (+2.03%)` | :arrow_up: |
   | [pkg/common/utils/gang\_utils.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi91dGlscy9nYW5nX3V0aWxzLmdv) | `85.48% <85.48%> (ø)` | |
   | [pkg/common/si\_helper.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi9zaV9oZWxwZXIuZ28=) | `63.15% <90.00%> (+1.36%)` | :arrow_up: |
   | [pkg/cache/context.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2NvbnRleHQuZ28=) | `41.08% <100.00%> (+0.13%)` | :arrow_up: |
   | ... and [5 more](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=footer). Last update [eb888df...ec73314](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r558490977



##########
File path: pkg/cache/application.go
##########
@@ -273,42 +312,46 @@ func (app *Application) Schedule() {
 			log.Logger().Warn("failed to handle SUBMIT app event",
 				zap.Error(err))
 		}
-	case states.Accepted:
-		ev := NewRunApplicationEvent(app.GetApplicationID())
-		if err := app.handle(ev); err != nil {
-			log.Logger().Warn("failed to handle RUN app event",
-				zap.Error(err))
-		}
+	case states.Reserving:

Review comment:
       Good question. The handling for the Accepted state has moved to `postAppAccepted`:
   
   ```
   func (app *Application) postAppAccepted(event *fsm.Event) {
   	// if app has taskGroups defined, it goes to the Reserving state before getting to Running
   	var ev events.SchedulingEvent
   	if len(app.taskGroups) != 0 {
   		ev = NewSimpleApplicationEvent(app.applicationID, events.TryReserve)
   		dispatcher.Dispatch(ev)
   	} else {
   		ev = NewRunApplicationEvent(app.applicationID)
   	}
   	dispatcher.Dispatch(ev)
   }
   ```
   
   It checks whether the app has taskGroups defined. If not, the app transitions directly to the Running state; otherwise it goes to the Reserving state first. This follows from the change to the state machine graph; see [this doc](https://docs.google.com/document/d/1P-g4plXIJ9Xybp-jyKySI18P3rkGQPuTutGYhv1LaQ8/edit#heading=h.laq9umr6wnyj) for details.
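
The branching described above can be sketched in isolation as follows. This is a minimal sketch with stand-in names: `eventType`, `tryReserve`, `runApplication`, and `nextEvent` are hypothetical and only mirror the decision, not the shim's actual event or dispatcher types. It picks exactly one event per call, so the caller would dispatch once.

```go
package main

import "fmt"

// eventType is a hypothetical stand-in for the shim's event types.
type eventType string

const (
	tryReserve     eventType = "TryReserve"
	runApplication eventType = "RunApplication"
)

// nextEvent picks the event to dispatch once an app has been accepted:
// apps with task groups go through Reserving first, all others go
// straight to Running.
func nextEvent(taskGroupCount int) eventType {
	if taskGroupCount != 0 {
		return tryReserve
	}
	return runApplication
}

func main() {
	fmt.Println(nextEvent(2)) // app with task groups
	fmt.Println(nextEvent(0)) // app without task groups
}
```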







[GitHub] [incubator-yunikorn-k8shim] kingamarton commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
kingamarton commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r559606848



##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,167 @@
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")

Review comment:
       Yes, it makes sense. Thank you for the explanation.







[GitHub] [incubator-yunikorn-k8shim] HuangTing-Yao commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
HuangTing-Yao commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r558814340



##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,167 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients   *client.Clients
+	orphanPod map[string]*v1.Pod
+	stopChan  chan struct{}
+	running   atomic.Value
+	sync.RWMutex
+}
+
+var placeholderMgr *PlaceholderManager
+var once sync.Once
+
+func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
+	var r atomic.Value
+	r.Store(false)
+	placeholderMgr = &PlaceholderManager{
+		clients: clients,
+		running: r,
+	}
+	return placeholderMgr
+}
+
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")
+		}
+	})
+	return placeholderMgr
+}
+
+func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
+	mgr.Lock()
+	defer mgr.Unlock()
+
+	// iterate all task groups, create placeholders for all the min members
+	for _, tg := range app.getTaskGroups() {
+		for i := int32(0); i < tg.MinMember; i++ {
+			placeholderName := utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID(), i)
+			placeholder := newPlaceholder(placeholderName, app, tg)
+			// create the placeholder on K8s
+			_, err := mgr.clients.KubeClient.Create(placeholder.pod)
+			if err != nil {
+				// if failed to create the place holder pod
+				// caller should handle this error
+				log.Logger().Error("failed to create placeholder pod",
+					zap.Error(err))
+				return err
+			}
+			log.Logger().Info("placeholder created",
+				zap.String("placeholder", placeholder.String()))
+		}
+	}
+
+	return nil
+}
+
+// clean up all the placeholders for an application
+func (mgr *PlaceholderManager) CleanUp(app *Application) {
+	log.Logger().Info("start to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+	for taskID, task := range app.taskMap {
+		if task.GetTaskPlaceholder() {
+			// remove pod
+			err := mgr.clients.KubeClient.Delete(task.pod)
+			if err != nil {
+				log.Logger().Error("failed to clean up placeholder pod",
+					zap.Error(err))
+				mgr.orphanPod[taskID] = task.pod
+			}
+		}
+	}
+	log.Logger().Info("finish to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+}
+
+func (mgr *PlaceholderManager) cleanOrphanPlaceholders() {
+	mgr.Lock()
+	defer mgr.Unlock()
+	for taskID, pod := range mgr.orphanPod {
+		log.Logger().Debug("start to clean up orphan pod",
+			zap.String("taskID", taskID),
+			zap.String("podName", pod.Name))
+		err := mgr.clients.KubeClient.Delete(pod)
+		if err != nil {
+			log.Logger().Warn("failed to clean up orphan pod", zap.Error(err))
+		} else {
+			delete(mgr.orphanPod, taskID)
+		}
+	}
+}
+
+func (mgr *PlaceholderManager) Start() {
+	log.Logger().Info("starting the Placeholder Manager")
+	mgr.stopChan = make(chan struct{})
+	if mgr.isRunning() {
+		log.Logger().Info("The placeholder manager has been started")
+		return
+	}
+	mgr.setRunning(true)
+	go func() {
+		for {
+			select {
+			case <-mgr.stopChan:
+				log.Logger().Info("PlaceholderManager has been stopped")
+				mgr.setRunning(false)
+				return
+			default:
+				// clean orphan placeholders every 5 seconds
+				log.Logger().Info("clean up orphan pod")
+				mgr.cleanOrphanPlaceholders()
+				time.Sleep(5 * time.Second)
+			}
+		}
+	}()
+}
+
+func (mgr *PlaceholderManager) Stop() {
+	if !mgr.isRunning() {
+		log.Logger().Info("The placeholder manager has been stopped")
+		return
+	}
+	log.Logger().Info("stopping the Placeholder Manager")
+	mgr.stopChan <- struct{}{}
+	time.Sleep(3 * time.Second)

Review comment:
       When we send `struct{}{}` to `stopChan`, the goroutine started in `Start()` might not set running to false immediately.
   Instead of sleeping here, we could move the sleep to `TestPlaceholderManagerStartStop()` in `placeholder_manager_test.go`.







[GitHub] [incubator-yunikorn-k8shim] kingamarton commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
kingamarton commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r559607480



##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,167 @@
+func (mgr *PlaceholderManager) Stop() {
+	if !mgr.isRunning() {
+		log.Logger().Info("The placeholder manager has been stopped")
+		return
+	}
+	log.Logger().Info("stopping the Placeholder Manager")
+	mgr.stopChan <- struct{}{}
+	time.Sleep(3 * time.Second)

Review comment:
       I agree that it should be moved to the test code. Please open a follow-up Jira for it.







[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#issuecomment-760020178


   Hi @kingamarton, please help review. All the patches were already reviewed before being committed to the YUNIKORN-2 branch, so I think a light review is enough for the merge. Thanks.





[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r559807856



##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,167 @@
+func (mgr *PlaceholderManager) Stop() {
+	if !mgr.isRunning() {
+		log.Logger().Info("The placeholder manager has been stopped")
+		return
+	}
+	log.Logger().Info("stopping the Placeholder Manager")
+	mgr.stopChan <- struct{}{}
+	time.Sleep(3 * time.Second)

Review comment:
       Thanks, created https://issues.apache.org/jira/browse/YUNIKORN-510 to track this.







[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r558526538



##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,167 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients   *client.Clients
+	orphanPod map[string]*v1.Pod
+	stopChan  chan struct{}
+	running   atomic.Value
+	sync.RWMutex
+}
+
+var placeholderMgr *PlaceholderManager
+var once sync.Once
+
+func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
+	var r atomic.Value
+	r.Store(false)
+	placeholderMgr = &PlaceholderManager{
+		clients: clients,
+		running: r,
+	}
+	return placeholderMgr
+}
+
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")
+		}
+	})
+	return placeholderMgr
+}
+
+func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
+	mgr.Lock()
+	defer mgr.Unlock()
+
+	// iterate all task groups, create placeholders for all the min members
+	for _, tg := range app.getTaskGroups() {
+		for i := int32(0); i < tg.MinMember; i++ {
+			placeholderName := utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID(), i)
+			placeholder := newPlaceholder(placeholderName, app, tg)
+			// create the placeholder on K8s
+			_, err := mgr.clients.KubeClient.Create(placeholder.pod)
+			if err != nil {
+				// if failed to create the place holder pod
+				// caller should handle this error
+				log.Logger().Error("failed to create placeholder pod",
+					zap.Error(err))
+				return err
+			}
+			log.Logger().Info("placeholder created",
+				zap.String("placeholder", placeholder.String()))
+		}
+	}
+
+	return nil
+}
+
+// clean up all the placeholders for an application
+func (mgr *PlaceholderManager) CleanUp(app *Application) {
+	log.Logger().Info("start to clean up app placeholders",

Review comment:
       Good catch. We need a lock here; I will add it in the next commit.
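
A minimal sketch of the kind of guarded bookkeeping discussed here, not the actual follow-up commit. `orphanTracker`, `add` and `size` are hypothetical names, and pods are replaced by plain strings so the example runs without a cluster. The sketch also initializes the map in the constructor, because writing to a nil map panics in Go; the quoted `NewPlaceholderManager` does not appear to initialize `orphanPod`.

```go
package main

import (
	"fmt"
	"sync"
)

// orphanTracker is a hypothetical stand-in for the orphanPod bookkeeping;
// pod objects are replaced by plain strings for illustration.
type orphanTracker struct {
	sync.RWMutex
	orphans map[string]string
}

func newOrphanTracker() *orphanTracker {
	// Initialize the map up front: writing to a nil map panics.
	return &orphanTracker{orphans: make(map[string]string)}
}

// add records a pod whose deletion failed; safe for concurrent callers.
func (o *orphanTracker) add(taskID, podName string) {
	o.Lock()
	defer o.Unlock()
	o.orphans[taskID] = podName
}

// size returns the number of tracked orphans under a read lock.
func (o *orphanTracker) size() int {
	o.RLock()
	defer o.RUnlock()
	return len(o.orphans)
}

func main() {
	tracker := newOrphanTracker()
	var wg sync.WaitGroup
	// Simulate many cleanups failing concurrently.
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			tracker.add(fmt.Sprintf("task-%d", i), fmt.Sprintf("pod-%d", i))
		}(i)
	}
	wg.Wait()
	fmt.Println("tracked orphans:", tracker.size())
}
```

Without the lock, the concurrent writes above would be a data race, and the Go runtime may abort with a "concurrent map writes" error.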







[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r559088192



##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,167 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients   *client.Clients
+	orphanPod map[string]*v1.Pod
+	stopChan  chan struct{}
+	running   atomic.Value
+	sync.RWMutex
+}
+
+var placeholderMgr *PlaceholderManager
+var once sync.Once
+
+func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
+	var r atomic.Value
+	r.Store(false)
+	placeholderMgr = &PlaceholderManager{
+		clients: clients,
+		running: r,
+	}
+	return placeholderMgr
+}
+
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")
+		}
+	})
+	return placeholderMgr
+}
+
+func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
+	mgr.Lock()
+	defer mgr.Unlock()
+
+	// iterate all task groups, create placeholders for all the min members
+	for _, tg := range app.getTaskGroups() {
+		for i := int32(0); i < tg.MinMember; i++ {
+			placeholderName := utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID(), i)
+			placeholder := newPlaceholder(placeholderName, app, tg)
+			// create the placeholder on K8s
+			_, err := mgr.clients.KubeClient.Create(placeholder.pod)
+			if err != nil {
+				// if failed to create the place holder pod
+				// caller should handle this error
+				log.Logger().Error("failed to create placeholder pod",
+					zap.Error(err))
+				return err
+			}
+			log.Logger().Info("placeholder created",
+				zap.String("placeholder", placeholder.String()))
+		}
+	}
+
+	return nil
+}
+
+// clean up all the placeholders for an application
+func (mgr *PlaceholderManager) CleanUp(app *Application) {
+	log.Logger().Info("start to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+	for taskID, task := range app.taskMap {
+		if task.GetTaskPlaceholder() {
+			// remove pod
+			err := mgr.clients.KubeClient.Delete(task.pod)
+			if err != nil {
+				log.Logger().Error("failed to clean up placeholder pod",
+					zap.Error(err))
+				mgr.orphanPod[taskID] = task.pod
+			}
+		}
+	}
+	log.Logger().Info("finish to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+}
+
+func (mgr *PlaceholderManager) cleanOrphanPlaceholders() {
+	mgr.Lock()
+	defer mgr.Unlock()
+	for taskID, pod := range mgr.orphanPod {
+		log.Logger().Debug("start to clean up orphan pod",
+			zap.String("taskID", taskID),
+			zap.String("podName", pod.Name))
+		err := mgr.clients.KubeClient.Delete(pod)
+		if err != nil {
+			log.Logger().Warn("failed to clean up orphan pod", zap.Error(err))
+		} else {
+			delete(mgr.orphanPod, taskID)
+		}
+	}
+}
+
+func (mgr *PlaceholderManager) Start() {
+	log.Logger().Info("starting the Placeholder Manager")
+	mgr.stopChan = make(chan struct{})
+	if mgr.isRunning() {
+		log.Logger().Info("The placeholder manager has been started")
+		return
+	}
+	mgr.setRunning(true)
+	go func() {
+		for {
+			select {
+			case <-mgr.stopChan:
+				log.Logger().Info("PlaceholderManager has been stopped")
+				mgr.setRunning(false)
+				return
+			default:
+				// clean orphan placeholders every 5 seconds
+				log.Logger().Info("clean up orphan pod")
+				mgr.cleanOrphanPlaceholders()
+				time.Sleep(5 * time.Second)
+			}
+		}
+	}()
+}
+
+func (mgr *PlaceholderManager) Stop() {
+	if !mgr.isRunning() {
+		log.Logger().Info("The placeholder manager has been stopped")
+		return
+	}
+	log.Logger().Info("stopping the Placeholder Manager")
+	mgr.stopChan <- struct{}{}
+	time.Sleep(3 * time.Second)

Review comment:
       Yeah, it sounds like this should be done in the test code instead of here. @HuangTing-Yao, it would be great if we could fix this as a follow-up. Thanks!







[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r558497893



##########
File path: pkg/cache/application.go
##########
@@ -363,6 +406,52 @@ func (app *Application) handleRecoverApplicationEvent(event *fsm.Event) {
 	}
 }
 
+func (app *Application) postAppAccepted(event *fsm.Event) {
+	// if app has taskGroups defined, it goes to the Reserving state before getting to Running
+	var ev events.SchedulingEvent
+	if len(app.taskGroups) != 0 {
+		ev = NewSimpleApplicationEvent(app.applicationID, events.TryReserve)
+		dispatcher.Dispatch(ev)
+	} else {
+		ev = NewRunApplicationEvent(app.applicationID)
+	}
+	dispatcher.Dispatch(ev)
+}
+
+func (app *Application) onReserving(event *fsm.Event) {
+	go func() {
+		// while doing reserving
+		if err := GetPlaceholderManager().createAppPlaceholders(app); err != nil {
+			// creating placeholder failed
+			// put the app into recycling queue and turn the app to running state
+			GetPlaceholderManager().CleanUp(app)
+			ev := NewRunApplicationEvent(app.applicationID)

Review comment:
       Correct. This only covers the case where creating placeholders fails, which should be rare.
   We do not yet have a good understanding of the best way to handle such failures, so for now we take a simple and safe approach: release all the placeholders and try to schedule the app as a normal app. We can improve this once we know what the best handling is.








[GitHub] [incubator-yunikorn-k8shim] codecov[bot] edited a comment on pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#issuecomment-760395169


   # [Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=h1) Report
   > Merging [#219](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=desc) (ec73314) into [master](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/commit/eb888dfcf3b74ce248f2ed3b92ae5bddaf6caf5a?el=desc) (eb888df) will **increase** coverage by `2.00%`.
   > The diff coverage is `76.70%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/graphs/tree.svg?width=650&height=150&src=pr&token=LZImIuvleR)](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #219      +/-   ##
   ==========================================
   + Coverage   57.78%   59.78%   +2.00%     
   ==========================================
     Files          32       35       +3     
     Lines        2859     3133     +274     
   ==========================================
   + Hits         1652     1873     +221     
   - Misses       1138     1179      +41     
   - Partials       69       81      +12     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [pkg/cache/application\_events.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2FwcGxpY2F0aW9uX2V2ZW50cy5nbw==) | `53.06% <0.00%> (-7.41%)` | :arrow_down: |
   | [pkg/common/events/states.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi9ldmVudHMvc3RhdGVzLmdv) | `0.00% <0.00%> (ø)` | |
   | [pkg/common/utils/utils.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi91dGlscy91dGlscy5nbw==) | `25.00% <ø> (ø)` | |
   | [pkg/cache/application.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2FwcGxpY2F0aW9uLmdv) | `76.74% <59.25%> (-2.31%)` | :arrow_down: |
   | [pkg/cache/placeholder\_manager.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL3BsYWNlaG9sZGVyX21hbmFnZXIuZ28=) | `77.94% <77.94%> (ø)` | |
   | [pkg/cache/task.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL3Rhc2suZ28=) | `74.40% <82.22%> (+2.77%)` | :arrow_up: |
   | [pkg/appmgmt/general/general.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2FwcG1nbXQvZ2VuZXJhbC9nZW5lcmFsLmdv) | `59.47% <83.33%> (+2.03%)` | :arrow_up: |
   | [pkg/common/utils/gang\_utils.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi91dGlscy9nYW5nX3V0aWxzLmdv) | `85.48% <85.48%> (ø)` | |
   | [pkg/common/si\_helper.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NvbW1vbi9zaV9oZWxwZXIuZ28=) | `63.15% <90.00%> (+1.36%)` | :arrow_up: |
   | [pkg/cache/context.go](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree#diff-cGtnL2NhY2hlL2NvbnRleHQuZ28=) | `41.08% <100.00%> (+0.13%)` | :arrow_up: |
   | ... and [5 more](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=footer). Last update [eb888df...d0a5e7b](https://codecov.io/gh/apache/incubator-yunikorn-k8shim/pull/219?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r559814302



##########
File path: pkg/common/utils/gang_utils.go
##########
@@ -0,0 +1,183 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package utils
+
+import (
+	"encoding/json"
+	"fmt"
+	"strconv"
+	"sync"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants"
+)
+
+func FindAppTaskGroup(appTaskGroups []*v1alpha1.TaskGroup, groupName string) (*v1alpha1.TaskGroup, error) {
+	if groupName == "" {
+		// task has no group defined
+		return nil, nil
+	}
+
+	// app has no taskGroups associated
+	if len(appTaskGroups) == 0 {
+		return nil, nil
+	}
+
+	// task group defined in app, return the corresponding taskGroup
+	for _, tg := range appTaskGroups {
+		if tg.Name == groupName {
+			return tg, nil
+		}
+	}
+
+	// task group name specified, but could not find a mapping value in app taskGroups
+	return nil, fmt.Errorf("taskGroup %s is not defined in the application", groupName)
+}
+
+// the placeholder name is the pod name, pod name can not be longer than 63 chars
+// taskGroup name and appID will be truncated if they go over 20/28 chars respectively
+func GeneratePlaceholderName(taskGroupName, appID string, index int32) string {
+	// taskGroup name no longer than 20 chars
+	// appID no longer than 35 chars

Review comment:
       Good catch, the app ID is capped at 28 chars, not 35. Fixed the comment.







[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r560378954



##########
File path: pkg/cache/application.go
##########
@@ -81,8 +87,14 @@ func NewApplication(appID, queueName, user string, tags map[string]string, sched
 			{Name: string(events.AcceptApplication),
 				Src: []string{states.Submitted, states.Recovering},
 				Dst: states.Accepted},
+			{Name: string(events.TryReserve),
+				Src: []string{states.Accepted},
+				Dst: states.Reserving},
+			{Name: string(events.UpdateReservation),
+				Src: []string{states.Reserving},
+				Dst: states.Reserving},

Review comment:
       Hi @kingamarton, please take a look at the state machine definition here:
   
   ```
   fsm.Callbacks{
     ...
     string(events.UpdateReservation):      app.onReservationStateChange,
     ...
   },
   ```
   
   This callback is called every time a task is bound to a node, and it checks whether the app has enough placeholders. Once it does, `onReservationStateChange()` triggers a state transition from `Reserving` to `Running`.
   
   ```
   func (app *Application) onReservationStateChange(event *fsm.Event) {
            ...
   	if desireCounts.Equals(actualCounts) {
   		ev := NewRunApplicationEvent(app.applicationID)
   		dispatcher.Dispatch(ev)
   	}
   }
   ```
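
To make the self-transition easier to follow, here is a stripped-down model of the same idea without the FSM library. It is a sketch under assumptions: the `app` type, `onPlaceholderBound` and `countsEqual` are hypothetical, and the state is flipped directly where the real callback dispatches a `RunApplication` event.

```go
package main

import "fmt"

// app is a hypothetical, simplified model of an application that waits
// for its placeholders while in the Reserving state.
type app struct {
	state   string
	desired map[string]int32 // task group name -> required placeholders
	actual  map[string]int32 // task group name -> bound placeholders
}

// countsEqual reports whether both maps hold the same counts.
func countsEqual(a, b map[string]int32) bool {
	if len(a) != len(b) {
		return false
	}
	for k, v := range a {
		if bv, ok := b[k]; !ok || bv != v {
			return false
		}
	}
	return true
}

// onPlaceholderBound models UpdateReservation: the app stays in Reserving
// (a self-transition) until every task group has enough placeholders.
func (a *app) onPlaceholderBound(taskGroup string) {
	if a.state != "Reserving" {
		return
	}
	a.actual[taskGroup]++
	if countsEqual(a.desired, a.actual) {
		a.state = "Running"
	}
}

func main() {
	a := &app{
		state:   "Reserving",
		desired: map[string]int32{"driver": 1, "executor": 2},
		actual:  map[string]int32{},
	}
	for _, tg := range []string{"driver", "executor", "executor"} {
		a.onPlaceholderBound(tg)
		fmt.Println("bound", tg, "->", a.state)
	}
}
```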







[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r558531380



##########
File path: pkg/common/utils/gang_utils.go
##########
@@ -0,0 +1,179 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package utils
+
+import (
+	"encoding/json"
+	"fmt"
+	"strconv"
+	"sync"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants"
+)
+
+func FindAppTaskGroup(appTaskGroups []*v1alpha1.TaskGroup, groupName string) (*v1alpha1.TaskGroup, error) {
+	if groupName == "" {
+		// task has no group defined
+		return nil, nil
+	}
+
+	// app has no taskGroups associated
+	if len(appTaskGroups) == 0 {
+		return nil, nil
+	}
+
+	// task group defined in app, return the corresponding taskGroup
+	for _, tg := range appTaskGroups {
+		if tg.Name == groupName {
+			return tg, nil
+		}
+	}
+
+	// task group name specified, but could not find a mapping value in app taskGroups
+	return nil, fmt.Errorf("taskGroup %s is not defined in the application", groupName)
+}
+
+// the placeholder name is the pod name, pod name can not be longer than 63 chars
+// taskGroup name and appID will be truncated if they go over 20/28 chars respectively
+func GeneratePlaceholderName(taskGroupName, appID string, index int32) string {
+	// taskGroup name no longer than 20 chars
+	// appID no longer than 35 chars
+	// total length no longer than 20 + 28 + 5 + 10 = 63
+	shortTaskGroupName := fmt.Sprintf("%.20s", taskGroupName)
+	shortAppID := fmt.Sprintf("%.28s", appID)
+	return "tg-" + shortTaskGroupName + "-" + shortAppID + fmt.Sprintf("-%d", index)
+}
+
+func GetPlaceholderResourceRequest(resources map[string]resource.Quantity) v1.ResourceList {
+	resourceReq := v1.ResourceList{}
+	for k, v := range resources {
+		resourceReq[v1.ResourceName(k)] = v
+	}
+	return resourceReq
+}
+
+func GetPlaceholderFlagFromPodSpec(pod *v1.Pod) bool {
+	if value, ok := pod.Annotations[constants.AnnotationPlaceholderFlag]; ok {
+		if v, err := strconv.ParseBool(value); err == nil {
+			return v
+		}
+	}
+	return false
+}
+
+func GetTaskGroupFromPodSpec(pod *v1.Pod) string {
+	if value, ok := pod.Annotations[constants.AnnotationTaskGroupName]; ok {
+		return value
+	}
+	return ""
+}
+
+func GetTaskGroupsFromAnnotation(pod *v1.Pod) ([]v1alpha1.TaskGroup, error) {
+	taskGroupInfo, ok := pod.Annotations[constants.AnnotationTaskGroups]
+	if !ok {
+		return nil, nil
+	}
+	taskGroups := []v1alpha1.TaskGroup{}
+	err := json.Unmarshal([]byte(taskGroupInfo), &taskGroups)
+	if err != nil {
+		return nil, err
+	}
+	// json.Unmarchal won't return error if name or MinMember is empty, but will return error if MinResource is empty or error format.
+	for _, taskGroup := range taskGroups {
+		if taskGroup.Name == "" {
+			return nil, fmt.Errorf("can't get taskGroup Name from pod annotation, %s",
+				pod.Annotations[constants.AnnotationTaskGroups])
+		}
+		if taskGroup.MinMember == int32(0) {
+			return nil, fmt.Errorf("can't get taskGroup MinMember from pod annotation, %s",
+				pod.Annotations[constants.AnnotationTaskGroups])
+		}
+	}
+	return taskGroups, nil
+}
+
+type TaskGroupInstanceCountMap struct {
+	counts map[string]int32
+	sync.RWMutex
+}
+
+func NewTaskGroupInstanceCountMap() *TaskGroupInstanceCountMap {
+	return &TaskGroupInstanceCountMap{
+		counts: make(map[string]int32),
+	}
+}
+
+func (t *TaskGroupInstanceCountMap) Add(taskGroupName string, num int32) {
+	t.update(taskGroupName, num)
+}
+
+func (t *TaskGroupInstanceCountMap) AddOne(taskGroupName string) {
+	t.update(taskGroupName, 1)
+}
+
+func (t *TaskGroupInstanceCountMap) DeleteOne(taskGroupName string) {
+	t.update(taskGroupName, -1)
+}
+
+func (t *TaskGroupInstanceCountMap) update(taskGroupName string, delta int32) {
+	t.Lock()
+	defer t.Unlock()
+	if v, ok := t.counts[taskGroupName]; ok {
+		t.counts[taskGroupName] = v + delta
+	} else {
+		t.counts[taskGroupName] = delta
+	}
+}
+
+func (t *TaskGroupInstanceCountMap) Size() int {
+	t.RLock()
+	defer t.RUnlock()
+	return len(t.counts)
+}
+
+func (t *TaskGroupInstanceCountMap) GetTaskGroupInstanceCount(groupName string) int32 {
+	t.RLock()
+	defer t.RUnlock()
+	return t.counts[groupName]
+}
+
+func (t *TaskGroupInstanceCountMap) Equals(target *TaskGroupInstanceCountMap) bool {
+	t.RLock()
+	defer t.RUnlock()
+
+	if target == nil {
+		return false
+	}

Review comment:
       Good suggestion. I just realized that Go handles this differently:
   https://tour.golang.org/methods/12
   I will add the check and a unit test for it in the next commit.
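
A minimal sketch of the Go behavior referred to here, assuming the suggestion was about a nil receiver: a method with a pointer receiver can be called on a nil pointer, so the method itself has to guard against it before touching any fields (or taking a lock). `countMap` and `equals` are hypothetical, simplified stand-ins for `TaskGroupInstanceCountMap` and `Equals`.

```go
package main

import "fmt"

// countMap is a hypothetical, lock-free stand-in for
// TaskGroupInstanceCountMap, used only to show nil-receiver handling.
type countMap struct {
	counts map[string]int32
}

// equals is safe to call on a nil receiver: two nil maps are equal,
// and a nil map never equals a non-nil one.
func (t *countMap) equals(target *countMap) bool {
	if t == nil || target == nil {
		return t == nil && target == nil
	}
	if len(t.counts) != len(target.counts) {
		return false
	}
	for k, v := range t.counts {
		if tv, ok := target.counts[k]; !ok || tv != v {
			return false
		}
	}
	return true
}

func main() {
	var nilMap *countMap
	filled := &countMap{counts: map[string]int32{"tg-a": 2}}

	// Calling a method on a nil pointer is legal; only field access panics.
	fmt.Println(nilMap.equals(nil))
	fmt.Println(nilMap.equals(filled))
	fmt.Println(filled.equals(&countMap{counts: map[string]int32{"tg-a": 2}}))
}
```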







[GitHub] [incubator-yunikorn-k8shim] yangwwei commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

Posted by GitBox <gi...@apache.org>.
yangwwei commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r558521489



##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,167 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients   *client.Clients
+	orphanPod map[string]*v1.Pod

Review comment:
       Makes sense, I will add some more comments on this.



