You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2023/04/12 15:11:48 UTC
[beam] branch master updated: [Playground] Simplify contexts in code_processing/Process() (#25747)
This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ce212ab8cff [Playground] Simplify contexts in code_processing/Process() (#25747)
ce212ab8cff is described below
commit ce212ab8cffd0826c343c744fe841c8424df04a1
Author: Timur Sultanov <ti...@akvelon.com>
AuthorDate: Wed Apr 12 19:11:39 2023 +0400
[Playground] Simplify contexts in code_processing/Process() (#25747)
* Simplify contexts in code_processing
* Remove "cancelChannel", use context's cancellation function to cancel execution
* Return error from validationStep()
* Return error from prepareStep()
* Return error from compileStep()
* Return error from runStep()
* Fix error handling
---
playground/backend/cmd/server/controller.go | 14 +-
.../internal/code_processing/code_processing.go | 237 ++++++++++++---------
.../code_processing/code_processing_test.go | 79 +++----
playground/backend/internal/utils/cache_utils.go | 7 +-
.../backend/internal/utils/cache_utils_test.go | 2 +-
5 files changed, 193 insertions(+), 146 deletions(-)
diff --git a/playground/backend/cmd/server/controller.go b/playground/backend/cmd/server/controller.go
index 778cbdfb4c4..4e379be2595 100644
--- a/playground/backend/cmd/server/controller.go
+++ b/playground/backend/cmd/server/controller.go
@@ -123,19 +123,19 @@ func (controller *playgroundController) RunCode(ctx context.Context, info *pb.Ru
return nil, cerrors.InternalError("Error during preparing", "Error during setup file system for the code processing: %s", err.Error())
}
- if err = utils.SetToCache(ctx, controller.cacheService, pipelineId, cache.Status, pb.Status_STATUS_VALIDATING); err != nil {
+ if err = utils.SetToCache(controller.cacheService, pipelineId, cache.Status, pb.Status_STATUS_VALIDATING); err != nil {
code_processing.DeleteResources(pipelineId, lc)
return nil, cerrors.InternalError("Error during preparing", "Error during saving status of the code processing")
}
- if err = utils.SetToCache(ctx, controller.cacheService, pipelineId, cache.RunOutputIndex, 0); err != nil {
+ if err = utils.SetToCache(controller.cacheService, pipelineId, cache.RunOutputIndex, 0); err != nil {
code_processing.DeleteResources(pipelineId, lc)
return nil, cerrors.InternalError("Error during preparing", "Error during saving initial run output")
}
- if err = utils.SetToCache(ctx, controller.cacheService, pipelineId, cache.LogsIndex, 0); err != nil {
+ if err = utils.SetToCache(controller.cacheService, pipelineId, cache.LogsIndex, 0); err != nil {
code_processing.DeleteResources(pipelineId, lc)
return nil, cerrors.InternalError("Error during preparing", "Error during saving value for the logs output")
}
- if err = utils.SetToCache(ctx, controller.cacheService, pipelineId, cache.Canceled, false); err != nil {
+ if err = utils.SetToCache(controller.cacheService, pipelineId, cache.Canceled, false); err != nil {
code_processing.DeleteResources(pipelineId, lc)
return nil, cerrors.InternalError("Error during preparing", "Error during saving initial cancel flag")
}
@@ -185,7 +185,7 @@ func (controller *playgroundController) GetRunOutput(ctx context.Context, info *
newRunOutput := ""
if len(runOutput) > lastIndex {
newRunOutput = runOutput[lastIndex:]
- if err := utils.SetToCache(ctx, controller.cacheService, pipelineId, cache.RunOutputIndex, lastIndex+len(newRunOutput)); err != nil {
+ if err := utils.SetToCache(controller.cacheService, pipelineId, cache.RunOutputIndex, lastIndex+len(newRunOutput)); err != nil {
return nil, cerrors.InternalError(errorMessage, "Error during saving pagination value")
}
}
@@ -215,7 +215,7 @@ func (controller *playgroundController) GetLogs(ctx context.Context, info *pb.Ge
newLogs := ""
if len(logs) > lastIndex {
newLogs = logs[lastIndex:]
- if err := utils.SetToCache(ctx, controller.cacheService, pipelineId, cache.LogsIndex, lastIndex+len(newLogs)); err != nil {
+ if err := utils.SetToCache(controller.cacheService, pipelineId, cache.LogsIndex, lastIndex+len(newLogs)); err != nil {
return nil, cerrors.InternalError(errorMessage, "Error during saving pagination value")
}
}
@@ -308,7 +308,7 @@ func (controller *playgroundController) Cancel(ctx context.Context, info *pb.Can
logger.Errorf("%s: Cancel(): pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid, err.Error())
return nil, cerrors.InvalidArgumentError(errorMessage, "pipelineId has incorrect value and couldn't be parsed as uuid value: %s", info.PipelineUuid)
}
- if err := utils.SetToCache(ctx, controller.cacheService, pipelineId, cache.Canceled, true); err != nil {
+ if err := utils.SetToCache(controller.cacheService, pipelineId, cache.Canceled, true); err != nil {
return nil, cerrors.InternalError(errorMessage, "Error during saving cancel flag value")
}
return &pb.CancelResponse{}, nil
diff --git a/playground/backend/internal/code_processing/code_processing.go b/playground/backend/internal/code_processing/code_processing.go
index 32cd9c38027..a249592e145 100644
--- a/playground/backend/internal/code_processing/code_processing.go
+++ b/playground/backend/internal/code_processing/code_processing.go
@@ -62,19 +62,19 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
DeleteResources(pipelineId, lc)
}(lc)
- cancelChannel := make(chan bool, 1)
-
var validationResults sync.Map
- go cancelCheck(pipelineLifeCycleCtx, pipelineId, cancelChannel, cacheService)
+ go cancelCheck(pipelineLifeCycleCtx, pipelineId, finishCtxFunc, cacheService)
- executor := validateStep(ctx, cacheService, &lc.Paths, pipelineId, sdkEnv, pipelineLifeCycleCtx, &validationResults, cancelChannel)
- if executor == nil {
+ err := validateStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, &validationResults)
+ if err != nil {
+ logger.Errorf("%s: error during validation step: %s", pipelineId, err.Error())
return
}
- executor = prepareStep(ctx, cacheService, &lc.Paths, pipelineId, sdkEnv, pipelineLifeCycleCtx, &validationResults, cancelChannel, lc.GetPreparerParameters())
- if executor == nil {
+ err = prepareStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, &validationResults, lc.GetPreparerParameters())
+ if err != nil {
+ logger.Errorf("%s: error during preparation step: %s", pipelineId, err.Error())
return
}
@@ -82,16 +82,20 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
validateIsUnitTest, _ := validationResults.Load(validators.UnitTestValidatorName)
isUnitTest := validateIsUnitTest.(bool)
- executor = compileStep(ctx, cacheService, &lc.Paths, pipelineId, sdkEnv, isUnitTest, pipelineLifeCycleCtx, cancelChannel)
- if executor == nil {
+ err = compileStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, isUnitTest)
+ if err != nil {
+ logger.Errorf("%s: error during compilation step: %s", pipelineId, err.Error())
return
}
// Run/RunTest
- runStep(ctx, cacheService, &lc.Paths, pipelineId, isUnitTest, sdkEnv, pipelineOptions, pipelineLifeCycleCtx, cancelChannel)
+ err = runStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, isUnitTest, sdkEnv, pipelineOptions)
+ if err != nil {
+ logger.Errorf("%s: error during run step: %s", pipelineId, err.Error())
+ }
}
-func runStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, isUnitTest bool, sdkEnv *environment.BeamEnvs, pipelineOptions string, pipelineLifeCycleCtx context.Context, cancelChannel chan bool) {
+func runStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, isUnitTest bool, sdkEnv *environment.BeamEnvs, pipelineOptions string) error {
errorChannel, successChannel := createStatusChannels()
stopReadLogsChannel := make(chan bool, 1)
finishReadLogsChannel := make(chan bool, 1)
@@ -99,22 +103,24 @@ func runStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeC
var executorBuilder *executors.ExecutorBuilder
err := error(nil)
if isUnitTest {
- executorBuilder, err = builder.TestRunner(pipelineLifeCycleCtx, paths, sdkEnv)
+ executorBuilder, err = builder.TestRunner(ctx, paths, sdkEnv)
} else {
- executorBuilder, err = builder.Runner(pipelineLifeCycleCtx, paths, utils.ReduceWhiteSpacesToSinge(pipelineOptions), sdkEnv)
+ executorBuilder, err = builder.Runner(ctx, paths, utils.ReduceWhiteSpacesToSinge(pipelineOptions), sdkEnv)
}
if err != nil {
- _ = processSetupError(err, pipelineId, cacheService, pipelineLifeCycleCtx)
- return
+ if processingErr := processSetupError(err, pipelineId, cacheService); processingErr != nil {
+ return processingErr
+ }
+ return err
}
executor := executorBuilder.Build()
logger.Infof("%s: Run()/Test() ...\n", pipelineId)
- runCmd := getExecuteCmd(isUnitTest, &executor, pipelineLifeCycleCtx)
+ runCmd := getExecuteCmd(isUnitTest, &executor, ctx)
var runError bytes.Buffer
- runOutput := streaming.RunOutputWriter{Ctx: pipelineLifeCycleCtx, CacheService: cacheService, PipelineId: pipelineId}
- go readLogFile(pipelineLifeCycleCtx, ctx, cacheService, paths.AbsoluteLogFilePath, pipelineId, stopReadLogsChannel, finishReadLogsChannel)
- go readGraphFile(pipelineLifeCycleCtx, ctx, cacheService, paths.AbsoluteGraphFilePath, pipelineId)
+ runOutput := streaming.RunOutputWriter{Ctx: ctx, CacheService: cacheService, PipelineId: pipelineId}
+ go readLogFile(ctx, cacheService, paths.AbsoluteLogFilePath, pipelineId, stopReadLogsChannel, finishReadLogsChannel)
+ go readGraphFile(ctx, cacheService, paths.AbsoluteGraphFilePath, pipelineId)
if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
// For go SDK all logs are placed to stdErr.
@@ -133,9 +139,9 @@ func runStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeC
}
// Start of the monitoring of background tasks (run step/cancellation/timeout)
- ok, err := reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, pipelineId, cacheService, cancelChannel, successChannel)
+ ok, err := reconcileBackgroundTask(ctx, pipelineId, cacheService, successChannel)
if err != nil {
- return
+ return err
}
if !ok {
// If unit test has some error then error output is placed as RunOutput
@@ -155,20 +161,26 @@ func runStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeC
}
runError.Write(errData)
}
- _ = processRunError(pipelineLifeCycleCtx, errorChannel, runError.Bytes(), pipelineId, cacheService, stopReadLogsChannel, finishReadLogsChannel)
- return
+ processingErr := processRunError(errorChannel, runError.Bytes(), pipelineId, cacheService, stopReadLogsChannel, finishReadLogsChannel)
+ if processingErr != nil {
+ return processingErr
+ }
+ return fmt.Errorf("run error: %s", runError.String())
}
// Run step is finished and code is executed
- _ = processRunSuccess(pipelineLifeCycleCtx, pipelineId, cacheService, stopReadLogsChannel, finishReadLogsChannel)
+ err = processRunSuccess(pipelineId, cacheService, stopReadLogsChannel, finishReadLogsChannel)
+ if err != nil {
+ return err
+ }
+ return nil
}
-func compileStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, isUnitTest bool, pipelineLifeCycleCtx context.Context, cancelChannel chan bool) *executors.Executor {
+func compileStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, isUnitTest bool) error {
errorChannel, successChannel := createStatusChannels()
- var executor = executors.Executor{}
// This condition is used for cases when the playground doesn't compile source files. For the Python code and the Go Unit Tests
if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_PYTHON || sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_SCIO || (sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO && isUnitTest) {
- if err := processCompileSuccess(pipelineLifeCycleCtx, []byte(""), pipelineId, cacheService); err != nil {
- return nil
+ if err := processCompileSuccess([]byte(""), pipelineId, cacheService); err != nil {
+ return err
}
} else { // in case of Java, Go (not unit test), Scala - need compile step
executorBuilder, err := builder.Compiler(paths, sdkEnv)
@@ -178,34 +190,39 @@ func compileStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.L
}
executor := executorBuilder.Build()
logger.Infof("%s: Compile() ...\n", pipelineId)
- compileCmd := executor.Compile(pipelineLifeCycleCtx)
+ compileCmd := executor.Compile(ctx)
var compileError bytes.Buffer
var compileOutput bytes.Buffer
runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel)
// Start of the monitoring of background tasks (compile step/cancellation/timeout)
- ok, err := reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, pipelineId, cacheService, cancelChannel, successChannel)
+ ok, err := reconcileBackgroundTask(ctx, pipelineId, cacheService, successChannel)
if err != nil {
- return nil
+ return err
}
if !ok { // Compile step is finished, but code couldn't be compiled (some typos for example)
err := <-errorChannel
- _ = processErrorWithSavingOutput(pipelineLifeCycleCtx, err, compileError.Bytes(), pipelineId, cache.CompileOutput, cacheService, "Compile", pb.Status_STATUS_COMPILE_ERROR)
- return nil
+ processingErr := processErrorWithSavingOutput(err, compileError.Bytes(), pipelineId, cache.CompileOutput, cacheService, "Compile", pb.Status_STATUS_COMPILE_ERROR)
+ if processingErr != nil {
+ return processingErr
+ }
+ return err
} // Compile step is finished and code is compiled
- if err := processCompileSuccess(pipelineLifeCycleCtx, compileOutput.Bytes(), pipelineId, cacheService); err != nil {
- return nil
+ if err := processCompileSuccess(compileOutput.Bytes(), pipelineId, cacheService); err != nil {
+ return err
}
}
- return &executor
+ return nil
}
-func prepareStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, pipelineLifeCycleCtx context.Context, validationResults *sync.Map, cancelChannel chan bool, prepareParams map[string]string) *executors.Executor {
+func prepareStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, validationResults *sync.Map, prepareParams map[string]string) error {
errorChannel, successChannel := createStatusChannels()
executorBuilder, err := builder.Preparer(paths, sdkEnv, validationResults, prepareParams)
if err != nil {
- _ = processSetupError(err, pipelineId, cacheService, pipelineLifeCycleCtx)
- return nil
+ if processingErr := processSetupError(err, pipelineId, cacheService); processingErr != nil {
+ return processingErr
+ }
+ return err
}
executor := executorBuilder.Build()
logger.Infof("%s: Prepare() ...\n", pipelineId)
@@ -213,29 +230,34 @@ func prepareStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.L
go prepareFunc(successChannel, errorChannel, validationResults)
// Start of the monitoring of background tasks (prepare function/cancellation/timeout)
- ok, err := reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, pipelineId, cacheService, cancelChannel, successChannel)
+ ok, err := reconcileBackgroundTask(ctx, pipelineId, cacheService, successChannel)
if err != nil {
- return nil
+ return err
}
if !ok {
err := <-errorChannel
// Prepare step is finished, but code couldn't be prepared (some error during prepare step)
- _ = processErrorWithSavingOutput(pipelineLifeCycleCtx, err, []byte(err.Error()), pipelineId, cache.PreparationOutput, cacheService, "Prepare", pb.Status_STATUS_PREPARATION_ERROR)
- return nil
+ processingErr := processErrorWithSavingOutput(err, []byte(err.Error()), pipelineId, cache.PreparationOutput, cacheService, "Prepare", pb.Status_STATUS_PREPARATION_ERROR)
+ if processingErr != nil {
+ return processingErr
+ }
+ return err
}
// Prepare step is finished and code is prepared
- if err := processSuccess(pipelineLifeCycleCtx, pipelineId, cacheService, "Prepare", pb.Status_STATUS_COMPILING); err != nil {
- return nil
+ if err := processSuccess(pipelineId, cacheService, "Prepare", pb.Status_STATUS_COMPILING); err != nil {
+ return err
}
- return &executor
+ return nil
}
-func validateStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, pipelineLifeCycleCtx context.Context, validationResults *sync.Map, cancelChannel chan bool) *executors.Executor {
+func validateStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.LifeCyclePaths, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, validationResults *sync.Map) error {
errorChannel, successChannel := createStatusChannels()
executorBuilder, err := builder.Validator(paths, sdkEnv)
if err != nil {
- _ = processSetupError(err, pipelineId, cacheService, pipelineLifeCycleCtx)
- return nil
+ if processingError := processSetupError(err, pipelineId, cacheService); processingError != nil {
+ return processingError
+ }
+ return err
}
executor := executorBuilder.Build()
logger.Infof("%s: Validate() ...\n", pipelineId)
@@ -243,22 +265,25 @@ func validateStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.
go validateFunc(successChannel, errorChannel, validationResults)
// Start of the monitoring of background tasks (validate function/cancellation/timeout)
- ok, err := reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, pipelineId, cacheService, cancelChannel, successChannel)
+ ok, err := reconcileBackgroundTask(ctx, pipelineId, cacheService, successChannel)
if err != nil {
- return nil
+ return err
}
if !ok {
err := <-errorChannel
// Validate step is finished, but code isn't valid
- _ = processErrorWithSavingOutput(pipelineLifeCycleCtx, err, []byte(err.Error()), pipelineId, cache.ValidationOutput, cacheService, "Validate", pb.Status_STATUS_VALIDATION_ERROR)
- return nil
+ processingErr := processErrorWithSavingOutput(err, []byte(err.Error()), pipelineId, cache.ValidationOutput, cacheService, "Validate", pb.Status_STATUS_VALIDATION_ERROR)
+ if processingErr != nil {
+ return processingErr
+ }
+ return err
}
// Validate step is finished and code is valid
- if err := processSuccess(pipelineLifeCycleCtx, pipelineId, cacheService, "Validate", pb.Status_STATUS_PREPARING); err != nil {
- return nil
+ if err := processSuccess(pipelineId, cacheService, "Validate", pb.Status_STATUS_PREPARING); err != nil {
+ return err
}
- return &executor
+ return err
}
func createStatusChannels() (chan error, chan bool) {
@@ -278,9 +303,10 @@ func getExecuteCmd(isUnitTest bool, executor *executors.Executor, ctxWithTimeout
}
// processSetupError processes errors during the setting up an executor builder
-func processSetupError(err error, pipelineId uuid.UUID, cacheService cache.Cache, ctxWithTimeout context.Context) error {
+func processSetupError(err error, pipelineId uuid.UUID, cacheService cache.Cache) error {
logger.Errorf("%s: error during setup builder: %s\n", pipelineId, err.Error())
- if err = utils.SetToCache(ctxWithTimeout, cacheService, pipelineId, cache.Status, pb.Status_STATUS_ERROR); err != nil {
+ if err = utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_ERROR); err != nil {
+ logger.Errorf("%s: error during saving error message: %s", pipelineId, err)
return err
}
return nil
@@ -379,14 +405,24 @@ func runCmdWithOutput(cmd *exec.Cmd, stdOutput io.Writer, stdError io.Writer, su
// If cmd operation (Validate/Prepare/Compile/Run/RunTest) finishes successfully but with some error
//
// during step processing - returns false.
-func reconcileBackgroundTask(pipelineLifeCycleCtx, backgroundCtx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, cancelChannel, successChannel chan bool) (bool, error) {
+func reconcileBackgroundTask(pipelineLifeCycleCtx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, successChannel chan bool) (bool, error) {
select {
case <-pipelineLifeCycleCtx.Done():
- _ = finishByTimeout(backgroundCtx, pipelineId, cacheService)
- return false, fmt.Errorf("%s: context was done", pipelineId)
- case <-cancelChannel:
- _ = processCancel(pipelineLifeCycleCtx, cacheService, pipelineId)
- return false, fmt.Errorf("%s: code processing was canceled", pipelineId)
+ contextErr := pipelineLifeCycleCtx.Err()
+ switch contextErr {
+ case context.DeadlineExceeded:
+ if err := finishByTimeout(pipelineId, cacheService); err != nil {
+ return false, fmt.Errorf("error during context timeout processing: %s", err.Error())
+ }
+ return false, fmt.Errorf("code processing context timeout")
+ case context.Canceled:
+ if err := processCancel(cacheService, pipelineId); err != nil {
+ return false, fmt.Errorf("error during cancellation processing: %s", err.Error())
+ }
+ return false, fmt.Errorf("code processing was canceled")
+ default:
+ return false, fmt.Errorf("code processing cancelled: %s", contextErr.Error())
+ }
case ok := <-successChannel:
return ok, nil
}
@@ -396,7 +432,7 @@ func reconcileBackgroundTask(pipelineLifeCycleCtx, backgroundCtx context.Context
// If cancel flag doesn't exist in cache continue working.
// If context is done it means that the code processing was finished (successfully/with error/timeout). Return.
// If cancel flag exists, and it is true it means that the code processing was canceled. Set true to cancelChannel and return.
-func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChannel chan bool, cacheService cache.Cache) {
+func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelFunc context.CancelFunc, cacheService cache.Cache) {
ticker := time.NewTicker(pauseDuration)
for {
select {
@@ -409,7 +445,7 @@ func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChannel chan b
logger.Errorf("%s: Error during getting value from the cache: %s", pipelineId, err.Error())
}
if cancel.(bool) {
- cancelChannel <- true
+ cancelFunc()
return
}
}
@@ -420,7 +456,7 @@ func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChannel chan b
// If context is done it means that the code processing was finished (successfully/with error/timeout).
// Write graph to the cache if this in the file.
// In other case each pauseDuration checks that graph file exists or not and try to save it to the cache.
-func readGraphFile(pipelineLifeCycleCtx, backgroundCtx context.Context, cacheService cache.Cache, graphFilePath string, pipelineId uuid.UUID) {
+func readGraphFile(pipelineLifeCycleCtx context.Context, cacheService cache.Cache, graphFilePath string, pipelineId uuid.UUID) {
ticker := time.NewTicker(pauseDuration)
for {
select {
@@ -432,7 +468,7 @@ func readGraphFile(pipelineLifeCycleCtx, backgroundCtx context.Context, cacheSer
if err != nil {
logger.Errorf("%s: Error during saving graph to the file: %s", pipelineId, err.Error())
}
- _ = utils.SetToCache(backgroundCtx, cacheService, pipelineId, cache.Graph, string(graph))
+ _ = utils.SetToCache(cacheService, pipelineId, cache.Graph, string(graph))
}
// in case of timeout or cancel
case <-pipelineLifeCycleCtx.Done():
@@ -442,7 +478,7 @@ func readGraphFile(pipelineLifeCycleCtx, backgroundCtx context.Context, cacheSer
if err != nil {
logger.Errorf("%s: Error during saving graph to the file: %s", pipelineId, err.Error())
}
- _ = utils.SetToCache(backgroundCtx, cacheService, pipelineId, cache.Graph, string(graph))
+ _ = utils.SetToCache(cacheService, pipelineId, cache.Graph, string(graph))
}
return
}
@@ -457,29 +493,29 @@ func readGraphFile(pipelineLifeCycleCtx, backgroundCtx context.Context, cacheSer
// to the cache and set value to the finishReadLogChannel channel to unblock the code processing.
//
// In other case each pauseDuration write to cache logs of the code processing.
-func readLogFile(pipelineLifeCycleCtx, backgroundCtx context.Context, cacheService cache.Cache, logFilePath string, pipelineId uuid.UUID, stopReadLogsChannel, finishReadLogChannel chan bool) {
+func readLogFile(pipelineLifeCycleCtx context.Context, cacheService cache.Cache, logFilePath string, pipelineId uuid.UUID, stopReadLogsChannel, finishReadLogChannel chan bool) {
ticker := time.NewTicker(pauseDuration)
for {
select {
// in case of timeout or cancel
case <-pipelineLifeCycleCtx.Done():
- _ = finishReadLogFile(backgroundCtx, ticker, cacheService, logFilePath, pipelineId)
+ _ = finishReadLogFile(ticker, cacheService, logFilePath, pipelineId)
return
// in case of pipeline finish successfully or has error on the run step
case <-stopReadLogsChannel:
- _ = finishReadLogFile(pipelineLifeCycleCtx, ticker, cacheService, logFilePath, pipelineId)
+ _ = finishReadLogFile(ticker, cacheService, logFilePath, pipelineId)
finishReadLogChannel <- true
return
case <-ticker.C:
- _ = writeLogsToCache(pipelineLifeCycleCtx, cacheService, logFilePath, pipelineId)
+ _ = writeLogsToCache(cacheService, logFilePath, pipelineId)
}
}
}
// finishReadLogFile is used to read logs file for the last time
-func finishReadLogFile(ctx context.Context, ticker *time.Ticker, cacheService cache.Cache, logFilePath string, pipelineId uuid.UUID) error {
+func finishReadLogFile(ticker *time.Ticker, cacheService cache.Cache, logFilePath string, pipelineId uuid.UUID) error {
ticker.Stop()
- return writeLogsToCache(ctx, cacheService, logFilePath, pipelineId)
+ return writeLogsToCache(cacheService, logFilePath, pipelineId)
}
// writeLogsToCache write all logs from the log file to the cache.
@@ -490,7 +526,7 @@ func finishReadLogFile(ctx context.Context, ticker *time.Ticker, cacheService ca
//
// If log file exists, read all from the log file and keep it to the cache using cache.Logs subKey.
// If some error occurs, log the error and return the error.
-func writeLogsToCache(ctx context.Context, cacheService cache.Cache, logFilePath string, pipelineId uuid.UUID) error {
+func writeLogsToCache(cacheService cache.Cache, logFilePath string, pipelineId uuid.UUID) error {
if _, err := os.Stat(logFilePath); os.IsNotExist(err) {
return nil
}
@@ -499,7 +535,7 @@ func writeLogsToCache(ctx context.Context, cacheService cache.Cache, logFilePath
logger.Errorf("%s: writeLogsToCache(): error during read from logs file: %s", pipelineId, err.Error())
return err
}
- return utils.SetToCache(ctx, cacheService, pipelineId, cache.Logs, string(logs))
+ return utils.SetToCache(cacheService, pipelineId, cache.Logs, string(logs))
}
// DeleteResources removes all prepared resources for received LifeCycle
@@ -515,22 +551,27 @@ func DeleteResources(pipelineId uuid.UUID, lc *fs_tool.LifeCycle) {
}
// finishByTimeout is used in case of runCode method finished by timeout
-func finishByTimeout(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache) error {
+func finishByTimeout(pipelineId uuid.UUID, cacheService cache.Cache) error {
logger.Errorf("%s: code processing finishes because of timeout\n", pipelineId)
// set to cache pipelineId: cache.SubKey_Status: Status_STATUS_RUN_TIMEOUT
- return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_TIMEOUT)
+ return utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_TIMEOUT)
}
// processErrorWithSavingOutput processes error with saving to cache received error output.
-func processErrorWithSavingOutput(ctx context.Context, err error, errorOutput []byte, pipelineId uuid.UUID, subKey cache.SubKey, cacheService cache.Cache, errorTitle string, newStatus pb.Status) error {
+func processErrorWithSavingOutput(err error, errorOutput []byte, pipelineId uuid.UUID, subKey cache.SubKey, cacheService cache.Cache, errorTitle string, newStatus pb.Status) error {
logger.Errorf("%s: %s(): err: %s, output: %s\n", pipelineId, errorTitle, err.Error(), errorOutput)
- if err := utils.SetToCache(ctx, cacheService, pipelineId, subKey, fmt.Sprintf("error: %s\noutput: %s", err.Error(), errorOutput)); err != nil {
+ if err := utils.SetToCache(cacheService, pipelineId, subKey, fmt.Sprintf("error: %s\noutput: %s", err.Error(), errorOutput)); err != nil {
+ logger.Errorf("%s: failed to save error message to cache: %s", pipelineId, err.Error())
return err
}
- return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, newStatus)
+ if err = utils.SetToCache(cacheService, pipelineId, cache.Status, newStatus); err != nil {
+ logger.Errorf("%s: failed to save status to cache: %s", pipelineId, err.Error())
+ return err
+ }
+ return nil
}
// processRunError processes error received during processing run step.
@@ -538,51 +579,51 @@ func processErrorWithSavingOutput(ctx context.Context, err error, errorOutput []
//
// After receiving a signal that goroutine was finished (read value from finishReadLogsChannel) this method
// sets corresponding status to the cache.
-func processRunError(ctx context.Context, errorChannel chan error, errorOutput []byte, pipelineId uuid.UUID, cacheService cache.Cache, stopReadLogsChannel, finishReadLogsChannel chan bool) error {
+func processRunError(errorChannel chan error, errorOutput []byte, pipelineId uuid.UUID, cacheService cache.Cache, stopReadLogsChannel, finishReadLogsChannel chan bool) error {
err := <-errorChannel
logger.Errorf("%s: Run(): err: %s, output: %s\n", pipelineId, err.Error(), errorOutput)
- if err := utils.SetToCache(ctx, cacheService, pipelineId, cache.RunError, fmt.Sprintf("error: %s\noutput: %s", err.Error(), string(errorOutput))); err != nil {
+ if err := utils.SetToCache(cacheService, pipelineId, cache.RunError, fmt.Sprintf("error: %s\noutput: %s", err.Error(), string(errorOutput))); err != nil {
return err
}
stopReadLogsChannel <- true
<-finishReadLogsChannel
- return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_ERROR)
+ return utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_ERROR)
}
// processSuccess processes case after successful process validation or preparation steps.
// This method sets corresponding status to the cache.
-func processSuccess(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, successTitle string, newStatus pb.Status) error {
+func processSuccess(pipelineId uuid.UUID, cacheService cache.Cache, successTitle string, newStatus pb.Status) error {
logger.Infof("%s: %s(): finish\n", pipelineId, successTitle)
- return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, newStatus)
+ return utils.SetToCache(cacheService, pipelineId, cache.Status, newStatus)
}
// processCompileSuccess processes case after successful compile step.
// This method sets output of the compile step, sets empty string as output of the run step and
//
// sets corresponding status to the cache.
-func processCompileSuccess(ctx context.Context, output []byte, pipelineId uuid.UUID, cacheService cache.Cache) error {
+func processCompileSuccess(output []byte, pipelineId uuid.UUID, cacheService cache.Cache) error {
logger.Infof("%s: Compile() finish\n", pipelineId)
- if err := utils.SetToCache(ctx, cacheService, pipelineId, cache.CompileOutput, string(output)); err != nil {
+ if err := utils.SetToCache(cacheService, pipelineId, cache.CompileOutput, string(output)); err != nil {
return err
}
- if err := utils.SetToCache(ctx, cacheService, pipelineId, cache.RunOutput, ""); err != nil {
+ if err := utils.SetToCache(cacheService, pipelineId, cache.RunOutput, ""); err != nil {
return err
}
- if err := utils.SetToCache(ctx, cacheService, pipelineId, cache.RunError, ""); err != nil {
+ if err := utils.SetToCache(cacheService, pipelineId, cache.RunError, ""); err != nil {
return err
}
- if err := utils.SetToCache(ctx, cacheService, pipelineId, cache.Logs, ""); err != nil {
+ if err := utils.SetToCache(cacheService, pipelineId, cache.Logs, ""); err != nil {
return err
}
- if err := utils.SetToCache(ctx, cacheService, pipelineId, cache.Graph, ""); err != nil {
+ if err := utils.SetToCache(cacheService, pipelineId, cache.Graph, ""); err != nil {
return err
}
- return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_EXECUTING)
+ return utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_EXECUTING)
}
// processRunSuccess processes case after successful run step.
@@ -590,19 +631,19 @@ func processCompileSuccess(ctx context.Context, output []byte, pipelineId uuid.U
//
// After receiving a signal that goroutine was finished (read value from finishReadLogsChannel) this method
// sets corresponding status to the cache.
-func processRunSuccess(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, stopReadLogsChannel, finishReadLogsChannel chan bool) error {
+func processRunSuccess(pipelineId uuid.UUID, cacheService cache.Cache, stopReadLogsChannel, finishReadLogsChannel chan bool) error {
logger.Infof("%s: Run() finish\n", pipelineId)
stopReadLogsChannel <- true
<-finishReadLogsChannel
- return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_FINISHED)
+ return utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_FINISHED)
}
// processCancel process case when code processing was canceled
-func processCancel(ctx context.Context, cacheService cache.Cache, pipelineId uuid.UUID) error {
+func processCancel(cacheService cache.Cache, pipelineId uuid.UUID) error {
logger.Infof("%s: was canceled\n", pipelineId)
// set to cache pipelineId: cache.SubKey_Status: pb.Status_STATUS_CANCELED
- return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_CANCELED)
+ return utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_CANCELED)
}
diff --git a/playground/backend/internal/code_processing/code_processing_test.go b/playground/backend/internal/code_processing/code_processing_test.go
index b9f4f7131c4..b650e012aef 100644
--- a/playground/backend/internal/code_processing/code_processing_test.go
+++ b/playground/backend/internal/code_processing/code_processing_test.go
@@ -97,8 +97,14 @@ func setup() {
if err != nil {
panic(err)
}
- os.Setenv("BEAM_SDK", pb.Sdk_SDK_JAVA.String())
- os.Setenv("APP_WORK_DIR", path)
+ err = os.Setenv("BEAM_SDK", pb.Sdk_SDK_JAVA.String())
+ if err != nil {
+ panic(err)
+ }
+ err = os.Setenv("APP_WORK_DIR", path)
+ if err != nil {
+ panic(err)
+ }
cacheService = local.New(context.Background())
}
@@ -296,14 +302,14 @@ func Test_Process(t *testing.T) {
if tt.createExecFile {
_ = lc.CreateSourceCodeFiles(sources)
}
- if err = utils.SetToCache(tt.args.ctx, cacheService, tt.args.pipelineId, cache.Canceled, false); err != nil {
+ if err = utils.SetToCache(cacheService, tt.args.pipelineId, cache.Canceled, false); err != nil {
t.Fatal("error during set cancel flag to cache")
}
if tt.cancelFunc {
go func(ctx context.Context, pipelineId uuid.UUID) {
// to imitate behavior of cancellation
time.Sleep(5 * time.Second)
- cacheService.SetValue(ctx, pipelineId, cache.Canceled, true)
+ _ = cacheService.SetValue(ctx, pipelineId, cache.Canceled, true)
}(tt.args.ctx, tt.args.pipelineId)
}
Process(tt.args.ctx, cacheService, lc, tt.args.pipelineId, tt.args.appEnv, tt.args.sdkEnv, tt.args.pipelineOptions)
@@ -675,9 +681,18 @@ func setupSDK(sdk pb.Sdk) {
panic(err)
}
- os.Setenv("BEAM_SDK", sdk.String())
- os.Setenv("APP_WORK_DIR", "")
- os.Setenv("PREPARED_MOD_DIR", "")
+ err = os.Setenv("BEAM_SDK", sdk.String())
+ if err != nil {
+ panic(err)
+ }
+ err = os.Setenv("APP_WORK_DIR", "")
+ if err != nil {
+ panic(err)
+ }
+ err = os.Setenv("PREPARED_MOD_DIR", "")
+ if err != nil {
+ panic(err)
+ }
cacheService = local.New(context.Background())
}
@@ -731,7 +746,7 @@ func Benchmark_ProcessJava(b *testing.B) {
b.StopTimer()
pipelineId := uuid.New()
lc := prepareFiles(b, pipelineId, code, pb.Sdk_SDK_JAVA)
- if err = utils.SetToCache(ctx, cacheService, pipelineId, cache.Canceled, false); err != nil {
+ if err = utils.SetToCache(cacheService, pipelineId, cache.Canceled, false); err != nil {
b.Fatal("error during set cancel flag to cache")
}
b.StartTimer()
@@ -761,7 +776,7 @@ func Benchmark_ProcessPython(b *testing.B) {
b.StopTimer()
pipelineId := uuid.New()
lc := prepareFiles(b, pipelineId, wordCountCode, pb.Sdk_SDK_PYTHON)
- if err = utils.SetToCache(ctx, cacheService, pipelineId, cache.Canceled, false); err != nil {
+ if err = utils.SetToCache(cacheService, pipelineId, cache.Canceled, false); err != nil {
b.Fatal("error during set cancel flag to cache")
}
b.StartTimer()
@@ -791,7 +806,7 @@ func Benchmark_ProcessGo(b *testing.B) {
b.StopTimer()
pipelineId := uuid.New()
lc := prepareFiles(b, pipelineId, code, pb.Sdk_SDK_GO)
- if err = utils.SetToCache(ctx, cacheService, pipelineId, cache.Canceled, false); err != nil {
+ if err = utils.SetToCache(cacheService, pipelineId, cache.Canceled, false); err != nil {
b.Fatal("error during set cancel flag to cache")
}
b.StartTimer()
@@ -864,7 +879,6 @@ func Test_validateStep(t *testing.T) {
sdkEnv *environment.BeamEnvs
pipelineLifeCycleCtx context.Context
validationResults *sync.Map
- cancelChannel chan bool
}
tests := []struct {
name string
@@ -881,7 +895,6 @@ func Test_validateStep(t *testing.T) {
sdkEnv: javaSdkEnv,
pipelineLifeCycleCtx: context.Background(),
validationResults: &sync.Map{},
- cancelChannel: make(chan bool, 1),
},
want: 3,
code: helloWordJava,
@@ -895,7 +908,6 @@ func Test_validateStep(t *testing.T) {
sdkEnv: incorrectSdkEnv,
pipelineLifeCycleCtx: context.Background(),
validationResults: &sync.Map{},
- cancelChannel: make(chan bool, 1),
},
want: 0,
code: helloWordJava,
@@ -910,9 +922,9 @@ func Test_validateStep(t *testing.T) {
}
sources := []entity.FileEntity{{Name: "main.java", Content: tt.code, IsMain: true}}
_ = lc.CreateSourceCodeFiles(sources)
- executor := validateStep(tt.args.ctx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.sdkEnv, tt.args.pipelineLifeCycleCtx, tt.args.validationResults, tt.args.cancelChannel)
+ err = validateStep(tt.args.pipelineLifeCycleCtx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.sdkEnv, tt.args.validationResults)
got := syncMapLen(tt.args.validationResults)
- if executor != nil && !reflect.DeepEqual(got, tt.want) {
+ if err != nil && !reflect.DeepEqual(got, tt.want) {
t.Errorf("validateStep() = %d, want %d", got, tt.want)
}
})
@@ -931,7 +943,8 @@ func Test_prepareStep(t *testing.T) {
validationResults := sync.Map{}
validationResults.Store(validators.UnitTestValidatorName, false)
validationResults.Store(validators.KatasValidatorName, false)
- pipelineLifeCycleCtx, _ := context.WithTimeout(context.Background(), 1)
+ pipelineLifeCycleCtx, cancel := context.WithTimeout(context.Background(), 1)
+ defer cancel()
type args struct {
ctx context.Context
cacheService cache.Cache
@@ -939,7 +952,6 @@ func Test_prepareStep(t *testing.T) {
sdkEnv *environment.BeamEnvs
pipelineLifeCycleCtx context.Context
validationResults *sync.Map
- cancelChannel chan bool
}
tests := []struct {
name string
@@ -956,7 +968,6 @@ func Test_prepareStep(t *testing.T) {
sdkEnv: javaSdkEnv,
pipelineLifeCycleCtx: context.Background(),
validationResults: &validationResults,
- cancelChannel: make(chan bool, 1),
},
code: helloWordJava,
expectedStatus: pb.Status_STATUS_COMPILING,
@@ -970,7 +981,6 @@ func Test_prepareStep(t *testing.T) {
sdkEnv: incorrectSdkEnv,
pipelineLifeCycleCtx: context.Background(),
validationResults: &validationResults,
- cancelChannel: make(chan bool, 1),
},
code: "",
expectedStatus: pb.Status_STATUS_ERROR,
@@ -984,7 +994,6 @@ func Test_prepareStep(t *testing.T) {
sdkEnv: javaSdkEnv,
pipelineLifeCycleCtx: pipelineLifeCycleCtx,
validationResults: &validationResults,
- cancelChannel: make(chan bool, 1),
},
code: "",
expectedStatus: pb.Status_STATUS_RUN_TIMEOUT,
@@ -999,7 +1008,7 @@ func Test_prepareStep(t *testing.T) {
}
sources := []entity.FileEntity{{Name: "main.java", Content: tt.code, IsMain: true}}
_ = lc.CreateSourceCodeFiles(sources)
- prepareStep(tt.args.ctx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.sdkEnv, tt.args.pipelineLifeCycleCtx, tt.args.validationResults, tt.args.cancelChannel, nil)
+ _ = prepareStep(tt.args.pipelineLifeCycleCtx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.sdkEnv, tt.args.validationResults, nil)
status, _ := cacheService.GetValue(tt.args.ctx, tt.args.pipelineId, cache.Status)
if status != tt.expectedStatus {
t.Errorf("prepareStep: got status = %v, want %v", status, tt.expectedStatus)
@@ -1017,7 +1026,8 @@ func Test_compileStep(t *testing.T) {
if err != nil {
panic(err)
}
- pipelineLifeCycleCtx, _ := context.WithTimeout(context.Background(), 1)
+ pipelineLifeCycleCtx, cancel := context.WithTimeout(context.Background(), 1)
+ defer cancel()
type args struct {
ctx context.Context
cacheService cache.Cache
@@ -1025,7 +1035,6 @@ func Test_compileStep(t *testing.T) {
sdkEnv *environment.BeamEnvs
isUnitTest bool
pipelineLifeCycleCtx context.Context
- cancelChannel chan bool
}
tests := []struct {
name string
@@ -1042,7 +1051,6 @@ func Test_compileStep(t *testing.T) {
sdkEnv: sdkJavaEnv,
isUnitTest: false,
pipelineLifeCycleCtx: context.Background(),
- cancelChannel: make(chan bool, 1),
},
code: helloWordJava,
expectedStatus: pb.Status_STATUS_EXECUTING,
@@ -1056,7 +1064,6 @@ func Test_compileStep(t *testing.T) {
sdkEnv: sdkPythonEnv,
isUnitTest: false,
pipelineLifeCycleCtx: context.Background(),
- cancelChannel: make(chan bool, 1),
},
code: helloWordPython,
expectedStatus: pb.Status_STATUS_EXECUTING,
@@ -1070,7 +1077,6 @@ func Test_compileStep(t *testing.T) {
sdkEnv: sdkJavaEnv,
isUnitTest: false,
pipelineLifeCycleCtx: pipelineLifeCycleCtx,
- cancelChannel: make(chan bool, 1),
},
code: helloWordJava,
expectedStatus: pb.Status_STATUS_RUN_TIMEOUT,
@@ -1085,7 +1091,7 @@ func Test_compileStep(t *testing.T) {
}
sources := []entity.FileEntity{{Name: "main.java", Content: tt.code, IsMain: true}}
_ = lc.CreateSourceCodeFiles(sources)
- compileStep(tt.args.ctx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.sdkEnv, tt.args.isUnitTest, tt.args.pipelineLifeCycleCtx, tt.args.cancelChannel)
+ _ = compileStep(tt.args.pipelineLifeCycleCtx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.sdkEnv, tt.args.isUnitTest)
status, _ := cacheService.GetValue(tt.args.ctx, tt.args.pipelineId, cache.Status)
if status != tt.expectedStatus {
t.Errorf("compileStep: got status = %v, want %v", status, tt.expectedStatus)
@@ -1115,7 +1121,6 @@ func Test_runStep(t *testing.T) {
sdkEnv *environment.BeamEnvs
pipelineOptions string
pipelineLifeCycleCtx context.Context
- cancelChannel chan bool
createExecFile bool
}
tests := []struct {
@@ -1137,7 +1142,6 @@ func Test_runStep(t *testing.T) {
sdkEnv: sdkPythonEnv,
pipelineOptions: "",
pipelineLifeCycleCtx: context.Background(),
- cancelChannel: make(chan bool, 1),
createExecFile: true,
},
code: helloWordPython,
@@ -1156,7 +1160,6 @@ func Test_runStep(t *testing.T) {
sdkEnv: sdkGoEnv,
pipelineOptions: "",
pipelineLifeCycleCtx: context.Background(),
- cancelChannel: make(chan bool, 1),
createExecFile: true,
},
code: helloWordGo,
@@ -1174,7 +1177,6 @@ func Test_runStep(t *testing.T) {
sdkEnv: sdkJavaEnv,
pipelineOptions: "",
pipelineLifeCycleCtx: context.Background(),
- cancelChannel: make(chan bool, 1),
createExecFile: false,
},
code: helloWordJava,
@@ -1192,7 +1194,7 @@ func Test_runStep(t *testing.T) {
sources := []entity.FileEntity{{Name: "main.java", Content: tt.code, IsMain: true}}
_ = lc.CreateSourceCodeFiles(sources)
}
- runStep(tt.args.ctx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.isUnitTest, tt.args.sdkEnv, tt.args.pipelineOptions, tt.args.pipelineLifeCycleCtx, tt.args.cancelChannel)
+ _ = runStep(tt.args.pipelineLifeCycleCtx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.isUnitTest, tt.args.sdkEnv, tt.args.pipelineOptions)
status, _ := cacheService.GetValue(tt.args.ctx, tt.args.pipelineId, cache.Status)
if status != tt.expectedStatus {
t.Errorf("runStep() got status = %v, want %v", status, tt.expectedStatus)
@@ -1318,7 +1320,7 @@ func Test_processSetupError(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.mocks()
- if err := processSetupError(tt.args.err, tt.args.pipelineId, tt.args.cacheService, tt.args.ctxWithTimeout); (err != nil) != tt.wantErr {
+ if err := processSetupError(tt.args.err, tt.args.pipelineId, tt.args.cacheService); (err != nil) != tt.wantErr {
t.Errorf("processSetupError() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -1367,7 +1369,7 @@ func Test_processErrorWithSavingOutput(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.mocks()
- if err := processErrorWithSavingOutput(tt.args.ctx, tt.args.err, tt.args.errorOutput, tt.args.pipelineId, tt.args.subKey, tt.args.cacheService, tt.args.errorTitle, tt.args.newStatus); (err != nil) != tt.wantErr {
+ if err := processErrorWithSavingOutput(tt.args.err, tt.args.errorOutput, tt.args.pipelineId, tt.args.subKey, tt.args.cacheService, tt.args.errorTitle, tt.args.newStatus); (err != nil) != tt.wantErr {
t.Errorf("processErrorWithSavingOutput() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -1416,7 +1418,7 @@ func Test_processRunError(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.mocks()
- if err := processRunError(tt.args.ctx, tt.args.errorChannel, tt.args.errorOutput, tt.args.pipelineId, tt.args.cacheService, tt.args.stopReadLogsChannel, tt.args.finishReadLogsChannel); (err != nil) != tt.wantErr {
+ if err := processRunError(tt.args.errorChannel, tt.args.errorOutput, tt.args.pipelineId, tt.args.cacheService, tt.args.stopReadLogsChannel, tt.args.finishReadLogsChannel); (err != nil) != tt.wantErr {
t.Errorf("processRunError() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -1520,7 +1522,7 @@ func Test_processCompileSuccess(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.mocks()
- if err := processCompileSuccess(tt.args.ctx, tt.args.output, tt.args.pipelineId, tt.args.cacheService); (err != nil) != tt.wantErr {
+ if err := processCompileSuccess(tt.args.output, tt.args.pipelineId, tt.args.cacheService); (err != nil) != tt.wantErr {
t.Errorf("processCompileSuccess() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -1528,7 +1530,8 @@ func Test_processCompileSuccess(t *testing.T) {
}
func Test_readGraphFile(t *testing.T) {
- pipelineLifeCycleCtx, _ := context.WithTimeout(context.Background(), 1*time.Second)
+ pipelineLifeCycleCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ defer cancel()
type args struct {
pipelineLifeCycleCtx context.Context
backgroundCtx context.Context
@@ -1553,7 +1556,7 @@ func Test_readGraphFile(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- readGraphFile(tt.args.pipelineLifeCycleCtx, tt.args.backgroundCtx, tt.args.cacheService, tt.args.graphFilePath, tt.args.pipelineId)
+ readGraphFile(tt.args.pipelineLifeCycleCtx, tt.args.cacheService, tt.args.graphFilePath, tt.args.pipelineId)
if v, _ := cacheService.GetValue(tt.args.backgroundCtx, tt.args.pipelineId, cache.Graph); v == nil {
t.Errorf("readGraphFile() error: the graph was not cached")
}
diff --git a/playground/backend/internal/utils/cache_utils.go b/playground/backend/internal/utils/cache_utils.go
index 530064d00a3..537e0a00cbe 100644
--- a/playground/backend/internal/utils/cache_utils.go
+++ b/playground/backend/internal/utils/cache_utils.go
@@ -24,8 +24,11 @@ import (
// SetToCache puts value to cache by key and subKey.
// If error occurs during the function - logs and returns error.
-func SetToCache(ctx context.Context, cacheService cache.Cache, key uuid.UUID, subKey cache.SubKey, value interface{}) error {
- err := cacheService.SetValue(ctx, key, subKey, value)
+func SetToCache(cacheService cache.Cache, key uuid.UUID, subKey cache.SubKey, value interface{}) error {
+ // Background context is used in cache operations so the operation would not be interrupted
+ // by the timeout or cancellation of the pipeline context. Cache timeouts are handled by
+ // the cache client itself.
+ err := cacheService.SetValue(context.Background(), key, subKey, value)
if err != nil {
logger.Errorf("%s: cache.SetValue: %s\n", key, err.Error())
// TODO send email to fix error with writing to cache
diff --git a/playground/backend/internal/utils/cache_utils_test.go b/playground/backend/internal/utils/cache_utils_test.go
index f1c95af8103..5c5a6a6cc28 100644
--- a/playground/backend/internal/utils/cache_utils_test.go
+++ b/playground/backend/internal/utils/cache_utils_test.go
@@ -87,7 +87,7 @@ func TestSetToCache(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if err := SetToCache(tt.args.ctx, tt.args.cacheService, tt.args.key, tt.args.subKey, tt.args.value); (err != nil) != tt.wantErr {
+ if err := SetToCache(tt.args.cacheService, tt.args.key, tt.args.subKey, tt.args.value); (err != nil) != tt.wantErr {
t.Errorf("SetToCache() error = %v, wantErr %v", err, tt.wantErr)
}
if !tt.checkFunc() {