You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@yunikorn.apache.org by GitBox <gi...@apache.org> on 2020/10/06 04:26:06 UTC

[GitHub] [incubator-yunikorn-core] wilfred-s commented on a change in pull request #190: [YUNIKORN-332] Add events for reserved pods

wilfred-s commented on a change in pull request #190:
URL: https://github.com/apache/incubator-yunikorn-core/pull/190#discussion_r499996106



##########
File path: pkg/scheduler/scheduling_application.go
##########
@@ -624,13 +663,43 @@ func (sa *SchedulingApplication) tryNodes(ask *schedulingAllocationAsk, nodeIter
 		}
 		// return allocation proposal and mark it as a reservation
 		alloc := newSchedulingAllocation(ask, nodeToReserve.NodeID)
+		err := events.EmitReserveEvent(ask.AskProto.AllocationKey, ask.ApplicationID, nodeToReserve.NodeID)
+		if err != nil {
+			log.Logger().Debug("could not emit reserve events to shim",
+				zap.String("allocationKey", ask.AskProto.AllocationKey),
+				zap.String("appID", ask.ApplicationID),
+				zap.String("nodeID", nodeToReserve.NodeID),
+				zap.Error(err))
+		}
 		alloc.result = reserved
 		return alloc
 	}
 	// ask does not fit, skip to next ask
 	return nil
 }
 
+func EmitAllocatedReservedEvents(allocKey, appID, nodeID string) {
+	err := events.EmitAllocatedReservedEvent(allocKey, appID, nodeID)
+	if err != nil {
+		log.Logger().Warn("could not emit allocatedReserved events to shim",

Review comment:
       Be consistent in the logging level used: it seems to change from WARN to DEBUG for similar errors.

##########
File path: pkg/scheduler/scheduling_application.go
##########
@@ -569,21 +596,33 @@ func (sa *SchedulingApplication) tryNodes(ask *schedulingAllocationAsk, nodeIter
 			// NOTE: this is a safeguard as reserved nodes should never be part of the iterator
 			// but we have no locking
 			if _, ok := sa.reservations[reservationKey(node, nil, ask)]; ok {
+				appID := sa.ApplicationInfo.ApplicationID
 				log.Logger().Debug("allocate found reserved ask during non reserved allocate",
-					zap.String("appID", sa.ApplicationInfo.ApplicationID),
+					zap.String("appID", appID),
 					zap.String("nodeID", node.NodeID),
 					zap.String("allocationKey", allocKey))
+				err := events.EmitAllocatedReservedEvent(allocKey, appID, node.NodeID)
+				if err != nil {
+					log.Logger().Debug("could not emit allocatedReserved events to shim",
+						zap.String("allocationKey", allocKey),
+						zap.String("appID", appID),
+						zap.String("nodeID", node.NodeID),
+						zap.Error(err))
+				}
 				alloc.result = allocatedReserved
 				return alloc
 			}
 			// we could also have a different node reserved for this ask if it has pick one of
 			// the reserved nodes to unreserve (first one in the list)
 			if len(reservedAsks) > 0 {
 				nodeID := strings.TrimSuffix(reservedAsks[0], "|"+allocKey)
+				appID := sa.ApplicationInfo.ApplicationID
 				log.Logger().Debug("allocate picking reserved ask during non reserved allocate",
-					zap.String("appID", sa.ApplicationInfo.ApplicationID),
+					zap.String("appID", appID),
 					zap.String("nodeID", nodeID),
 					zap.String("allocationKey", allocKey))
+				EmitUnReserveEventForNode(ask.AskProto.AllocationKey, ask.ApplicationID, nodeID)
+				EmitAllocatedReservedEvents(ask.AskProto.AllocationKey, ask.ApplicationID, node.NodeID)

Review comment:
       You already have `appID` stored locally; why not reuse it?

##########
File path: pkg/events/events.go
##########
@@ -58,3 +58,73 @@ func CreateNodeEventRecord(objectID, reason, message string) (*si.EventRecord, e
 func CreateQueueEventRecord(objectID, groupID, reason, message string) (*si.EventRecord, error) {
 	return createEventRecord(si.EventRecord_QUEUE, objectID, groupID, reason, message)
 }
+
+// emitting event to the shim for a reservation
+// the event will be sent to the node, the application and the allocation involved in the reservation
+func EmitReserveEvent(allocKey, appID, nodeID string) error {
+	return emitRequestAppAndNodeEvents(allocKey, appID, nodeID,
+		"Allocation %s from application %s reserved node %s", "AppReservedNode")
+}
+
+// emit an unreserve event for the allocation, the application and the node as well
+func EmitUnReserveEvent(allocKey, appID, nodeID string) error {
+	return emitRequestAppAndNodeEvents(allocKey, appID, nodeID,
+		"Allocation %s from application %s unreserved from node %s", "AppUnreservedNode")
+}
+
+// emit an unreserve event only to the node
+func EmitUnreserveEventForNode(allocKey, appID, nodeID string) error {
+	eventCache := GetEventCache()
+	if eventCache == nil {
+		// The event cache is disabled.
+		// It's not an error, let's return.
+		return nil
+	}
+	msg := fmt.Sprintf("Ask %s from application %s allocated to node %s", allocKey, appID, nodeID)
+
+	event, err := CreateNodeEventRecord(nodeID, "AppAllocatedReservedNode", msg)
+	if err != nil {
+		return err
+	}
+	eventCache.AddEvent(event)
+
+	return nil
+}
+
+// emit an allocation event to the node through the shim
+func EmitAllocatedReservedEvent(allocKey, appID, nodeID string) error {
+	return emitRequestAppAndNodeEvents(allocKey, appID, nodeID,
+		"Ask %s from application %s allocated to node %s", "AppAllocatedReservedNode")
+}
+
+func emitRequestAppAndNodeEvents(allocKey, appID, nodeID, msgFmt, reason string) error {
+	eventCache := GetEventCache()
+	if eventCache == nil {
+		// The event cache is disabled.
+		// It's not an error, let's return.
+		return nil
+	}
+	msg := fmt.Sprintf(msgFmt, allocKey, appID, nodeID)
+
+	event, err := CreateNodeEventRecord(nodeID, reason, msg)
+	if err != nil {
+		return err

Review comment:
       Generic question: if creating this event fails, does that also mean all subsequent events should or must fail? Could there be a case where only the nodeID is the problem, and should we still send out the rest of the events in that case?
   I think we need to be as smart as possible and send as much as we can (i.e. fail only what really must fail).

##########
File path: pkg/events/events_test.go
##########
@@ -68,3 +71,119 @@ func TestEmptyFields(t *testing.T) {
 	_, err = createEventRecord(si.EventRecord_QUEUE, "obj", "group", "", "message")
 	assert.Assert(t, err != nil, "the EventRecord should not be created with empty reason")
 }
+
+func TestEmitReserveEventWithoutEventCache(t *testing.T) {
+	cache := GetEventCache()
+	assert.Assert(t, cache == nil, "cache should not be initialized")
+
+	allocKey := "test-alloc-1"
+	appID := "app-1"
+	nodeID := "node-1"
+	err := EmitReserveEvent(allocKey, appID, nodeID)
+	assert.NilError(t, err, "emitting event without event cache should succeed")
+}
+
+func TestEmitReserveEvent(t *testing.T) {
+	CreateAndSetEventCache()
+	defer ResetCache()
+	cache := GetEventCache()
+	cache.StartService()
+
+	allocKey := "test-alloc-2"
+	appID := "app-2"
+	nodeID := "node-2"
+	err := EmitReserveEvent(allocKey, appID, nodeID)
+	assert.NilError(t, err, "expected EmitReserveEvent to run without errors")
+
+	assertEmitRequestAppAndNodeEvents(t, allocKey, appID, nodeID, "AppReservedNode")

Review comment:
       Test coverage for the error cases could easily be extended by passing in empty strings.
   That would address the Codecov remarks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org