You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@yunikorn.apache.org by GitBox <gi...@apache.org> on 2021/01/15 11:09:18 UTC

[GitHub] [incubator-yunikorn-k8shim] kingamarton commented on a change in pull request #219: [YUNIKORN-2] gang scheduling shim side implementation

kingamarton commented on a change in pull request #219:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/219#discussion_r557336617



##########
File path: deployments/examples/gang/cmd/gangDeploy.sh
##########
@@ -0,0 +1,100 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+#limitations under the License.
+#
+
+# gangDeploy.sh <job amount> <pod amount> <gang member> <task run time(sec)>
+set -o errexit
+set -o nounset
+set -o pipefail
+
+JOBAMOUNT=$1
+GANGMEMBER=$2
+RUNTIMESEC=$3
+
+# create service
+kubectl create -f <(cat << EOF
+apiVersion: v1
+kind: Service
+metadata:
+  name: gangservice
+  labels:
+    app: gang
+spec:
+  selector:
+    app: gang
+  type: ClusterIP
+  ports:
+  - protocol: TCP
+    port: 8863
+    targetPort: 8863
+EOF) 
+# create job counter web server
+kubectl create -f <(cat << EOF
+apiVersion: v1
+kind: Pod
+metadata:
+  name: gangweb
+  labels:
+    app: gang
+    queue: root.sandbox
+spec:
+  schedulerName: yunikorn
+  containers:
+    - name: gangweb
+      image: apache/yunikorn:simulation-gang-coordinator-latest
+      imagePullPolicy: Never
+      ports:

Review comment:
       Please extract this part into a yaml file, or, even better, reuse the gang-coordinator.yaml file.

##########
File path: deployments/examples/gang/cmd/gangDeploy.sh
##########
@@ -0,0 +1,100 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+#limitations under the License.
+#
+
+# gangDeploy.sh <job amount> <pod amount> <gang member> <task run time(sec)>
+set -o errexit
+set -o nounset
+set -o pipefail
+
+JOBAMOUNT=$1
+GANGMEMBER=$2
+RUNTIMESEC=$3
+
+# create service
+kubectl create -f <(cat << EOF
+apiVersion: v1
+kind: Service
+metadata:
+  name: gangservice
+  labels:
+    app: gang
+spec:
+  selector:
+    app: gang
+  type: ClusterIP
+  ports:
+  - protocol: TCP
+    port: 8863
+    targetPort: 8863
+EOF) 
+# create job counter web server
+kubectl create -f <(cat << EOF
+apiVersion: v1
+kind: Pod
+metadata:
+  name: gangweb
+  labels:
+    app: gang
+    queue: root.sandbox
+spec:
+  schedulerName: yunikorn
+  containers:
+    - name: gangweb
+      image: apache/yunikorn:simulation-gang-coordinator-latest
+      imagePullPolicy: Never
+      ports:
+        - containerPort: 8863
+EOF)
+# wait for web server to be running
+until grep 'Running' <(kubectl get pod gangweb -o=jsonpath='{.status.phase}'); do
+  sleep 1
+done

Review comment:
       I think it might be worth adding a timeout here to avoid an infinite loop in case the pod does not transition to the Running state for any reason.

##########
File path: pkg/cache/application.go
##########
@@ -363,6 +406,52 @@ func (app *Application) handleRecoverApplicationEvent(event *fsm.Event) {
 	}
 }
 
+func (app *Application) postAppAccepted(event *fsm.Event) {
+	// if app has taskGroups defined, it goes to the Reserving state before getting to Running
+	var ev events.SchedulingEvent
+	if len(app.taskGroups) != 0 {
+		ev = NewSimpleApplicationEvent(app.applicationID, events.TryReserve)
+		dispatcher.Dispatch(ev)
+	} else {
+		ev = NewRunApplicationEvent(app.applicationID)
+	}
+	dispatcher.Dispatch(ev)
+}
+
+func (app *Application) onReserving(event *fsm.Event) {
+	go func() {
+		// while doing reserving
+		if err := GetPlaceholderManager().createAppPlaceholders(app); err != nil {
+			// creating placeholder failed
+			// put the app into recycling queue and turn the app to running state
+			GetPlaceholderManager().CleanUp(app)
+			ev := NewRunApplicationEvent(app.applicationID)

Review comment:
       Does this mean that if we fail to create the placeholders, the app will be handled as a normal app rather than as one with a gang?

##########
File path: pkg/cache/application.go
##########
@@ -363,6 +406,52 @@ func (app *Application) handleRecoverApplicationEvent(event *fsm.Event) {
 	}
 }
 
+func (app *Application) postAppAccepted(event *fsm.Event) {
+	// if app has taskGroups defined, it goes to the Reserving state before getting to Running
+	var ev events.SchedulingEvent

Review comment:
       Shouldn't we check if we are still in the right state?

##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,167 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients   *client.Clients
+	orphanPod map[string]*v1.Pod
+	stopChan  chan struct{}
+	running   atomic.Value
+	sync.RWMutex
+}
+
+var placeholderMgr *PlaceholderManager
+var once sync.Once
+
+func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
+	var r atomic.Value
+	r.Store(false)
+	placeholderMgr = &PlaceholderManager{
+		clients: clients,
+		running: r,
+	}
+	return placeholderMgr
+}
+
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")
+		}
+	})
+	return placeholderMgr
+}
+
+func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
+	mgr.Lock()
+	defer mgr.Unlock()
+
+	// iterate all task groups, create placeholders for all the min members
+	for _, tg := range app.getTaskGroups() {
+		for i := int32(0); i < tg.MinMember; i++ {
+			placeholderName := utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID(), i)
+			placeholder := newPlaceholder(placeholderName, app, tg)
+			// create the placeholder on K8s
+			_, err := mgr.clients.KubeClient.Create(placeholder.pod)
+			if err != nil {
+				// if failed to create the place holder pod
+				// caller should handle this error
+				log.Logger().Error("failed to create placeholder pod",
+					zap.Error(err))
+				return err
+			}
+			log.Logger().Info("placeholder created",
+				zap.String("placeholder", placeholder.String()))
+		}
+	}
+
+	return nil
+}
+
+// clean up all the placeholders for an application
+func (mgr *PlaceholderManager) CleanUp(app *Application) {
+	log.Logger().Info("start to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+	for taskID, task := range app.taskMap {
+		if task.GetTaskPlaceholder() {
+			// remove pod
+			err := mgr.clients.KubeClient.Delete(task.pod)
+			if err != nil {
+				log.Logger().Error("failed to clean up placeholder pod",
+					zap.Error(err))
+				mgr.orphanPod[taskID] = task.pod
+			}
+		}
+	}
+	log.Logger().Info("finish to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+}
+
+func (mgr *PlaceholderManager) cleanOrphanPlaceholders() {
+	mgr.Lock()
+	defer mgr.Unlock()
+	for taskID, pod := range mgr.orphanPod {
+		log.Logger().Debug("start to clean up orphan pod",
+			zap.String("taskID", taskID),
+			zap.String("podName", pod.Name))
+		err := mgr.clients.KubeClient.Delete(pod)
+		if err != nil {
+			log.Logger().Warn("failed to clean up orphan pod", zap.Error(err))
+		} else {
+			delete(mgr.orphanPod, taskID)
+		}
+	}
+}
+
+func (mgr *PlaceholderManager) Start() {
+	log.Logger().Info("starting the Placeholder Manager")
+	mgr.stopChan = make(chan struct{})
+	if mgr.isRunning() {
+		log.Logger().Info("The placeholder manager has been started")
+		return
+	}
+	mgr.setRunning(true)
+	go func() {
+		for {
+			select {
+			case <-mgr.stopChan:
+				log.Logger().Info("PlaceholderManager has been stopped")
+				mgr.setRunning(false)
+				return
+			default:
+				// clean orphan placeholders every 5 seconds
+				log.Logger().Info("clean up orphan pod")
+				mgr.cleanOrphanPlaceholders()
+				time.Sleep(5 * time.Second)
+			}
+		}
+	}()
+}
+
+func (mgr *PlaceholderManager) Stop() {
+	if !mgr.isRunning() {
+		log.Logger().Info("The placeholder manager has been stopped")
+		return
+	}
+	log.Logger().Info("stopping the Placeholder Manager")
+	mgr.stopChan <- struct{}{}
+	time.Sleep(3 * time.Second)

Review comment:
       Why do we need this sleep here?

##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,167 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients   *client.Clients
+	orphanPod map[string]*v1.Pod

Review comment:
       What is an orphan pod? I think it is worth adding a comment explaining what this is.

##########
File path: deployments/examples/gang/cmd/gangDeploy.sh
##########
@@ -0,0 +1,100 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+#limitations under the License.
+#
+
+# gangDeploy.sh <job amount> <pod amount> <gang member> <task run time(sec)>
+set -o errexit
+set -o nounset
+set -o pipefail
+
+JOBAMOUNT=$1
+GANGMEMBER=$2
+RUNTIMESEC=$3
+
+# create service
+kubectl create -f <(cat << EOF
+apiVersion: v1
+kind: Service
+metadata:
+  name: gangservice
+  labels:
+    app: gang
+spec:
+  selector:
+    app: gang
+  type: ClusterIP
+  ports:
+  - protocol: TCP
+    port: 8863
+    targetPort: 8863
+EOF) 
+# create job counter web server
+kubectl create -f <(cat << EOF
+apiVersion: v1
+kind: Pod
+metadata:
+  name: gangweb
+  labels:
+    app: gang
+    queue: root.sandbox
+spec:
+  schedulerName: yunikorn
+  containers:
+    - name: gangweb
+      image: apache/yunikorn:simulation-gang-coordinator-latest
+      imagePullPolicy: Never
+      ports:
+        - containerPort: 8863
+EOF)
+# wait for web server to be running
+until grep 'Running' <(kubectl get pod gangweb -o=jsonpath='{.status.phase}'); do
+  sleep 1
+done
+# create gang jobs
+for i in $(seq "$JOBAMOUNT"); do
+  kubectl create -f <(cat << EOF
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: gang-job-$i
+  labels: 
+    app: gang
+    queue: root.sandbox
+spec:
+  completions: $GANGMEMBER
+  parallelism: $GANGMEMBER
+  template:
+    spec:
+      containers:
+      - name: gang
+        image: apache/yunikorn:simulation-gang-worker-latest
+        imagePullPolicy: Never
+        env:
+        - name: JOB_ID
+          value: gang-job-$i
+        - name: SERVICE_NAME
+          value: gangservice
+        - name: MEMBER_AMOUNT
+          value: "$GANGMEMBER"
+        - name: TASK_EXECUTION_SECONDS
+          value: "$RUNTIMESEC"
+      restartPolicy: Never
+      schedulerName: yunikorn

Review comment:
       Please extract this part into a yaml file. Alternatively, I think we can reuse the gang-job.yaml file here.

##########
File path: pkg/cache/application.go
##########
@@ -363,6 +406,52 @@ func (app *Application) handleRecoverApplicationEvent(event *fsm.Event) {
 	}
 }
 
+func (app *Application) postAppAccepted(event *fsm.Event) {
+	// if app has taskGroups defined, it goes to the Reserving state before getting to Running
+	var ev events.SchedulingEvent
+	if len(app.taskGroups) != 0 {
+		ev = NewSimpleApplicationEvent(app.applicationID, events.TryReserve)
+		dispatcher.Dispatch(ev)
+	} else {
+		ev = NewRunApplicationEvent(app.applicationID)
+	}
+	dispatcher.Dispatch(ev)
+}
+
+func (app *Application) onReserving(event *fsm.Event) {
+	go func() {
+		// while doing reserving
+		if err := GetPlaceholderManager().createAppPlaceholders(app); err != nil {
+			// creating placeholder failed
+			// put the app into recycling queue and turn the app to running state
+			GetPlaceholderManager().CleanUp(app)
+			ev := NewRunApplicationEvent(app.applicationID)
+			dispatcher.Dispatch(ev)
+		}
+	}()
+}
+
+func (app *Application) onReservationStateChange(event *fsm.Event) {
+	// this event is called when there is a add or release of placeholders
+	desireCounts := utils.NewTaskGroupInstanceCountMap()
+	for _, tg := range app.taskGroups {
+		desireCounts.Add(tg.Name, tg.MinMember)
+	}
+
+	actualCounts := utils.NewTaskGroupInstanceCountMap()
+	for _, t := range app.getTasks(events.States().Task.Bound) {
+		if t.placeholder {
+			actualCounts.AddOne(t.taskGroupName)
+		}
+	}
+
+	// min member all satisfied
+	if desireCounts.Equals(actualCounts) {
+		ev := NewRunApplicationEvent(app.applicationID)
+		dispatcher.Dispatch(ev)
+	}

Review comment:
       When multiple task groups are defined, shouldn't we start the app as soon as the min member of one of the task groups is satisfied? Are the task groups usually linked in some way that makes it worth waiting for all of them?

##########
File path: pkg/common/utils/gang_utils.go
##########
@@ -0,0 +1,179 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package utils
+
+import (
+	"encoding/json"
+	"fmt"
+	"strconv"
+	"sync"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants"
+)
+
+func FindAppTaskGroup(appTaskGroups []*v1alpha1.TaskGroup, groupName string) (*v1alpha1.TaskGroup, error) {
+	if groupName == "" {
+		// task has no group defined
+		return nil, nil
+	}
+
+	// app has no taskGroups associated
+	if len(appTaskGroups) == 0 {
+		return nil, nil
+	}
+
+	// task group defined in app, return the corresponding taskGroup
+	for _, tg := range appTaskGroups {
+		if tg.Name == groupName {
+			return tg, nil
+		}
+	}
+
+	// task group name specified, but could not find a mapping value in app taskGroups
+	return nil, fmt.Errorf("taskGroup %s is not defined in the application", groupName)
+}
+
+// the placeholder name is the pod name, pod name can not be longer than 63 chars
+// taskGroup name and appID will be truncated if they go over 20/28 chars respectively
+func GeneratePlaceholderName(taskGroupName, appID string, index int32) string {
+	// taskGroup name no longer than 20 chars
+	// appID no longer than 35 chars
+	// total length no longer than 20 + 28 + 5 + 10 = 63
+	shortTaskGroupName := fmt.Sprintf("%.20s", taskGroupName)
+	shortAppID := fmt.Sprintf("%.28s", appID)
+	return "tg-" + shortTaskGroupName + "-" + shortAppID + fmt.Sprintf("-%d", index)
+}
+
+func GetPlaceholderResourceRequest(resources map[string]resource.Quantity) v1.ResourceList {
+	resourceReq := v1.ResourceList{}
+	for k, v := range resources {
+		resourceReq[v1.ResourceName(k)] = v
+	}
+	return resourceReq
+}
+
+func GetPlaceholderFlagFromPodSpec(pod *v1.Pod) bool {
+	if value, ok := pod.Annotations[constants.AnnotationPlaceholderFlag]; ok {
+		if v, err := strconv.ParseBool(value); err == nil {
+			return v
+		}
+	}
+	return false
+}
+
+func GetTaskGroupFromPodSpec(pod *v1.Pod) string {
+	if value, ok := pod.Annotations[constants.AnnotationTaskGroupName]; ok {
+		return value
+	}
+	return ""
+}
+
+func GetTaskGroupsFromAnnotation(pod *v1.Pod) ([]v1alpha1.TaskGroup, error) {
+	taskGroupInfo, ok := pod.Annotations[constants.AnnotationTaskGroups]
+	if !ok {
+		return nil, nil
+	}
+	taskGroups := []v1alpha1.TaskGroup{}
+	err := json.Unmarshal([]byte(taskGroupInfo), &taskGroups)
+	if err != nil {
+		return nil, err
+	}
+	// json.Unmarchal won't return error if name or MinMember is empty, but will return error if MinResource is empty or error format.
+	for _, taskGroup := range taskGroups {
+		if taskGroup.Name == "" {
+			return nil, fmt.Errorf("can't get taskGroup Name from pod annotation, %s",
+				pod.Annotations[constants.AnnotationTaskGroups])
+		}
+		if taskGroup.MinMember == int32(0) {
+			return nil, fmt.Errorf("can't get taskGroup MinMember from pod annotation, %s",
+				pod.Annotations[constants.AnnotationTaskGroups])
+		}
+	}
+	return taskGroups, nil
+}
+
+type TaskGroupInstanceCountMap struct {
+	counts map[string]int32
+	sync.RWMutex
+}
+
+func NewTaskGroupInstanceCountMap() *TaskGroupInstanceCountMap {
+	return &TaskGroupInstanceCountMap{
+		counts: make(map[string]int32),
+	}
+}
+
+func (t *TaskGroupInstanceCountMap) Add(taskGroupName string, num int32) {
+	t.update(taskGroupName, num)
+}
+
+func (t *TaskGroupInstanceCountMap) AddOne(taskGroupName string) {
+	t.update(taskGroupName, 1)
+}
+
+func (t *TaskGroupInstanceCountMap) DeleteOne(taskGroupName string) {
+	t.update(taskGroupName, -1)
+}
+
+func (t *TaskGroupInstanceCountMap) update(taskGroupName string, delta int32) {
+	t.Lock()
+	defer t.Unlock()
+	if v, ok := t.counts[taskGroupName]; ok {
+		t.counts[taskGroupName] = v + delta
+	} else {
+		t.counts[taskGroupName] = delta
+	}
+}
+
+func (t *TaskGroupInstanceCountMap) Size() int {
+	t.RLock()
+	defer t.RUnlock()
+	return len(t.counts)
+}
+
+func (t *TaskGroupInstanceCountMap) GetTaskGroupInstanceCount(groupName string) int32 {
+	t.RLock()
+	defer t.RUnlock()
+	return t.counts[groupName]
+}
+
+func (t *TaskGroupInstanceCountMap) Equals(target *TaskGroupInstanceCountMap) bool {
+	t.RLock()
+	defer t.RUnlock()
+
+	if target == nil {
+		return false
+	}

Review comment:
       I think we need to handle the cases where t is nil and where both t and target are nil.

##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,167 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients   *client.Clients
+	orphanPod map[string]*v1.Pod
+	stopChan  chan struct{}
+	running   atomic.Value
+	sync.RWMutex
+}
+
+var placeholderMgr *PlaceholderManager
+var once sync.Once
+
+func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
+	var r atomic.Value
+	r.Store(false)
+	placeholderMgr = &PlaceholderManager{
+		clients: clients,
+		running: r,
+	}
+	return placeholderMgr
+}
+
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")
+		}
+	})
+	return placeholderMgr
+}
+
+func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
+	mgr.Lock()
+	defer mgr.Unlock()
+
+	// iterate all task groups, create placeholders for all the min members
+	for _, tg := range app.getTaskGroups() {
+		for i := int32(0); i < tg.MinMember; i++ {
+			placeholderName := utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID(), i)
+			placeholder := newPlaceholder(placeholderName, app, tg)
+			// create the placeholder on K8s
+			_, err := mgr.clients.KubeClient.Create(placeholder.pod)
+			if err != nil {
+				// if failed to create the place holder pod
+				// caller should handle this error
+				log.Logger().Error("failed to create placeholder pod",
+					zap.Error(err))
+				return err
+			}
+			log.Logger().Info("placeholder created",
+				zap.String("placeholder", placeholder.String()))
+		}
+	}
+
+	return nil
+}
+
+// clean up all the placeholders for an application
+func (mgr *PlaceholderManager) CleanUp(app *Application) {
+	log.Logger().Info("start to clean up app placeholders",
+		zap.String("appID", app.GetApplicationID()))
+	for taskID, task := range app.taskMap {
+		if task.GetTaskPlaceholder() {
+			// remove pod
+			err := mgr.clients.KubeClient.Delete(task.pod)
+			if err != nil {
+				log.Logger().Error("failed to clean up placeholder pod",

Review comment:
       When can deleting a pod fail?

##########
File path: pkg/cache/application.go
##########
@@ -273,42 +312,46 @@ func (app *Application) Schedule() {
 			log.Logger().Warn("failed to handle SUBMIT app event",
 				zap.Error(err))
 		}
-	case states.Accepted:
-		ev := NewRunApplicationEvent(app.GetApplicationID())
-		if err := app.handle(ev); err != nil {
-			log.Logger().Warn("failed to handle RUN app event",
-				zap.Error(err))
-		}
+	case states.Reserving:

Review comment:
       Why is the Accepted part removed? Every application will go through the reserving state even if it has no gang defined?

##########
File path: pkg/cache/placeholder.go
##########
@@ -0,0 +1,79 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"fmt"
+
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+)
+
+type Placeholder struct {
+	appID         string
+	taskGroupName string
+	pod           *v1.Pod
+}
+
+func newPlaceholder(placeholderName string, app *Application, taskGroup v1alpha1.TaskGroup) *Placeholder {
+	placeholderPod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      placeholderName,
+			Namespace: app.tags[constants.AppTagNamespace],
+			Labels: map[string]string{
+				constants.LabelApplicationID: app.GetApplicationID(),
+				constants.LabelQueueName:     app.GetQueue(),
+			},
+			Annotations: map[string]string{
+				constants.AnnotationPlaceholderFlag: "true",
+				constants.AnnotationTaskGroupName:   taskGroup.Name,
+			},

Review comment:
       What happens if the real pod has more labels and annotations than just these? Won't we need them?

##########
File path: pkg/cache/application.go
##########
@@ -273,42 +312,46 @@ func (app *Application) Schedule() {
 			log.Logger().Warn("failed to handle SUBMIT app event",
 				zap.Error(err))
 		}
-	case states.Accepted:
-		ev := NewRunApplicationEvent(app.GetApplicationID())
-		if err := app.handle(ev); err != nil {
-			log.Logger().Warn("failed to handle RUN app event",
-				zap.Error(err))
-		}
+	case states.Reserving:
+		app.scheduleTasks(func(t *Task) bool {
+			return t.placeholder
+		})
 	case states.Running:
-		if len(app.GetNewTasks()) > 0 {
-			for _, task := range app.GetNewTasks() {
-				// for each new task, we do a sanity check before moving the state to Pending_Schedule
-				if err := task.sanityCheckBeforeScheduling(); err == nil {
-					// note, if we directly trigger submit task event, it may spawn too many duplicate
-					// events, because a task might be submitted multiple times before its state transits to PENDING.
-					if handleErr := task.handle(
-						NewSimpleTaskEvent(task.applicationID, task.taskID, events.InitTask)); handleErr != nil {
-						// something goes wrong when transit task to PENDING state,
-						// this should not happen because we already checked the state
-						// before calling the transition. Nowhere to go, just log the error.
-						log.Logger().Warn("init task failed", zap.Error(err))
-					}
-				} else {
-					events.GetRecorder().Event(task.GetTaskPod(), v1.EventTypeWarning, "FailedScheduling", err.Error())
-					log.Logger().Debug("task is not ready for scheduling",
-						zap.String("appID", task.applicationID),
-						zap.String("taskID", task.taskID),
-						zap.Error(err))
-				}
-			}
-		}
+		app.scheduleTasks(func(t *Task) bool {
+			return !t.placeholder
+		})
 	default:
 		log.Logger().Debug("skipping scheduling application",
 			zap.String("appID", app.GetApplicationID()),
 			zap.String("appState", app.GetApplicationState()))
 	}
 }
 
+func (app *Application) scheduleTasks(taskScheduleCondition func(t *Task) bool) {
+	for _, task := range app.GetNewTasks() {
+		if taskScheduleCondition(task) {

Review comment:
       ```
   func (app *Application) scheduleTasks(schedulePlaceholders bool) {
   ...
   if(t.placeholder == schedulePlaceholders)
   ...
   }
   ```
   would be much simpler than passing a function

##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,167 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients   *client.Clients
+	orphanPod map[string]*v1.Pod
+	stopChan  chan struct{}
+	running   atomic.Value
+	sync.RWMutex
+}
+
+var placeholderMgr *PlaceholderManager
+var once sync.Once
+
+func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
+	var r atomic.Value
+	r.Store(false)
+	placeholderMgr = &PlaceholderManager{
+		clients: clients,
+		running: r,
+	}
+	return placeholderMgr
+}
+
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")
+		}
+	})
+	return placeholderMgr
+}
+
+func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
+	mgr.Lock()
+	defer mgr.Unlock()
+
+	// iterate all task groups, create placeholders for all the min members
+	for _, tg := range app.getTaskGroups() {
+		for i := int32(0); i < tg.MinMember; i++ {
+			placeholderName := utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID(), i)
+			placeholder := newPlaceholder(placeholderName, app, tg)
+			// create the placeholder on K8s
+			_, err := mgr.clients.KubeClient.Create(placeholder.pod)
+			if err != nil {
+				// if failed to create the place holder pod
+				// caller should handle this error
+				log.Logger().Error("failed to create placeholder pod",
+					zap.Error(err))
+				return err
+			}
+			log.Logger().Info("placeholder created",
+				zap.String("placeholder", placeholder.String()))
+		}
+	}
+
+	return nil
+}
+
+// clean up all the placeholders for an application
+func (mgr *PlaceholderManager) CleanUp(app *Application) {
+	log.Logger().Info("start to clean up app placeholders",

Review comment:
       Wouldn't we need a lock here?

##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -0,0 +1,167 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+)
+
+// placeholder manager is a service to manage the lifecycle of app placeholders
+type PlaceholderManager struct {
+	clients   *client.Clients
+	orphanPod map[string]*v1.Pod
+	stopChan  chan struct{}
+	running   atomic.Value
+	sync.RWMutex
+}
+
+var placeholderMgr *PlaceholderManager
+var once sync.Once
+
+func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
+	var r atomic.Value
+	r.Store(false)
+	placeholderMgr = &PlaceholderManager{
+		clients: clients,
+		running: r,
+	}
+	return placeholderMgr
+}
+
+func GetPlaceholderManager() *PlaceholderManager {
+	once.Do(func() {
+		if placeholderMgr == nil {
+			log.Logger().Fatal("PlaceholderManager is not initiated")

Review comment:
       Shouldn't we initiate the placeholder manager if it is not initiated?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org