You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2023/04/12 15:11:48 UTC

[beam] branch master updated: [Playground] Simplify contexts in code_processing/Process() (#25747)

This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ce212ab8cff [Playground] Simplify contexts in code_processing/Process() (#25747)
ce212ab8cff is described below

commit ce212ab8cffd0826c343c744fe841c8424df04a1
Author: Timur Sultanov <ti...@akvelon.com>
AuthorDate: Wed Apr 12 19:11:39 2023 +0400

    [Playground] Simplify contexts in code_processing/Process() (#25747)
    
    * Simplify contexts in code_processing
    
    * Remove "cancelChannel", use context's cancellation function to cancel execution
    
    * Return error from validationStep()
    
    * Return error from prepareStep()
    
    * Return error from compileStep()
    
    * Return error from runStep()
    
    * Fix error handling
---
 playground/backend/cmd/server/controller.go        |  14 +-
 .../internal/code_processing/code_processing.go    | 237 ++++++++++++---------
 .../code_processing/code_processing_test.go        |  79 +++----
 playground/backend/internal/utils/cache_utils.go   |   7 +-
 .../backend/internal/utils/cache_utils_test.go     |   2 +-
 5 files changed, 193 insertions(+), 146 deletions(-)

diff --git a/playground/backend/cmd/server/controller.go b/playground/backend/cmd/server/controller.go
index 778cbdfb4c4..4e379be2595 100644
--- a/playground/backend/cmd/server/controller.go
+++ b/playground/backend/cmd/server/controller.go
@@ -123,19 +123,19 @@ func (controller *playgroundController) RunCode(ctx context.Context, info *pb.Ru
 		return nil, cerrors.InternalError("Error during preparing", "Error during setup file system for the code processing: %s", err.Error())
 	}
 
-	if err = utils.SetToCache(ctx, controller.cacheService, pipelineId, cache.Status, pb.Status_STATUS_VALIDATING); err != nil {
+	if err = utils.SetToCache(controller.cacheService, pipelineId, cache.Status, pb.Status_STATUS_VALIDATING); err != nil {
 		code_processing.DeleteResources(pipelineId, lc)
 		return nil, cerrors.InternalError("Error during preparing", "Error during saving status of the code processing")
 	}
-	if err = utils.SetToCache(ctx, controller.cacheService, pipelineId, cache.RunOutputIndex, 0); err != nil {
+	if err = utils.SetToCache(controller.cacheService, pipelineId, cache.RunOutputIndex, 0); err != nil {
 		code_processing.DeleteResources(pipelineId, lc)
 		return nil, cerrors.InternalError("Error during preparing", "Error during saving initial run output")
 	}
-	if err = utils.SetToCache(ctx, controller.cacheService, pipelineId, cache.LogsIndex, 0); err != nil {
+	if err = utils.SetToCache(controller.cacheService, pipelineId, cache.LogsIndex, 0); err != nil {
 		code_processing.DeleteResources(pipelineId, lc)
 		return nil, cerrors.InternalError("Error during preparing", "Error during saving value for the logs output")
 	}
-	if err = utils.SetToCache(ctx, controller.cacheService, pipelineId, cache.Canceled, false); err != nil {
+	if err = utils.SetToCache(controller.cacheService, pipelineId, cache.Canceled, false); err != nil {
 		code_processing.DeleteResources(pipelineId, lc)
 		return nil, cerrors.InternalError("Error during preparing", "Error during saving initial cancel flag")
 	}
@@ -185,7 +185,7 @@ func (controller *playgroundController) GetRunOutput(ctx context.Context, info *
 	newRunOutput := ""
 	if len(runOutput) > lastIndex {
 		newRunOutput = runOutput[lastIndex:]
-		if err := utils.SetToCache(ctx, controller.cacheService, pipelineId, cache.RunOutputIndex, lastIndex+len(newRunOutput)); err != nil {
+		if err := utils.SetToCache(controller.cacheService, pipelineId, cache.RunOutputIndex, lastIndex+len(newRunOutput)); err != nil {
 			return nil, cerrors.InternalError(errorMessage, "Error during saving pagination value")
 		}
 	}
@@ -215,7 +215,7 @@ func (controller *playgroundController) GetLogs(ctx context.Context, info *pb.Ge
 	newLogs := ""
 	if len(logs) > lastIndex {
 		newLogs = logs[lastIndex:]
-		if err := utils.SetToCache(ctx, controller.cacheService, pipelineId, cache.LogsIndex, lastIndex+len(newLogs)); err != nil {
+		if err := utils.SetToCache(controller.cacheService, pipelineId, cache.LogsIndex, lastIndex+len(newLogs)); err != nil {
 			return nil, cerrors.InternalError(errorMessage, "Error during saving pagination value")
 		}
 	}
@@ -308,7 +308,7 @@ func (controller *playgroundController) Cancel(ctx context.Context, info *pb.Can
 		logger.Errorf("%s: Cancel(): pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid, err.Error())
 		return nil, cerrors.InvalidArgumentError(errorMessage, "pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid)
 	}
-	if err := utils.SetToCache(ctx, controller.cacheService, pipelineId, cache.Canceled, true); err != nil {
+	if err := utils.SetToCache(controller.cacheService, pipelineId, cache.Canceled, true); err != nil {
 		return nil, cerrors.InternalError(errorMessage, "Error during saving cancel flag value")
 	}
 	return &pb.CancelResponse{}, nil
diff --git a/playground/backend/internal/code_processing/code_processing.go b/playground/backend/internal/code_processing/code_processing.go
index 32cd9c38027..a249592e145 100644
--- a/playground/backend/internal/code_processing/code_processing.go
+++ b/playground/backend/internal/code_processing/code_processing.go
@@ -62,19 +62,19 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
 		DeleteResources(pipelineId, lc)
 	}(lc)
 
-	cancelChannel := make(chan bool, 1)
-
 	var validationResults sync.Map
 
-	go cancelCheck(pipelineLifeCycleCtx, pipelineId, cancelChannel, cacheService)
+	go cancelCheck(pipelineLifeCycleCtx, pipelineId, finishCtxFunc, cacheService)
 
-	executor := validateStep(ctx, cacheService, &lc.Paths, pipelineId, sdkEnv, pipelineLifeCycleCtx, &validationResults, cancelChannel)
-	if executor == nil {
+	err := validateStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, &validationResults)
+	if err != nil {
+		logger.Errorf("%s: error during validation step: %s", pipelineId, err.Error())
 		return
 	}
 
-	executor = prepareStep(ctx, cacheService, &lc.Paths, pipelineId, sdkEnv, pipelineLifeCycleCtx, &validationResults, cancelChannel, lc.GetPreparerParameters())
-	if executor == nil {
+	err = prepareStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, &validationResults, lc.GetPreparerParameters())
+	if err != nil {
+		logger.Errorf("%s: error during preparation step: %s", pipelineId, err.Error())
 		return
 	}
 
@@ -82,16 +82,20 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
 	validateIsUnitTest, _ := validationResults.Load(validators.UnitTestValidatorName)
 	isUnitTest := validateIsUnitTest.(bool)
 
-	executor = compileStep(ctx, cacheService, &lc.Paths, pipelineId, sdkEnv, isUnitTest, pipelineLifeCycleCtx, cancelChannel)
-	if executor == nil {
+	err = compileStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, isUnitTest)
+	if err != nil {
+		logger.Errorf("%s: error during compilation step: %s", pipelineId, err.Error())
 		return
 	}
 
 	// Run/RunTest
-	runStep(ctx, cacheService, &lc.Paths, pipelineId, isUnitTest, sdkEnv, pipelineOptions, pipelineLifeCycleCtx, cancelChannel)
+	err = runStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, isUnitTest, sdkEnv, pipelineOptions)
+	if err != nil {
+		logger.Errorf("%s: error during run step: %s", pipelineId, err.Error())
+	}
 }
 
-func runStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, isUnitTest bool, sdkEnv *environment.BeamEnvs, pipelineOptions string, pipelineLifeCycleCtx context.Context, cancelChannel chan bool) {
+func runStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, isUnitTest bool, sdkEnv *environment.BeamEnvs, pipelineOptions string) error {
 	errorChannel, successChannel := createStatusChannels()
 	stopReadLogsChannel := make(chan bool, 1)
 	finishReadLogsChannel := make(chan bool, 1)
@@ -99,22 +103,24 @@ func runStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeC
 	var executorBuilder *executors.ExecutorBuilder
 	err := error(nil)
 	if isUnitTest {
-		executorBuilder, err = builder.TestRunner(pipelineLifeCycleCtx, paths, sdkEnv)
+		executorBuilder, err = builder.TestRunner(ctx, paths, sdkEnv)
 	} else {
-		executorBuilder, err = builder.Runner(pipelineLifeCycleCtx, paths, utils.ReduceWhiteSpacesToSinge(pipelineOptions), sdkEnv)
+		executorBuilder, err = builder.Runner(ctx, paths, utils.ReduceWhiteSpacesToSinge(pipelineOptions), sdkEnv)
 	}
 	if err != nil {
-		_ = processSetupError(err, pipelineId, cacheService, pipelineLifeCycleCtx)
-		return
+		if processingErr := processSetupError(err, pipelineId, cacheService); processingErr != nil {
+			return processingErr
+		}
+		return err
 	}
 
 	executor := executorBuilder.Build()
 	logger.Infof("%s: Run()/Test() ...\n", pipelineId)
-	runCmd := getExecuteCmd(isUnitTest, &executor, pipelineLifeCycleCtx)
+	runCmd := getExecuteCmd(isUnitTest, &executor, ctx)
 	var runError bytes.Buffer
-	runOutput := streaming.RunOutputWriter{Ctx: pipelineLifeCycleCtx, CacheService: cacheService, PipelineId: pipelineId}
-	go readLogFile(pipelineLifeCycleCtx, ctx, cacheService, paths.AbsoluteLogFilePath, pipelineId, stopReadLogsChannel, finishReadLogsChannel)
-	go readGraphFile(pipelineLifeCycleCtx, ctx, cacheService, paths.AbsoluteGraphFilePath, pipelineId)
+	runOutput := streaming.RunOutputWriter{Ctx: ctx, CacheService: cacheService, PipelineId: pipelineId}
+	go readLogFile(ctx, cacheService, paths.AbsoluteLogFilePath, pipelineId, stopReadLogsChannel, finishReadLogsChannel)
+	go readGraphFile(ctx, cacheService, paths.AbsoluteGraphFilePath, pipelineId)
 
 	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
 		// For go SDK all logs are placed to stdErr.
@@ -133,9 +139,9 @@ func runStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeC
 	}
 
 	// Start of the monitoring of background tasks (run step/cancellation/timeout)
-	ok, err := reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, pipelineId, cacheService, cancelChannel, successChannel)
+	ok, err := reconcileBackgroundTask(ctx, pipelineId, cacheService, successChannel)
 	if err != nil {
-		return
+		return err
 	}
 	if !ok {
 		// If unit test has some error then error output is placed as RunOutput
@@ -155,20 +161,26 @@ func runStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeC
 			}
 			runError.Write(errData)
 		}
-		_ = processRunError(pipelineLifeCycleCtx, errorChannel, runError.Bytes(), pipelineId, cacheService, stopReadLogsChannel, finishReadLogsChannel)
-		return
+		processingErr := processRunError(errorChannel, runError.Bytes(), pipelineId, cacheService, stopReadLogsChannel, finishReadLogsChannel)
+		if processingErr != nil {
+			return processingErr
+		}
+		return fmt.Errorf("run error: %s", runError.String())
 	}
 	// Run step is finished and code is executed
-	_ = processRunSuccess(pipelineLifeCycleCtx, pipelineId, cacheService, stopReadLogsChannel, finishReadLogsChannel)
+	err = processRunSuccess(pipelineId, cacheService, stopReadLogsChannel, finishReadLogsChannel)
+	if err != nil {
+		return err
+	}
+	return nil
 }
 
-func compileStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, isUnitTest bool, pipelineLifeCycleCtx context.Context, cancelChannel chan bool) *executors.Executor {
+func compileStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, isUnitTest bool) error {
 	errorChannel, successChannel := createStatusChannels()
-	var executor = executors.Executor{}
 	// This condition is used for cases when the playground doesn't compile source files. For the Python code and the Go Unit Tests
 	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_PYTHON || sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_SCIO || (sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO && isUnitTest) {
-		if err := processCompileSuccess(pipelineLifeCycleCtx, []byte(""), pipelineId, cacheService); err != nil {
-			return nil
+		if err := processCompileSuccess([]byte(""), pipelineId, cacheService); err != nil {
+			return err
 		}
 	} else { // in case of Java, Go (not unit test), Scala - need compile step
 		executorBuilder, err := builder.Compiler(paths, sdkEnv)
@@ -178,34 +190,39 @@ func compileStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.L
 		}
 		executor := executorBuilder.Build()
 		logger.Infof("%s: Compile() ...\n", pipelineId)
-		compileCmd := executor.Compile(pipelineLifeCycleCtx)
+		compileCmd := executor.Compile(ctx)
 		var compileError bytes.Buffer
 		var compileOutput bytes.Buffer
 		runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel)
 
 		// Start of the monitoring of background tasks (compile step/cancellation/timeout)
-		ok, err := reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, pipelineId, cacheService, cancelChannel, successChannel)
+		ok, err := reconcileBackgroundTask(ctx, pipelineId, cacheService, successChannel)
 		if err != nil {
-			return nil
+			return err
 		}
 		if !ok { // Compile step is finished, but code couldn't be compiled (some typos for example)
 			err := <-errorChannel
-			_ = processErrorWithSavingOutput(pipelineLifeCycleCtx, err, compileError.Bytes(), pipelineId, cache.CompileOutput, cacheService, "Compile", pb.Status_STATUS_COMPILE_ERROR)
-			return nil
+			processingErr := processErrorWithSavingOutput(err, compileError.Bytes(), pipelineId, cache.CompileOutput, cacheService, "Compile", pb.Status_STATUS_COMPILE_ERROR)
+			if processingErr != nil {
+				return processingErr
+			}
+			return err
 		} // Compile step is finished and code is compiled
-		if err := processCompileSuccess(pipelineLifeCycleCtx, compileOutput.Bytes(), pipelineId, cacheService); err != nil {
-			return nil
+		if err := processCompileSuccess(compileOutput.Bytes(), pipelineId, cacheService); err != nil {
+			return err
 		}
 	}
-	return &executor
+	return nil
 }
 
-func prepareStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, pipelineLifeCycleCtx context.Context, validationResults *sync.Map, cancelChannel chan bool, prepareParams map[string]string) *executors.Executor {
+func prepareStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, validationResults *sync.Map, prepareParams map[string]string) error {
 	errorChannel, successChannel := createStatusChannels()
 	executorBuilder, err := builder.Preparer(paths, sdkEnv, validationResults, prepareParams)
 	if err != nil {
-		_ = processSetupError(err, pipelineId, cacheService, pipelineLifeCycleCtx)
-		return nil
+		if processingErr := processSetupError(err, pipelineId, cacheService); processingErr != nil {
+			return processingErr
+		}
+		return err
 	}
 	executor := executorBuilder.Build()
 	logger.Infof("%s: Prepare() ...\n", pipelineId)
@@ -213,29 +230,34 @@ func prepareStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.L
 	go prepareFunc(successChannel, errorChannel, validationResults)
 
 	// Start of the monitoring of background tasks (prepare function/cancellation/timeout)
-	ok, err := reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, pipelineId, cacheService, cancelChannel, successChannel)
+	ok, err := reconcileBackgroundTask(ctx, pipelineId, cacheService, successChannel)
 	if err != nil {
-		return nil
+		return err
 	}
 	if !ok {
 		err := <-errorChannel
 		// Prepare step is finished, but code couldn't be prepared (some error during prepare step)
-		_ = processErrorWithSavingOutput(pipelineLifeCycleCtx, err, []byte(err.Error()), pipelineId, cache.PreparationOutput, cacheService, "Prepare", pb.Status_STATUS_PREPARATION_ERROR)
-		return nil
+		processingErr := processErrorWithSavingOutput(err, []byte(err.Error()), pipelineId, cache.PreparationOutput, cacheService, "Prepare", pb.Status_STATUS_PREPARATION_ERROR)
+		if processingErr != nil {
+			return processingErr
+		}
+		return err
 	}
 	// Prepare step is finished and code is prepared
-	if err := processSuccess(pipelineLifeCycleCtx, pipelineId, cacheService, "Prepare", pb.Status_STATUS_COMPILING); err != nil {
-		return nil
+	if err := processSuccess(pipelineId, cacheService, "Prepare", pb.Status_STATUS_COMPILING); err != nil {
+		return err
 	}
-	return &executor
+	return nil
 }
 
-func validateStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, pipelineLifeCycleCtx context.Context, validationResults *sync.Map, cancelChannel chan bool) *executors.Executor {
+func validateStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, validationResults *sync.Map) error {
 	errorChannel, successChannel := createStatusChannels()
 	executorBuilder, err := builder.Validator(paths, sdkEnv)
 	if err != nil {
-		_ = processSetupError(err, pipelineId, cacheService, pipelineLifeCycleCtx)
-		return nil
+		if processingError := processSetupError(err, pipelineId, cacheService); processingError != nil {
+			return processingError
+		}
+		return err
 	}
 	executor := executorBuilder.Build()
 	logger.Infof("%s: Validate() ...\n", pipelineId)
@@ -243,22 +265,25 @@ func validateStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.
 	go validateFunc(successChannel, errorChannel, validationResults)
 
 	// Start of the monitoring of background tasks (validate function/cancellation/timeout)
-	ok, err := reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, pipelineId, cacheService, cancelChannel, successChannel)
+	ok, err := reconcileBackgroundTask(ctx, pipelineId, cacheService, successChannel)
 	if err != nil {
-		return nil
+		return err
 	}
 	if !ok {
 		err := <-errorChannel
 		// Validate step is finished, but code isn't valid
-		_ = processErrorWithSavingOutput(pipelineLifeCycleCtx, err, []byte(err.Error()), pipelineId, cache.ValidationOutput, cacheService, "Validate", pb.Status_STATUS_VALIDATION_ERROR)
-		return nil
+		processingErr := processErrorWithSavingOutput(err, []byte(err.Error()), pipelineId, cache.ValidationOutput, cacheService, "Validate", pb.Status_STATUS_VALIDATION_ERROR)
+		if processingErr != nil {
+			return processingErr
+		}
+		return err
 	}
 
 	// Validate step is finished and code is valid
-	if err := processSuccess(pipelineLifeCycleCtx, pipelineId, cacheService, "Validate", pb.Status_STATUS_PREPARING); err != nil {
-		return nil
+	if err := processSuccess(pipelineId, cacheService, "Validate", pb.Status_STATUS_PREPARING); err != nil {
+		return err
 	}
-	return &executor
+	return err
 }
 
 func createStatusChannels() (chan error, chan bool) {
@@ -278,9 +303,10 @@ func getExecuteCmd(isUnitTest bool, executor *executors.Executor, ctxWithTimeout
 }
 
 // processSetupError processes errors during the setting up an executor builder
-func processSetupError(err error, pipelineId uuid.UUID, cacheService cache.Cache, ctxWithTimeout context.Context) error {
+func processSetupError(err error, pipelineId uuid.UUID, cacheService cache.Cache) error {
 	logger.Errorf("%s: error during setup builder: %s\n", pipelineId, err.Error())
-	if err = utils.SetToCache(ctxWithTimeout, cacheService, pipelineId, cache.Status, pb.Status_STATUS_ERROR); err != nil {
+	if err = utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_ERROR); err != nil {
+		logger.Errorf("%s: error during saving error message: %s", pipelineId, err)
 		return err
 	}
 	return nil
@@ -379,14 +405,24 @@ func runCmdWithOutput(cmd *exec.Cmd, stdOutput io.Writer, stdError io.Writer, su
 // If cmd operation (Validate/Prepare/Compile/Run/RunTest) finishes successfully but with some error
 //
 //	during step processing - returns false.
-func reconcileBackgroundTask(pipelineLifeCycleCtx, backgroundCtx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, cancelChannel, successChannel chan bool) (bool, error) {
+func reconcileBackgroundTask(pipelineLifeCycleCtx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, successChannel chan bool) (bool, error) {
 	select {
 	case <-pipelineLifeCycleCtx.Done():
-		_ = finishByTimeout(backgroundCtx, pipelineId, cacheService)
-		return false, fmt.Errorf("%s: context was done", pipelineId)
-	case <-cancelChannel:
-		_ = processCancel(pipelineLifeCycleCtx, cacheService, pipelineId)
-		return false, fmt.Errorf("%s: code processing was canceled", pipelineId)
+		contextErr := pipelineLifeCycleCtx.Err()
+		switch contextErr {
+		case context.DeadlineExceeded:
+			if err := finishByTimeout(pipelineId, cacheService); err != nil {
+				return false, fmt.Errorf("error during context timeout processing: %s", err.Error())
+			}
+			return false, fmt.Errorf("code processing context timeout")
+		case context.Canceled:
+			if err := processCancel(cacheService, pipelineId); err != nil {
+				return false, fmt.Errorf("error during cancellation processing: %s", err.Error())
+			}
+			return false, fmt.Errorf("code processing was canceled")
+		default:
+			return false, fmt.Errorf("code processing cancelled: %s", contextErr.Error())
+		}
 	case ok := <-successChannel:
 		return ok, nil
 	}
@@ -396,7 +432,7 @@ func reconcileBackgroundTask(pipelineLifeCycleCtx, backgroundCtx context.Context
 // If cancel flag doesn't exist in cache continue working.
 // If context is done it means that the code processing was finished (successfully/with error/timeout). Return.
 // If cancel flag exists, and it is true it means that the code processing was canceled. Set true to cancelChannel and return.
-func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChannel chan bool, cacheService cache.Cache) {
+func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelFunc context.CancelFunc, cacheService cache.Cache) {
 	ticker := time.NewTicker(pauseDuration)
 	for {
 		select {
@@ -409,7 +445,7 @@ func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChannel chan b
 				logger.Errorf("%s: Error during getting value from the cache: %s", pipelineId, err.Error())
 			}
 			if cancel.(bool) {
-				cancelChannel <- true
+				cancelFunc()
 				return
 			}
 		}
@@ -420,7 +456,7 @@ func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChannel chan b
 // If context is done it means that the code processing was finished (successfully/with error/timeout).
 // Write graph to the cache if this in the file.
 // In other case each pauseDuration checks that graph file exists or not and try to save it to the cache.
-func readGraphFile(pipelineLifeCycleCtx, backgroundCtx context.Context, cacheService cache.Cache, graphFilePath string, pipelineId uuid.UUID) {
+func readGraphFile(pipelineLifeCycleCtx context.Context, cacheService cache.Cache, graphFilePath string, pipelineId uuid.UUID) {
 	ticker := time.NewTicker(pauseDuration)
 	for {
 		select {
@@ -432,7 +468,7 @@ func readGraphFile(pipelineLifeCycleCtx, backgroundCtx context.Context, cacheSer
 				if err != nil {
 					logger.Errorf("%s: Error during saving graph to the file: %s", pipelineId, err.Error())
 				}
-				_ = utils.SetToCache(backgroundCtx, cacheService, pipelineId, cache.Graph, string(graph))
+				_ = utils.SetToCache(cacheService, pipelineId, cache.Graph, string(graph))
 			}
 		// in case of timeout or cancel
 		case <-pipelineLifeCycleCtx.Done():
@@ -442,7 +478,7 @@ func readGraphFile(pipelineLifeCycleCtx, backgroundCtx context.Context, cacheSer
 				if err != nil {
 					logger.Errorf("%s: Error during saving graph to the file: %s", pipelineId, err.Error())
 				}
-				_ = utils.SetToCache(backgroundCtx, cacheService, pipelineId, cache.Graph, string(graph))
+				_ = utils.SetToCache(cacheService, pipelineId, cache.Graph, string(graph))
 			}
 			return
 		}
@@ -457,29 +493,29 @@ func readGraphFile(pipelineLifeCycleCtx, backgroundCtx context.Context, cacheSer
 //	to the cache and set value to the finishReadLogChannel channel to unblock the code processing.
 //
 // In other case each pauseDuration write to cache logs of the code processing.
-func readLogFile(pipelineLifeCycleCtx, backgroundCtx context.Context, cacheService cache.Cache, logFilePath string, pipelineId uuid.UUID, stopReadLogsChannel, finishReadLogChannel chan bool) {
+func readLogFile(pipelineLifeCycleCtx context.Context, cacheService cache.Cache, logFilePath string, pipelineId uuid.UUID, stopReadLogsChannel, finishReadLogChannel chan bool) {
 	ticker := time.NewTicker(pauseDuration)
 	for {
 		select {
 		// in case of timeout or cancel
 		case <-pipelineLifeCycleCtx.Done():
-			_ = finishReadLogFile(backgroundCtx, ticker, cacheService, logFilePath, pipelineId)
+			_ = finishReadLogFile(ticker, cacheService, logFilePath, pipelineId)
 			return
 		// in case of pipeline finish successfully or has error on the run step
 		case <-stopReadLogsChannel:
-			_ = finishReadLogFile(pipelineLifeCycleCtx, ticker, cacheService, logFilePath, pipelineId)
+			_ = finishReadLogFile(ticker, cacheService, logFilePath, pipelineId)
 			finishReadLogChannel <- true
 			return
 		case <-ticker.C:
-			_ = writeLogsToCache(pipelineLifeCycleCtx, cacheService, logFilePath, pipelineId)
+			_ = writeLogsToCache(cacheService, logFilePath, pipelineId)
 		}
 	}
 }
 
 // finishReadLogFile is used to read logs file for the last time
-func finishReadLogFile(ctx context.Context, ticker *time.Ticker, cacheService cache.Cache, logFilePath string, pipelineId uuid.UUID) error {
+func finishReadLogFile(ticker *time.Ticker, cacheService cache.Cache, logFilePath string, pipelineId uuid.UUID) error {
 	ticker.Stop()
-	return writeLogsToCache(ctx, cacheService, logFilePath, pipelineId)
+	return writeLogsToCache(cacheService, logFilePath, pipelineId)
 }
 
 // writeLogsToCache write all logs from the log file to the cache.
@@ -490,7 +526,7 @@ func finishReadLogFile(ctx context.Context, ticker *time.Ticker, cacheService ca
 //
 // If log file exists, read all from the log file and keep it to the cache using cache.Logs subKey.
 // If some error occurs, log the error and return the error.
-func writeLogsToCache(ctx context.Context, cacheService cache.Cache, logFilePath string, pipelineId uuid.UUID) error {
+func writeLogsToCache(cacheService cache.Cache, logFilePath string, pipelineId uuid.UUID) error {
 	if _, err := os.Stat(logFilePath); os.IsNotExist(err) {
 		return nil
 	}
@@ -499,7 +535,7 @@ func writeLogsToCache(ctx context.Context, cacheService cache.Cache, logFilePath
 		logger.Errorf("%s: writeLogsToCache(): error during read from logs file: %s", pipelineId, err.Error())
 		return err
 	}
-	return utils.SetToCache(ctx, cacheService, pipelineId, cache.Logs, string(logs))
+	return utils.SetToCache(cacheService, pipelineId, cache.Logs, string(logs))
 }
 
 // DeleteResources removes all prepared resources for received LifeCycle
@@ -515,22 +551,27 @@ func DeleteResources(pipelineId uuid.UUID, lc *fs_tool.LifeCycle) {
 }
 
 // finishByTimeout is used in case of runCode method finished by timeout
-func finishByTimeout(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache) error {
+func finishByTimeout(pipelineId uuid.UUID, cacheService cache.Cache) error {
 	logger.Errorf("%s: code processing finishes because of timeout\n", pipelineId)
 
 	// set to cache pipelineId: cache.SubKey_Status: Status_STATUS_RUN_TIMEOUT
-	return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_TIMEOUT)
+	return utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_TIMEOUT)
 }
 
 // processErrorWithSavingOutput processes error with saving to cache received error output.
-func processErrorWithSavingOutput(ctx context.Context, err error, errorOutput []byte, pipelineId uuid.UUID, subKey cache.SubKey, cacheService cache.Cache, errorTitle string, newStatus pb.Status) error {
+func processErrorWithSavingOutput(err error, errorOutput []byte, pipelineId uuid.UUID, subKey cache.SubKey, cacheService cache.Cache, errorTitle string, newStatus pb.Status) error {
 	logger.Errorf("%s: %s(): err: %s, output: %s\n", pipelineId, errorTitle, err.Error(), errorOutput)
 
-	if err := utils.SetToCache(ctx, cacheService, pipelineId, subKey, fmt.Sprintf("error: %s\noutput: %s", err.Error(), errorOutput)); err != nil {
+	if err := utils.SetToCache(cacheService, pipelineId, subKey, fmt.Sprintf("error: %s\noutput: %s", err.Error(), errorOutput)); err != nil {
+		logger.Errorf("%s: failed to save error message to cache: %s", pipelineId, err.Error())
 		return err
 	}
 
-	return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, newStatus)
+	if err = utils.SetToCache(cacheService, pipelineId, cache.Status, newStatus); err != nil {
+		logger.Errorf("%s: failed to save status to cache: %s", pipelineId, err.Error())
+		return err
+	}
+	return nil
 }
 
 // processRunError processes error received during processing run step.
@@ -538,51 +579,51 @@ func processErrorWithSavingOutput(ctx context.Context, err error, errorOutput []
 //
 //	After receiving a signal that goroutine was finished (read value from finishReadLogsChannel) this method
 //	sets corresponding status to the cache.
-func processRunError(ctx context.Context, errorChannel chan error, errorOutput []byte, pipelineId uuid.UUID, cacheService cache.Cache, stopReadLogsChannel, finishReadLogsChannel chan bool) error {
+func processRunError(errorChannel chan error, errorOutput []byte, pipelineId uuid.UUID, cacheService cache.Cache, stopReadLogsChannel, finishReadLogsChannel chan bool) error {
 	err := <-errorChannel
 	logger.Errorf("%s: Run(): err: %s, output: %s\n", pipelineId, err.Error(), errorOutput)
 
-	if err := utils.SetToCache(ctx, cacheService, pipelineId, cache.RunError, fmt.Sprintf("error: %s\noutput: %s", err.Error(), string(errorOutput))); err != nil {
+	if err := utils.SetToCache(cacheService, pipelineId, cache.RunError, fmt.Sprintf("error: %s\noutput: %s", err.Error(), string(errorOutput))); err != nil {
 		return err
 	}
 
 	stopReadLogsChannel <- true
 	<-finishReadLogsChannel
 
-	return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_ERROR)
+	return utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_ERROR)
 }
 
 // processSuccess processes case after successful process validation or preparation steps.
 // This method sets corresponding status to the cache.
-func processSuccess(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, successTitle string, newStatus pb.Status) error {
+func processSuccess(pipelineId uuid.UUID, cacheService cache.Cache, successTitle string, newStatus pb.Status) error {
 	logger.Infof("%s: %s(): finish\n", pipelineId, successTitle)
 
-	return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, newStatus)
+	return utils.SetToCache(cacheService, pipelineId, cache.Status, newStatus)
 }
 
 // processCompileSuccess processes case after successful compile step.
 // This method sets output of the compile step, sets empty string as output of the run step and
 //
 //	sets corresponding status to the cache.
-func processCompileSuccess(ctx context.Context, output []byte, pipelineId uuid.UUID, cacheService cache.Cache) error {
+func processCompileSuccess(output []byte, pipelineId uuid.UUID, cacheService cache.Cache) error {
 	logger.Infof("%s: Compile() finish\n", pipelineId)
 
-	if err := utils.SetToCache(ctx, cacheService, pipelineId, cache.CompileOutput, string(output)); err != nil {
+	if err := utils.SetToCache(cacheService, pipelineId, cache.CompileOutput, string(output)); err != nil {
 		return err
 	}
-	if err := utils.SetToCache(ctx, cacheService, pipelineId, cache.RunOutput, ""); err != nil {
+	if err := utils.SetToCache(cacheService, pipelineId, cache.RunOutput, ""); err != nil {
 		return err
 	}
-	if err := utils.SetToCache(ctx, cacheService, pipelineId, cache.RunError, ""); err != nil {
+	if err := utils.SetToCache(cacheService, pipelineId, cache.RunError, ""); err != nil {
 		return err
 	}
-	if err := utils.SetToCache(ctx, cacheService, pipelineId, cache.Logs, ""); err != nil {
+	if err := utils.SetToCache(cacheService, pipelineId, cache.Logs, ""); err != nil {
 		return err
 	}
-	if err := utils.SetToCache(ctx, cacheService, pipelineId, cache.Graph, ""); err != nil {
+	if err := utils.SetToCache(cacheService, pipelineId, cache.Graph, ""); err != nil {
 		return err
 	}
-	return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_EXECUTING)
+	return utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_EXECUTING)
 }
 
 // processRunSuccess processes case after successful run step.
@@ -590,19 +631,19 @@ func processCompileSuccess(ctx context.Context, output []byte, pipelineId uuid.U
 //
 //	After receiving a signal that goroutine was finished (read value from finishReadLogsChannel) this method
 //	sets corresponding status to the cache.
-func processRunSuccess(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, stopReadLogsChannel, finishReadLogsChannel chan bool) error {
+func processRunSuccess(pipelineId uuid.UUID, cacheService cache.Cache, stopReadLogsChannel, finishReadLogsChannel chan bool) error {
 	logger.Infof("%s: Run() finish\n", pipelineId)
 
 	stopReadLogsChannel <- true
 	<-finishReadLogsChannel
 
-	return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_FINISHED)
+	return utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_FINISHED)
 }
 
 // processCancel process case when code processing was canceled
-func processCancel(ctx context.Context, cacheService cache.Cache, pipelineId uuid.UUID) error {
+func processCancel(cacheService cache.Cache, pipelineId uuid.UUID) error {
 	logger.Infof("%s: was canceled\n", pipelineId)
 
 	// set to cache pipelineId: cache.SubKey_Status: pb.Status_STATUS_CANCELED
-	return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_CANCELED)
+	return utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_CANCELED)
 }
diff --git a/playground/backend/internal/code_processing/code_processing_test.go b/playground/backend/internal/code_processing/code_processing_test.go
index b9f4f7131c4..b650e012aef 100644
--- a/playground/backend/internal/code_processing/code_processing_test.go
+++ b/playground/backend/internal/code_processing/code_processing_test.go
@@ -97,8 +97,14 @@ func setup() {
 	if err != nil {
 		panic(err)
 	}
-	os.Setenv("BEAM_SDK", pb.Sdk_SDK_JAVA.String())
-	os.Setenv("APP_WORK_DIR", path)
+	err = os.Setenv("BEAM_SDK", pb.Sdk_SDK_JAVA.String())
+	if err != nil {
+		panic(err)
+	}
+	err = os.Setenv("APP_WORK_DIR", path)
+	if err != nil {
+		panic(err)
+	}
 
 	cacheService = local.New(context.Background())
 }
@@ -296,14 +302,14 @@ func Test_Process(t *testing.T) {
 			if tt.createExecFile {
 				_ = lc.CreateSourceCodeFiles(sources)
 			}
-			if err = utils.SetToCache(tt.args.ctx, cacheService, tt.args.pipelineId, cache.Canceled, false); err != nil {
+			if err = utils.SetToCache(cacheService, tt.args.pipelineId, cache.Canceled, false); err != nil {
 				t.Fatal("error during set cancel flag to cache")
 			}
 			if tt.cancelFunc {
 				go func(ctx context.Context, pipelineId uuid.UUID) {
 					// to imitate behavior of cancellation
 					time.Sleep(5 * time.Second)
-					cacheService.SetValue(ctx, pipelineId, cache.Canceled, true)
+					_ = cacheService.SetValue(ctx, pipelineId, cache.Canceled, true)
 				}(tt.args.ctx, tt.args.pipelineId)
 			}
 			Process(tt.args.ctx, cacheService, lc, tt.args.pipelineId, tt.args.appEnv, tt.args.sdkEnv, tt.args.pipelineOptions)
@@ -675,9 +681,18 @@ func setupSDK(sdk pb.Sdk) {
 		panic(err)
 	}
 
-	os.Setenv("BEAM_SDK", sdk.String())
-	os.Setenv("APP_WORK_DIR", "")
-	os.Setenv("PREPARED_MOD_DIR", "")
+	err = os.Setenv("BEAM_SDK", sdk.String())
+	if err != nil {
+		panic(err)
+	}
+	err = os.Setenv("APP_WORK_DIR", "")
+	if err != nil {
+		panic(err)
+	}
+	err = os.Setenv("PREPARED_MOD_DIR", "")
+	if err != nil {
+		panic(err)
+	}
 
 	cacheService = local.New(context.Background())
 }
@@ -731,7 +746,7 @@ func Benchmark_ProcessJava(b *testing.B) {
 		b.StopTimer()
 		pipelineId := uuid.New()
 		lc := prepareFiles(b, pipelineId, code, pb.Sdk_SDK_JAVA)
-		if err = utils.SetToCache(ctx, cacheService, pipelineId, cache.Canceled, false); err != nil {
+		if err = utils.SetToCache(cacheService, pipelineId, cache.Canceled, false); err != nil {
 			b.Fatal("error during set cancel flag to cache")
 		}
 		b.StartTimer()
@@ -761,7 +776,7 @@ func Benchmark_ProcessPython(b *testing.B) {
 		b.StopTimer()
 		pipelineId := uuid.New()
 		lc := prepareFiles(b, pipelineId, wordCountCode, pb.Sdk_SDK_PYTHON)
-		if err = utils.SetToCache(ctx, cacheService, pipelineId, cache.Canceled, false); err != nil {
+		if err = utils.SetToCache(cacheService, pipelineId, cache.Canceled, false); err != nil {
 			b.Fatal("error during set cancel flag to cache")
 		}
 		b.StartTimer()
@@ -791,7 +806,7 @@ func Benchmark_ProcessGo(b *testing.B) {
 		b.StopTimer()
 		pipelineId := uuid.New()
 		lc := prepareFiles(b, pipelineId, code, pb.Sdk_SDK_GO)
-		if err = utils.SetToCache(ctx, cacheService, pipelineId, cache.Canceled, false); err != nil {
+		if err = utils.SetToCache(cacheService, pipelineId, cache.Canceled, false); err != nil {
 			b.Fatal("error during set cancel flag to cache")
 		}
 		b.StartTimer()
@@ -864,7 +879,6 @@ func Test_validateStep(t *testing.T) {
 		sdkEnv               *environment.BeamEnvs
 		pipelineLifeCycleCtx context.Context
 		validationResults    *sync.Map
-		cancelChannel        chan bool
 	}
 	tests := []struct {
 		name string
@@ -881,7 +895,6 @@ func Test_validateStep(t *testing.T) {
 				sdkEnv:               javaSdkEnv,
 				pipelineLifeCycleCtx: context.Background(),
 				validationResults:    &sync.Map{},
-				cancelChannel:        make(chan bool, 1),
 			},
 			want: 3,
 			code: helloWordJava,
@@ -895,7 +908,6 @@ func Test_validateStep(t *testing.T) {
 				sdkEnv:               incorrectSdkEnv,
 				pipelineLifeCycleCtx: context.Background(),
 				validationResults:    &sync.Map{},
-				cancelChannel:        make(chan bool, 1),
 			},
 			want: 0,
 			code: helloWordJava,
@@ -910,9 +922,9 @@ func Test_validateStep(t *testing.T) {
 			}
 			sources := []entity.FileEntity{{Name: "main.java", Content: tt.code, IsMain: true}}
 			_ = lc.CreateSourceCodeFiles(sources)
-			executor := validateStep(tt.args.ctx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.sdkEnv, tt.args.pipelineLifeCycleCtx, tt.args.validationResults, tt.args.cancelChannel)
+			err = validateStep(tt.args.pipelineLifeCycleCtx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.sdkEnv, tt.args.validationResults)
 			got := syncMapLen(tt.args.validationResults)
-			if executor != nil && !reflect.DeepEqual(got, tt.want) {
+			if err != nil && !reflect.DeepEqual(got, tt.want) {
 				t.Errorf("validateStep() = %d, want %d", got, tt.want)
 			}
 		})
@@ -931,7 +943,8 @@ func Test_prepareStep(t *testing.T) {
 	validationResults := sync.Map{}
 	validationResults.Store(validators.UnitTestValidatorName, false)
 	validationResults.Store(validators.KatasValidatorName, false)
-	pipelineLifeCycleCtx, _ := context.WithTimeout(context.Background(), 1)
+	pipelineLifeCycleCtx, cancel := context.WithTimeout(context.Background(), 1)
+	defer cancel()
 	type args struct {
 		ctx                  context.Context
 		cacheService         cache.Cache
@@ -939,7 +952,6 @@ func Test_prepareStep(t *testing.T) {
 		sdkEnv               *environment.BeamEnvs
 		pipelineLifeCycleCtx context.Context
 		validationResults    *sync.Map
-		cancelChannel        chan bool
 	}
 	tests := []struct {
 		name           string
@@ -956,7 +968,6 @@ func Test_prepareStep(t *testing.T) {
 				sdkEnv:               javaSdkEnv,
 				pipelineLifeCycleCtx: context.Background(),
 				validationResults:    &validationResults,
-				cancelChannel:        make(chan bool, 1),
 			},
 			code:           helloWordJava,
 			expectedStatus: pb.Status_STATUS_COMPILING,
@@ -970,7 +981,6 @@ func Test_prepareStep(t *testing.T) {
 				sdkEnv:               incorrectSdkEnv,
 				pipelineLifeCycleCtx: context.Background(),
 				validationResults:    &validationResults,
-				cancelChannel:        make(chan bool, 1),
 			},
 			code:           "",
 			expectedStatus: pb.Status_STATUS_ERROR,
@@ -984,7 +994,6 @@ func Test_prepareStep(t *testing.T) {
 				sdkEnv:               javaSdkEnv,
 				pipelineLifeCycleCtx: pipelineLifeCycleCtx,
 				validationResults:    &validationResults,
-				cancelChannel:        make(chan bool, 1),
 			},
 			code:           "",
 			expectedStatus: pb.Status_STATUS_RUN_TIMEOUT,
@@ -999,7 +1008,7 @@ func Test_prepareStep(t *testing.T) {
 			}
 			sources := []entity.FileEntity{{Name: "main.java", Content: tt.code, IsMain: true}}
 			_ = lc.CreateSourceCodeFiles(sources)
-			prepareStep(tt.args.ctx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.sdkEnv, tt.args.pipelineLifeCycleCtx, tt.args.validationResults, tt.args.cancelChannel, nil)
+			_ = prepareStep(tt.args.pipelineLifeCycleCtx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.sdkEnv, tt.args.validationResults, nil)
 			status, _ := cacheService.GetValue(tt.args.ctx, tt.args.pipelineId, cache.Status)
 			if status != tt.expectedStatus {
 				t.Errorf("prepareStep: got status = %v, want %v", status, tt.expectedStatus)
@@ -1017,7 +1026,8 @@ func Test_compileStep(t *testing.T) {
 	if err != nil {
 		panic(err)
 	}
-	pipelineLifeCycleCtx, _ := context.WithTimeout(context.Background(), 1)
+	pipelineLifeCycleCtx, cancel := context.WithTimeout(context.Background(), 1)
+	defer cancel()
 	type args struct {
 		ctx                  context.Context
 		cacheService         cache.Cache
@@ -1025,7 +1035,6 @@ func Test_compileStep(t *testing.T) {
 		sdkEnv               *environment.BeamEnvs
 		isUnitTest           bool
 		pipelineLifeCycleCtx context.Context
-		cancelChannel        chan bool
 	}
 	tests := []struct {
 		name           string
@@ -1042,7 +1051,6 @@ func Test_compileStep(t *testing.T) {
 				sdkEnv:               sdkJavaEnv,
 				isUnitTest:           false,
 				pipelineLifeCycleCtx: context.Background(),
-				cancelChannel:        make(chan bool, 1),
 			},
 			code:           helloWordJava,
 			expectedStatus: pb.Status_STATUS_EXECUTING,
@@ -1056,7 +1064,6 @@ func Test_compileStep(t *testing.T) {
 				sdkEnv:               sdkPythonEnv,
 				isUnitTest:           false,
 				pipelineLifeCycleCtx: context.Background(),
-				cancelChannel:        make(chan bool, 1),
 			},
 			code:           helloWordPython,
 			expectedStatus: pb.Status_STATUS_EXECUTING,
@@ -1070,7 +1077,6 @@ func Test_compileStep(t *testing.T) {
 				sdkEnv:               sdkJavaEnv,
 				isUnitTest:           false,
 				pipelineLifeCycleCtx: pipelineLifeCycleCtx,
-				cancelChannel:        make(chan bool, 1),
 			},
 			code:           helloWordJava,
 			expectedStatus: pb.Status_STATUS_RUN_TIMEOUT,
@@ -1085,7 +1091,7 @@ func Test_compileStep(t *testing.T) {
 			}
 			sources := []entity.FileEntity{{Name: "main.java", Content: tt.code, IsMain: true}}
 			_ = lc.CreateSourceCodeFiles(sources)
-			compileStep(tt.args.ctx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.sdkEnv, tt.args.isUnitTest, tt.args.pipelineLifeCycleCtx, tt.args.cancelChannel)
+			_ = compileStep(tt.args.pipelineLifeCycleCtx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.sdkEnv, tt.args.isUnitTest)
 			status, _ := cacheService.GetValue(tt.args.ctx, tt.args.pipelineId, cache.Status)
 			if status != tt.expectedStatus {
 				t.Errorf("compileStep: got status = %v, want %v", status, tt.expectedStatus)
@@ -1115,7 +1121,6 @@ func Test_runStep(t *testing.T) {
 		sdkEnv               *environment.BeamEnvs
 		pipelineOptions      string
 		pipelineLifeCycleCtx context.Context
-		cancelChannel        chan bool
 		createExecFile       bool
 	}
 	tests := []struct {
@@ -1137,7 +1142,6 @@ func Test_runStep(t *testing.T) {
 				sdkEnv:               sdkPythonEnv,
 				pipelineOptions:      "",
 				pipelineLifeCycleCtx: context.Background(),
-				cancelChannel:        make(chan bool, 1),
 				createExecFile:       true,
 			},
 			code:           helloWordPython,
@@ -1156,7 +1160,6 @@ func Test_runStep(t *testing.T) {
 				sdkEnv:               sdkGoEnv,
 				pipelineOptions:      "",
 				pipelineLifeCycleCtx: context.Background(),
-				cancelChannel:        make(chan bool, 1),
 				createExecFile:       true,
 			},
 			code:           helloWordGo,
@@ -1174,7 +1177,6 @@ func Test_runStep(t *testing.T) {
 				sdkEnv:               sdkJavaEnv,
 				pipelineOptions:      "",
 				pipelineLifeCycleCtx: context.Background(),
-				cancelChannel:        make(chan bool, 1),
 				createExecFile:       false,
 			},
 			code:           helloWordJava,
@@ -1192,7 +1194,7 @@ func Test_runStep(t *testing.T) {
 				sources := []entity.FileEntity{{Name: "main.java", Content: tt.code, IsMain: true}}
 				_ = lc.CreateSourceCodeFiles(sources)
 			}
-			runStep(tt.args.ctx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.isUnitTest, tt.args.sdkEnv, tt.args.pipelineOptions, tt.args.pipelineLifeCycleCtx, tt.args.cancelChannel)
+			_ = runStep(tt.args.pipelineLifeCycleCtx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.isUnitTest, tt.args.sdkEnv, tt.args.pipelineOptions)
 			status, _ := cacheService.GetValue(tt.args.ctx, tt.args.pipelineId, cache.Status)
 			if status != tt.expectedStatus {
 				t.Errorf("runStep() got status = %v, want %v", status, tt.expectedStatus)
@@ -1318,7 +1320,7 @@ func Test_processSetupError(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			tt.mocks()
-			if err := processSetupError(tt.args.err, tt.args.pipelineId, tt.args.cacheService, tt.args.ctxWithTimeout); (err != nil) != tt.wantErr {
+			if err := processSetupError(tt.args.err, tt.args.pipelineId, tt.args.cacheService); (err != nil) != tt.wantErr {
 				t.Errorf("processSetupError() error = %v, wantErr %v", err, tt.wantErr)
 			}
 		})
@@ -1367,7 +1369,7 @@ func Test_processErrorWithSavingOutput(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			tt.mocks()
-			if err := processErrorWithSavingOutput(tt.args.ctx, tt.args.err, tt.args.errorOutput, tt.args.pipelineId, tt.args.subKey, tt.args.cacheService, tt.args.errorTitle, tt.args.newStatus); (err != nil) != tt.wantErr {
+			if err := processErrorWithSavingOutput(tt.args.err, tt.args.errorOutput, tt.args.pipelineId, tt.args.subKey, tt.args.cacheService, tt.args.errorTitle, tt.args.newStatus); (err != nil) != tt.wantErr {
 				t.Errorf("processErrorWithSavingOutput() error = %v, wantErr %v", err, tt.wantErr)
 			}
 		})
@@ -1416,7 +1418,7 @@ func Test_processRunError(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			tt.mocks()
-			if err := processRunError(tt.args.ctx, tt.args.errorChannel, tt.args.errorOutput, tt.args.pipelineId, tt.args.cacheService, tt.args.stopReadLogsChannel, tt.args.finishReadLogsChannel); (err != nil) != tt.wantErr {
+			if err := processRunError(tt.args.errorChannel, tt.args.errorOutput, tt.args.pipelineId, tt.args.cacheService, tt.args.stopReadLogsChannel, tt.args.finishReadLogsChannel); (err != nil) != tt.wantErr {
 				t.Errorf("processRunError() error = %v, wantErr %v", err, tt.wantErr)
 			}
 		})
@@ -1520,7 +1522,7 @@ func Test_processCompileSuccess(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			tt.mocks()
-			if err := processCompileSuccess(tt.args.ctx, tt.args.output, tt.args.pipelineId, tt.args.cacheService); (err != nil) != tt.wantErr {
+			if err := processCompileSuccess(tt.args.output, tt.args.pipelineId, tt.args.cacheService); (err != nil) != tt.wantErr {
 				t.Errorf("processCompileSuccess() error = %v, wantErr %v", err, tt.wantErr)
 			}
 		})
@@ -1528,7 +1530,8 @@ func Test_processCompileSuccess(t *testing.T) {
 }
 
 func Test_readGraphFile(t *testing.T) {
-	pipelineLifeCycleCtx, _ := context.WithTimeout(context.Background(), 1*time.Second)
+	pipelineLifeCycleCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+	defer cancel()
 	type args struct {
 		pipelineLifeCycleCtx context.Context
 		backgroundCtx        context.Context
@@ -1553,7 +1556,7 @@ func Test_readGraphFile(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			readGraphFile(tt.args.pipelineLifeCycleCtx, tt.args.backgroundCtx, tt.args.cacheService, tt.args.graphFilePath, tt.args.pipelineId)
+			readGraphFile(tt.args.pipelineLifeCycleCtx, tt.args.cacheService, tt.args.graphFilePath, tt.args.pipelineId)
 			if v, _ := cacheService.GetValue(tt.args.backgroundCtx, tt.args.pipelineId, cache.Graph); v == nil {
 				t.Errorf("readGraphFile() error: the graph was not cached")
 			}
diff --git a/playground/backend/internal/utils/cache_utils.go b/playground/backend/internal/utils/cache_utils.go
index 530064d00a3..537e0a00cbe 100644
--- a/playground/backend/internal/utils/cache_utils.go
+++ b/playground/backend/internal/utils/cache_utils.go
@@ -24,8 +24,11 @@ import (
 
 // SetToCache puts value to cache by key and subKey.
 // If error occurs during the function - logs and returns error.
-func SetToCache(ctx context.Context, cacheService cache.Cache, key uuid.UUID, subKey cache.SubKey, value interface{}) error {
-	err := cacheService.SetValue(ctx, key, subKey, value)
+func SetToCache(cacheService cache.Cache, key uuid.UUID, subKey cache.SubKey, value interface{}) error {
+	// Background context is used in cache operations so the operation would not be interrupted
+	// by the timeout or cancellation of the pipeline context. Cache timeouts are handled by
+	// the cache client itself.
+	err := cacheService.SetValue(context.Background(), key, subKey, value)
 	if err != nil {
 		logger.Errorf("%s: cache.SetValue: %s\n", key, err.Error())
 		// TODO send email to fix error with writing to cache
diff --git a/playground/backend/internal/utils/cache_utils_test.go b/playground/backend/internal/utils/cache_utils_test.go
index f1c95af8103..5c5a6a6cc28 100644
--- a/playground/backend/internal/utils/cache_utils_test.go
+++ b/playground/backend/internal/utils/cache_utils_test.go
@@ -87,7 +87,7 @@ func TestSetToCache(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			if err := SetToCache(tt.args.ctx, tt.args.cacheService, tt.args.key, tt.args.subKey, tt.args.value); (err != nil) != tt.wantErr {
+			if err := SetToCache(tt.args.cacheService, tt.args.key, tt.args.subKey, tt.args.value); (err != nil) != tt.wantErr {
 				t.Errorf("SetToCache() error = %v, wantErr %v", err, tt.wantErr)
 			}
 			if !tt.checkFunc() {