You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@yunikorn.apache.org by GitBox <gi...@apache.org> on 2021/04/14 07:49:56 UTC

[GitHub] [incubator-yunikorn-k8shim] wilfred-s commented on a change in pull request #253: [YUNIKORN-558] Integration with Spark K8s Operator

wilfred-s commented on a change in pull request #253:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/253#discussion_r612945681



##########
File path: go.mod
##########
@@ -18,20 +18,20 @@
 
 module github.com/apache/incubator-yunikorn-k8shim
 
-go 1.12
+go 1.14
 
 require (
-	github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200817155620-c19d2b8660d8
-	github.com/apache/incubator-yunikorn-core v0.0.0-20210308173435-79a60272f12f
-	github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20210226143918-19a5cca5e428
+	github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20201215015655-2e8b733f5ad0
+	github.com/apache/incubator-yunikorn-core v0.0.0-20210412120202-73fdc11674a5
+	github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20210412034924-44e8abeff79e
 	github.com/google/uuid v1.1.1
 	github.com/gorilla/mux v1.7.3
 	github.com/looplab/fsm v0.1.0
-	github.com/onsi/ginkgo v1.11.0
-	github.com/onsi/gomega v1.7.0
-	go.uber.org/zap v1.13.0
+	github.com/onsi/ginkgo v1.15.1
+	github.com/onsi/gomega v1.11.0
+	go.uber.org/zap v1.16.0
 	golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
-	gopkg.in/yaml.v2 v2.2.8
+	gopkg.in/yaml.v2 v2.4.0

Review comment:
       We should not bump dependency versions unless the code requires those changes to work.
   I can understand the Spark operator update, but we should leave the rest of the dependencies untouched in this change.

##########
File path: deployments/image/configmap/Dockerfile
##########
@@ -21,7 +21,7 @@ FROM alpine:latest
 RUN apk add curl
 RUN apk add jq
 RUN apk add --update openssl
-RUN curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.15.12/bin/linux/amd64/kubectl
+RUN curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.18.16/bin/linux/amd64/kubectl

Review comment:
       We need to be careful with this change. It might break 1.15 support which we have not removed yet.
   Please roll back.

##########
File path: go.mod
##########
@@ -18,20 +18,20 @@
 
 module github.com/apache/incubator-yunikorn-k8shim
 
-go 1.12
+go 1.14

Review comment:
       Changing to `go 1.14` means we require at least that version to compile. The makefile checks this. We need to announce that before we push this and make sure we have fully tested it.
   Moving the version has side effects, see my comments in [YUNIKORN-397](https://issues.apache.org/jira/browse/YUNIKORN-397)

##########
File path: pkg/cache/application.go
##########
@@ -460,9 +461,14 @@ func (app *Application) postAppAccepted() {
 	// app could have allocated tasks upon a recovery, and in that case,
 	// the reserving phase has already passed, no need to trigger that again.
 	var ev events.SchedulingEvent
+	log.Logger().Info("postAppAccepted",

Review comment:
       Please clarify in the log message that this comes from a cached app during post-processing, or something like that.

##########
File path: .gitignore
##########
@@ -1,3 +1,4 @@
 .idea
 _output/
+vendor

Review comment:
       Nothing generates or should generate a vendor directory in the build. Why do we need this?

##########
File path: pkg/appmgmt/general/general.go
##########
@@ -34,6 +33,8 @@ import (
 	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
 	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
 	"github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+
+	"go.uber.org/zap"

Review comment:
       Not sure why we need this change...

##########
File path: pkg/appmgmt/appmgmt.go
##########
@@ -47,7 +47,10 @@ func NewAMService(amProtocol interfaces.ApplicationManagementProtocol,
 		managers:    make([]interfaces.AppManager, 0),
 	}
 
+	log.Logger().Info("Initializing new AppMgmt service")
+
 	if !apiProvider.IsTestingMode() {
+		log.Logger().Info("Registered Spark operator with the AppMgmt service")

Review comment:
       Looking at the register call below, we register more than just the Spark operator.
   NIT: until we return from the register call, the work has not been done yet (registered vs registering).

##########
File path: pkg/appmgmt/sparkoperator/spark.go
##########
@@ -51,6 +40,7 @@ type Manager struct {
 }
 
 func NewManager(amProtocol interfaces.ApplicationManagementProtocol, apiProvider client.APIProvider) *Manager {
+	log.Logger().Info("New AppMgr initialized")

Review comment:
       Isn't the app manager initialised in `ServiceInit()` below?
   You log almost the same message in that call.

##########
File path: pkg/appmgmt/general/general.go
##########
@@ -301,12 +311,14 @@ func (os *Manager) ListApplications() (map[string]interfaces.ApplicationMetadata
 
 	// get existing apps
 	existingApps := make(map[string]interfaces.ApplicationMetadata)
-	if appPods != nil && len(appPods) > 0 {
+	if len(appPods) > 0 {

Review comment:
       range can handle a nil input so there is no need to check at all.

##########
File path: pkg/appmgmt/sparkoperator/spark.go
##########
@@ -105,125 +87,26 @@ func (os *Manager) Stop() {
 	os.stopCh <- struct{}{}
 }
 
-func (os *Manager) getTaskMetadata(pod *v1.Pod) (interfaces.TaskMetadata, bool) {
-	// spark executors are having a common label
-	if appName, ok := pod.Labels[SparkAppNameLabel]; ok {
-		return interfaces.TaskMetadata{
-			ApplicationID: appName,
-			TaskID:        string(pod.UID),
-			Pod:           pod,
-		}, true
-	}
-	return interfaces.TaskMetadata{}, false
-}
-
-func (os *Manager) getAppMetadata(sparkApp *v1beta2.SparkApplication) interfaces.ApplicationMetadata {
-	// extract tags from annotations
-	tags := make(map[string]string)
-	for annotationKey, annotationValue := range sparkApp.GetAnnotations() {
-		tags[annotationKey] = annotationValue
-	}
-
-	// set queue name if app labels it
-	queueName := constants.ApplicationDefaultQueue
-	if an, ok := sparkApp.Labels[constants.LabelQueueName]; ok {
-		queueName = an
-	}
-
-	// retrieve the namespace info from the CRD
-	tags[constants.AppTagNamespace] = sparkApp.Namespace
-
-	return interfaces.ApplicationMetadata{
-		ApplicationID: sparkApp.Name,
-		QueueName:     queueName,
-		User:          "default",
-		Tags:          tags,
-	}
-}
-
-// list all existing applications
-func (os *Manager) ListApplications() (map[string]interfaces.ApplicationMetadata, error) {
-	lister := os.crdInformerFactory.Sparkoperator().V1beta2().SparkApplications().Lister()
-	sparkApps, err := lister.List(labels.Everything())
-	if err != nil {
-		return nil, err
-	}
-
-	existingApps := make(map[string]interfaces.ApplicationMetadata)
-	for _, sparkApp := range sparkApps {
-		existingApps[sparkApp.Name] = os.getAppMetadata(sparkApp)
-	}
-	return existingApps, nil
-}
-
-func (os *Manager) GetExistingAllocation(pod *v1.Pod) *si.Allocation {
-	if meta, valid := os.getTaskMetadata(pod); valid {
-		return &si.Allocation{
-			AllocationKey:    string(pod.UID),
-			AllocationTags:   nil,
-			UUID:             string(pod.UID),
-			ResourcePerAlloc: common.GetPodResource(pod),
-			QueueName:        utils.GetQueueNameFromPod(pod),
-			NodeID:           pod.Spec.NodeName,
-			ApplicationID:    meta.ApplicationID,
-			PartitionName:    constants.DefaultPartition,
-		}
-	}
-	return nil
-}
-
-// callbacks for SparkApplication CRD
-func (os *Manager) addApplication(obj interface{}) {
-	app := obj.(*v1beta2.SparkApplication)
-	log.Logger().Info("spark app added", zap.Any("SparkApplication", app))
-	os.amProtocol.AddApplication(&interfaces.AddApplicationRequest{
-		Metadata: os.getAppMetadata(app),
-	})
-}
-
 func (os *Manager) updateApplication(old, new interface{}) {
 	appOld := old.(*v1beta2.SparkApplication)
 	appNew := new.(*v1beta2.SparkApplication)
+	currState := appNew.Status.AppState.State
 	log.Logger().Debug("spark app updated",
 		zap.Any("old", appOld),
-		zap.Any("old", appNew))
+		zap.Any("new", appNew))
+	log.Logger().Debug("spark app state",
+		zap.String("current state", string(currState)))

Review comment:
       Please combine the two log entries into one.

##########
File path: pkg/cache/placeholder_manager.go
##########
@@ -46,10 +47,13 @@ type PlaceholderManager struct {
 }
 
 var placeholderMgr *PlaceholderManager
+var placeholderManagerLock sync.Mutex

Review comment:
       The init and usage of the manager have not changed. Having a lock like this seems weird because we should only ever use the one that was created by the current test, or when the context is created in production.
   This probably also explains why you needed the nil check in the app code before.
   This is a test problem. By adding the manager usage to the fail application test we are re-using the manager from a previous test. We should not be doing that. Please check the way `TestTryReserve()` and `TestTryReservePostRestart()` initiate the manager.
   It looks like the test context does not create a manager and we might not clean up correctly either

##########
File path: pkg/cache/application.go
##########
@@ -511,11 +517,24 @@ func (app *Application) handleRejectApplicationEvent(event *fsm.Event) {
 		fmt.Sprintf("application %s is rejected by scheduler", app.applicationID)))
 }
 
+func placeholderCleanup(app *Application) {
+	placeholderManager := getPlaceholderManager()
+	if placeholderManager != nil {
+		placeholderManager.cleanUp(app)
+	}
+}
+

Review comment:
       `getPlaceholderManager()` cannot return nil, so we can simplify this by calling it directly inside the go function while handling the events:
   ```
   go func() {
       getPlaceholderManager().cleanUp(app)
   }()
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org