You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/11/11 05:37:20 UTC

[GitHub] [beam] pabloem commented on a change in pull request #15926: [BEAM-13110][Playground] Playground pipeline cancelation

pabloem commented on a change in pull request #15926:
URL: https://github.com/apache/beam/pull/15926#discussion_r747204260



##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -366,29 +351,64 @@ func processCode(ctx context.Context, cacheService cache.Cache, lc *fs_tool.Life
 		}
 	}(successChannel, errorChannel, dataChannel)
 
+	processStep(ctxWithTimeout, pipelineId, cacheService, cancelChan, successChannel, dataChannel, errorChannel, pb.Status_STATUS_RUN_ERROR, pb.Status_STATUS_FINISHED)
+}
+
+// processStep processes each executor's step with cancel and timeout checks.
+// If finishes by canceling, timeout or error - returns false.
+// If finishes successfully returns true.
+func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, cancelChan, successChan chan bool, dataChan chan interface{}, errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) bool {
 	select {
-	case <-ctxWithTimeout.Done():
-		finishByContext(ctxWithTimeout, pipelineId, cacheService)
-		return
-	case ok := <-successChannel:
-		data := <-dataChannel
+	case <-ctx.Done():
+		finishByContext(ctx, pipelineId, cacheService)
+		return false
+	case <-cancelChan:
+		processCancel(ctx, cacheService, pipelineId)
+		return false
+	case ok := <-successChan:
+		var data []byte = nil
+		if dataChan != nil {
+			temp := <-dataChan
+			data = temp.([]byte)
+		}
 		if !ok {
 			err := <-errorChannel
-			processError(ctxWithTimeout, err.(error), data.([]byte), pipelineId, cacheService, pb.Status_STATUS_RUN_ERROR)
-			return
+			processError(ctx, err, data, pipelineId, cacheService, errorCaseStatus)
+			return false
 		}
-		processSuccess(ctxWithTimeout, data.([]byte), pipelineId, cacheService, pb.Status_STATUS_FINISHED)
+		processSuccess(ctx, data, pipelineId, cacheService, successCaseStatus)
 	}
+	return true
 }
 
 // finishByContext is used in case of runCode method finished by timeout
 func finishByContext(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache) {
 	logger.Errorf("%s: processCode finish because of timeout\n", pipelineId)
 
+	// set to cache pipelineId: cache.Canceled: false to stop cancelCheck() method
+	setToCache(ctx, cacheService, pipelineId, cache.Canceled, false)
+
 	// set to cache pipelineId: cache.SubKey_Status: Status_STATUS_RUN_TIMEOUT
 	setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_TIMEOUT)
 }
 
+// cancelCheck checks cancel flag for code processing.
+// If cancel flag doesn't exist in cache continue working.
+// If cancel flag exists, and it is true it means that code processing was canceled. Set true to cancelChan and return.
+// If cancel flag exists, and it is false it means that code processing was finished. Return.
+func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChan chan bool, cacheService cache.Cache) {
+	for {
+		cancel, err := cacheService.GetValue(ctx, pipelineId, cache.Canceled)

Review comment:
       Should we add a timer (e.g. a short wait between loop iterations) here so that we don't overwhelm the cache with requests?

##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -272,6 +282,9 @@ func processCode(ctx context.Context, cacheService cache.Cache, lc *fs_tool.Life
 	errorChannel := make(chan error, 1)
 	dataChannel := make(chan interface{}, 1)
 	successChannel := make(chan bool, 1)
+	cancelChan := make(chan bool, 1)

Review comment:
       ```suggestion
   	cancelChannel := make(chan bool, 1)
   ```
   
   Let's keep consistent naming for channel variables.

##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -366,29 +351,64 @@ func processCode(ctx context.Context, cacheService cache.Cache, lc *fs_tool.Life
 		}
 	}(successChannel, errorChannel, dataChannel)
 
+	processStep(ctxWithTimeout, pipelineId, cacheService, cancelChan, successChannel, dataChannel, errorChannel, pb.Status_STATUS_RUN_ERROR, pb.Status_STATUS_FINISHED)
+}
+
+// processStep processes each executor's step with cancel and timeout checks.
+// If finishes by canceling, timeout or error - returns false.
+// If finishes successfully returns true.
+func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, cancelChan, successChan chan bool, dataChan chan interface{}, errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) bool {
 	select {
-	case <-ctxWithTimeout.Done():
-		finishByContext(ctxWithTimeout, pipelineId, cacheService)
-		return
-	case ok := <-successChannel:
-		data := <-dataChannel
+	case <-ctx.Done():
+		finishByContext(ctx, pipelineId, cacheService)
+		return false
+	case <-cancelChan:
+		processCancel(ctx, cacheService, pipelineId)
+		return false
+	case ok := <-successChan:
+		var data []byte = nil
+		if dataChan != nil {
+			temp := <-dataChan
+			data = temp.([]byte)
+		}
 		if !ok {
 			err := <-errorChannel
-			processError(ctxWithTimeout, err.(error), data.([]byte), pipelineId, cacheService, pb.Status_STATUS_RUN_ERROR)
-			return
+			processError(ctx, err, data, pipelineId, cacheService, errorCaseStatus)
+			return false
 		}
-		processSuccess(ctxWithTimeout, data.([]byte), pipelineId, cacheService, pb.Status_STATUS_FINISHED)
+		processSuccess(ctx, data, pipelineId, cacheService, successCaseStatus)
 	}
+	return true
 }
 
 // finishByContext is used in case of runCode method finished by timeout
 func finishByContext(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache) {
 	logger.Errorf("%s: processCode finish because of timeout\n", pipelineId)
 
+	// set to cache pipelineId: cache.Canceled: false to stop cancelCheck() method
+	setToCache(ctx, cacheService, pipelineId, cache.Canceled, false)
+
 	// set to cache pipelineId: cache.SubKey_Status: Status_STATUS_RUN_TIMEOUT
 	setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_TIMEOUT)
 }
 
+// cancelCheck checks cancel flag for code processing.
+// If cancel flag doesn't exist in cache continue working.
+// If cancel flag exists, and it is true it means that code processing was canceled. Set true to cancelChan and return.
+// If cancel flag exists, and it is false it means that code processing was finished. Return.
+func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChan chan bool, cacheService cache.Cache) {
+	for {
+		cancel, err := cacheService.GetValue(ctx, pipelineId, cache.Canceled)

Review comment:
       It seems Go has a ticker utility (`time.NewTicker`) that could be useful here: https://gobyexample.com/tickers

##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -447,13 +469,27 @@ func processSuccess(ctx context.Context, output []byte, pipelineId uuid.UUID, ca
 
 		setToCache(ctx, cacheService, pipelineId, cache.RunOutput, string(output))
 
+		// set to cache pipelineId: cache.Canceled: false to stop cancelCheck() method
+		setToCache(ctx, cacheService, pipelineId, cache.Canceled, false)
+
+		// set to cache pipelineId: cache.SubKey_Status: pb.Status_STATUS_FINISHED
 		setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_FINISHED)
 	}
 }
 
+// processCancel process case when code processing was canceled
+func processCancel(ctx context.Context, cacheService cache.Cache, pipelineId uuid.UUID) {
+	logger.Infof("%s: was canceled\n", pipelineId)
+
+	// set to cache pipelineId: cache.SubKey_Status: pb.Status_STATUS_CANCELED
+	setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_CANCELED)

Review comment:
       Should we get a `CancelFunc` for the context? If we receive a cancel request, we need to kill the running process (the compilation, the pipeline run, or whatever step is in progress), so we need to cancel the context, right?

##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -366,29 +351,64 @@ func processCode(ctx context.Context, cacheService cache.Cache, lc *fs_tool.Life
 		}
 	}(successChannel, errorChannel, dataChannel)
 
+	processStep(ctxWithTimeout, pipelineId, cacheService, cancelChan, successChannel, dataChannel, errorChannel, pb.Status_STATUS_RUN_ERROR, pb.Status_STATUS_FINISHED)
+}
+
+// processStep processes each executor's step with cancel and timeout checks.
+// If finishes by canceling, timeout or error - returns false.
+// If finishes successfully returns true.
+func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, cancelChan, successChan chan bool, dataChan chan interface{}, errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) bool {

Review comment:
       Let's keep consistent naming
   
   ```suggestion
   func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, cancelChan, successChannel chan bool, dataChannel chan interface{}, errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) bool {
   ```

##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -366,29 +351,64 @@ func processCode(ctx context.Context, cacheService cache.Cache, lc *fs_tool.Life
 		}
 	}(successChannel, errorChannel, dataChannel)
 
+	processStep(ctxWithTimeout, pipelineId, cacheService, cancelChan, successChannel, dataChannel, errorChannel, pb.Status_STATUS_RUN_ERROR, pb.Status_STATUS_FINISHED)
+}
+
+// processStep processes each executor's step with cancel and timeout checks.
+// If finishes by canceling, timeout or error - returns false.
+// If finishes successfully returns true.
+func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, cancelChan, successChan chan bool, dataChan chan interface{}, errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) bool {
 	select {
-	case <-ctxWithTimeout.Done():
-		finishByContext(ctxWithTimeout, pipelineId, cacheService)
-		return
-	case ok := <-successChannel:
-		data := <-dataChannel
+	case <-ctx.Done():
+		finishByContext(ctx, pipelineId, cacheService)

Review comment:
       Let's rename this function to `finishByTimeout` since that's the only case when this is triggered (you've documented it on the function, which is great - but let's name it as well)

##########
File path: playground/backend/cmd/server/controller_test.go
##########
@@ -221,7 +224,9 @@ func TestPlaygroundController_CheckStatus(t *testing.T) {
 			wantErr:    true,
 		},
 		{
-			name: "all success",
+			// Test case with calling CheckStatus method with pipelineId which contains status.
+			// As a result, want to receive an expected status.

Review comment:
       Thanks for adding the comments. They're helpful :)

##########
File path: playground/backend/cmd/server/controller_test.go
##########
@@ -448,6 +453,58 @@ func TestPlaygroundController_GetRunError(t *testing.T) {
 	}
 }
 
+func TestPlaygroundController_Cancel(t *testing.T) {

Review comment:
       The code in processCode is becoming more complex (by necessity), so I think it would make sense to make sure we're not leaking goroutines. Do you think we could add checks for that?
   
   I found this library that seems to check this properly: https://github.com/uber-go/goleak
   
   (It may not fit our use case, but it's worth considering.)

##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -366,29 +351,64 @@ func processCode(ctx context.Context, cacheService cache.Cache, lc *fs_tool.Life
 		}
 	}(successChannel, errorChannel, dataChannel)
 
+	processStep(ctxWithTimeout, pipelineId, cacheService, cancelChan, successChannel, dataChannel, errorChannel, pb.Status_STATUS_RUN_ERROR, pb.Status_STATUS_FINISHED)
+}
+
+// processStep processes each executor's step with cancel and timeout checks.
+// If finishes by canceling, timeout or error - returns false.
+// If finishes successfully returns true.
+func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, cancelChan, successChan chan bool, dataChan chan interface{}, errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) bool {
 	select {
-	case <-ctxWithTimeout.Done():
-		finishByContext(ctxWithTimeout, pipelineId, cacheService)
-		return
-	case ok := <-successChannel:
-		data := <-dataChannel
+	case <-ctx.Done():
+		finishByContext(ctx, pipelineId, cacheService)
+		return false
+	case <-cancelChan:
+		processCancel(ctx, cacheService, pipelineId)
+		return false
+	case ok := <-successChan:
+		var data []byte = nil
+		if dataChan != nil {
+			temp := <-dataChan
+			data = temp.([]byte)
+		}
 		if !ok {
 			err := <-errorChannel
-			processError(ctxWithTimeout, err.(error), data.([]byte), pipelineId, cacheService, pb.Status_STATUS_RUN_ERROR)
-			return
+			processError(ctx, err, data, pipelineId, cacheService, errorCaseStatus)
+			return false
 		}
-		processSuccess(ctxWithTimeout, data.([]byte), pipelineId, cacheService, pb.Status_STATUS_FINISHED)
+		processSuccess(ctx, data, pipelineId, cacheService, successCaseStatus)
 	}
+	return true
 }
 
 // finishByContext is used in case of runCode method finished by timeout
 func finishByContext(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache) {
 	logger.Errorf("%s: processCode finish because of timeout\n", pipelineId)
 
+	// set to cache pipelineId: cache.Canceled: false to stop cancelCheck() method
+	setToCache(ctx, cacheService, pipelineId, cache.Canceled, false)
+
 	// set to cache pipelineId: cache.SubKey_Status: Status_STATUS_RUN_TIMEOUT
 	setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_TIMEOUT)
 }
 
+// cancelCheck checks cancel flag for code processing.
+// If cancel flag doesn't exist in cache continue working.
+// If cancel flag exists, and it is true it means that code processing was canceled. Set true to cancelChan and return.
+// If cancel flag exists, and it is false it means that code processing was finished. Return.
+func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChan chan bool, cacheService cache.Cache) {
+	for {
+		cancel, err := cacheService.GetValue(ctx, pipelineId, cache.Canceled)
+		if err != nil {
+			continue

Review comment:
   I also wonder: should we handle the case where `pipelineId` does not exist in the cache?
   
   I think you've correctly set `canceled=false` in the cache for every possible pipeline finalization, so this should not be necessary. However, if we missed it for any reason, we could leak goroutines, so it may be good to let `cancelCheck` exit if the `pipelineId` is no longer in the cache. 
   
   what do you think?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org