You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/12/06 10:55:16 UTC

[GitHub] [beam] ilya-kozyrev commented on a change in pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

ilya-kozyrev commented on a change in pull request #16121:
URL: https://github.com/apache/beam/pull/16121#discussion_r762899163



##########
File path: playground/backend/internal/setup_tools/builder/setup_builder_test.go
##########
@@ -30,6 +30,9 @@ func TestSetupExecutor(t *testing.T) {
 	pipelineId := uuid.New()
 	sdk := pb.Sdk_SDK_JAVA
 	lc, err := fs_tool.NewLifeCycle(sdk, pipelineId, "")
+	if err != nil {
+		panic(err)

Review comment:
       Could we use `t.Error` instead?
   ```suggestion
   		t.Error(err)
   ```

##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -91,42 +115,89 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
 		var compileOutput bytes.Buffer
 		runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel)
 
-		if err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, &compileOutput, &compileError, errorChannel, pb.Status_STATUS_COMPILE_ERROR, pb.Status_STATUS_EXECUTING); err != nil {
+		ok, err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel)
+		if err != nil {
+			return
+		}
+		if !ok {
+			_ = processCompileError(ctxWithTimeout, errorChannel, compileError.Bytes(), pipelineId, cacheService)
+			return
+		}
+		if err := processCompileSuccess(ctxWithTimeout, compileOutput.Bytes(), pipelineId, cacheService); err != nil {
 			return
 		}
 	case pb.Sdk_SDK_PYTHON:
-		processSuccess(ctx, []byte(""), pipelineId, cacheService, pb.Status_STATUS_EXECUTING)
+		if err := processCompileSuccess(ctxWithTimeout, []byte(""), pipelineId, cacheService); err != nil {
+			return
+		}
 	}
 
 	// Run
 	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_JAVA {
-		executor = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		executor, err = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		if err != nil {
+			return
+		}
 	}
 	logger.Infof("%s: Run() ...\n", pipelineId)
 	runCmd := executor.Run(ctxWithTimeout)
 	var runError bytes.Buffer
 	runOutput := streaming.RunOutputWriter{Ctx: ctxWithTimeout, CacheService: cacheService, PipelineId: pipelineId}
-	runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+	go readLogFile(ctxWithTimeout, cacheService, lc.GetAbsoluteLogFilePath(), pipelineId, stopReadLogsChannel, finishReadLogsChannel)
 
-	err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, nil, &runError, errorChannel, pb.Status_STATUS_RUN_ERROR, pb.Status_STATUS_FINISHED)
+	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
+		// For go SDK all logs are placed to stdErr.
+		file, err := os.Create(lc.GetAbsoluteLogFilePath())
+		if err != nil {
+			// If some error with creating a log file do the same as with other SDK.
+			logger.Errorf("%s: error during create log file (go sdk): %s", pipelineId, err.Error())
+			runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+		} else {
+			// Use the log file to write all stdErr into it.
+			runCmdWithOutput(runCmd, &runOutput, file, successChannel, errorChannel)
+		}
+	} else {
+		// Other SDKs write logs to the log file on their own.
+		runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+	}
+
+	ok, err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel)
 	if err != nil {
 		return
 	}
+	if !ok {

Review comment:
       Could we extract this logic into a separate function?

##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -261,69 +368,92 @@ func DeleteFolders(pipelineId uuid.UUID, lc *fs_tool.LifeCycle) {
 }
 
 // finishByTimeout is used in case of runCode method finished by timeout
-func finishByTimeout(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache) {
+func finishByTimeout(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache) error {
 	logger.Errorf("%s: code processing finishes because of timeout\n", pipelineId)
 
 	// set to cache pipelineId: cache.SubKey_Status: Status_STATUS_RUN_TIMEOUT
-	cacheService.SetValue(ctx, pipelineId, cache.Status, pb.Status_STATUS_RUN_TIMEOUT)
+	return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_TIMEOUT)
 }
 
-// processError processes error received during processing code via setting a corresponding status and output to cache
-func processError(ctx context.Context, err error, data []byte, pipelineId uuid.UUID, cacheService cache.Cache, status pb.Status) {
-	switch status {
-	case pb.Status_STATUS_VALIDATION_ERROR:
-		logger.Errorf("%s: Validate: %s\n", pipelineId, err.Error())
-
-		cacheService.SetValue(ctx, pipelineId, cache.Status, pb.Status_STATUS_VALIDATION_ERROR)
-	case pb.Status_STATUS_PREPARATION_ERROR:
-		logger.Errorf("%s: Prepare: %s\n", pipelineId, err.Error())
+// processError processes error received during processing validation or preparation steps.
+// This method sets corresponding status to the cache.
+func processError(ctx context.Context, errorChannel chan error, pipelineId uuid.UUID, cacheService cache.Cache, errorTitle string, newStatus pb.Status) error {
+	err := <-errorChannel
+	logger.Errorf("%s: %s(): %s\n", pipelineId, errorTitle, err.Error())
 
-		cacheService.SetValue(ctx, pipelineId, cache.Status, pb.Status_STATUS_PREPARATION_ERROR)
-	case pb.Status_STATUS_COMPILE_ERROR:
-		logger.Errorf("%s: Compile: err: %s, output: %s\n", pipelineId, err.Error(), data)
+	return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, newStatus)
+}
 
-		cacheService.SetValue(ctx, pipelineId, cache.CompileOutput, "error: "+err.Error()+", output: "+string(data))
+// processCompileError processes error received during processing compile step.
+// This method sets error output and corresponding status to the cache.
+func processCompileError(ctx context.Context, errorChannel chan error, errorOutput []byte, pipelineId uuid.UUID, cacheService cache.Cache) error {
+	err := <-errorChannel
+	logger.Errorf("%s: Compile(): err: %s, output: %s\n", pipelineId, err.Error(), errorOutput)
 
-		cacheService.SetValue(ctx, pipelineId, cache.Status, pb.Status_STATUS_COMPILE_ERROR)
-	case pb.Status_STATUS_RUN_ERROR:
-		logger.Errorf("%s: Run: err: %s, output: %s\n", pipelineId, err.Error(), data)
+	if err := utils.SetToCache(ctx, cacheService, pipelineId, cache.CompileOutput, "error: "+err.Error()+", output: "+string(errorOutput)); err != nil {
+		return err
+	}
+	return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_COMPILE_ERROR)
+}
 
-		cacheService.SetValue(ctx, pipelineId, cache.RunError, "error: "+err.Error()+", output: "+string(data))
+// processRunError processes error received during processing run step.
+// This method sets error output to the cache and after that sets value to channel to stop goroutine which writes logs.
+//	After receiving a signal that goroutine was finished (read value from finishReadLogsChannel) this method
+//	sets corresponding status to the cache.
+func processRunError(ctx context.Context, errorChannel chan error, errorOutput []byte, pipelineId uuid.UUID, cacheService cache.Cache, stopReadLogsChannel, finishReadLogsChannel chan bool) error {
+	err := <-errorChannel
+	logger.Errorf("%s: Run(): err: %s, output: %s\n", pipelineId, err.Error(), errorOutput)
 
-		cacheService.SetValue(ctx, pipelineId, cache.Status, pb.Status_STATUS_RUN_ERROR)
+	if err := utils.SetToCache(ctx, cacheService, pipelineId, cache.RunError, "error: "+err.Error()+", output: "+string(errorOutput)); err != nil {

Review comment:
       Could we avoid string concatenation and use string formatting instead? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org