Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/12/03 12:33:52 UTC

[GitHub] [beam] AydarZaynutdinov opened a new pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

AydarZaynutdinov opened a new pull request #16121:
URL: https://github.com/apache/beam/pull/16121


   [BEAM-13334]
   Add the Go code-processing logs to the cache.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] AydarZaynutdinov commented on a change in pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
AydarZaynutdinov commented on a change in pull request #16121:
URL: https://github.com/apache/beam/pull/16121#discussion_r761996512



##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -91,42 +115,89 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
 		var compileOutput bytes.Buffer
 		runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel)
 
-		if err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, &compileOutput, &compileError, errorChannel, pb.Status_STATUS_COMPILE_ERROR, pb.Status_STATUS_EXECUTING); err != nil {
+		ok, err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel)
+		if err != nil {
+			return
+		}
+		if !ok {
+			_ = processCompileError(ctxWithTimeout, errorChannel, compileError.Bytes(), pipelineId, cacheService)
+			return
+		}
+		if err := processCompileSuccess(ctxWithTimeout, compileOutput.Bytes(), pipelineId, cacheService); err != nil {
 			return
 		}
 	case pb.Sdk_SDK_PYTHON:
-		processSuccess(ctx, []byte(""), pipelineId, cacheService, pb.Status_STATUS_EXECUTING)
+		if err := processCompileSuccess(ctxWithTimeout, []byte(""), pipelineId, cacheService); err != nil {
+			return
+		}
 	}
 
 	// Run
 	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_JAVA {
-		executor = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		executor, err = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		if err != nil {
+			return
+		}
 	}
 	logger.Infof("%s: Run() ...\n", pipelineId)
 	runCmd := executor.Run(ctxWithTimeout)
 	var runError bytes.Buffer
 	runOutput := streaming.RunOutputWriter{Ctx: ctxWithTimeout, CacheService: cacheService, PipelineId: pipelineId}
-	runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+	go readLogFile(ctxWithTimeout, cacheService, lc.GetAbsoluteLogFilePath(), pipelineId, stopReadLogsChannel, finishReadLogsChannel)
 
-	err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, nil, &runError, errorChannel, pb.Status_STATUS_RUN_ERROR, pb.Status_STATUS_FINISHED)
+	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
+		// For go SDK all logs are placed to stdErr.
+		file, err := os.Create(lc.GetAbsoluteLogFilePath())
+		if err != nil {
+			// If some error with creating a log file do the same as with other SDK.
+			logger.Errorf("%s: error during create log file (go sdk): %s", pipelineId, err.Error())
+			runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+		} else {
+			// Use the log file to write all stdErr into it.
+			runCmdWithOutput(runCmd, &runOutput, file, successChannel, errorChannel)
+		}
+	} else {
+		// Other SDKs write logs to the log file on their own.
+		runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+	}
+
+	ok, err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel)
 	if err != nil {
 		return
 	}
+	if !ok {
+		if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {

Review comment:
       After the second `if`, there is logic that must run regardless of the SDK:
   ```
   ...
   if !ok {
     if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
       // code for the Go SDK only
     }
     // code for all SDKs (including the Go SDK, after the part above)
     return
   }
   ...
   ```
   
   If I join these two `if`s, it becomes something like:
   ```
   if !ok && sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
     // code for the Go SDK only
     // code for all SDKs (including the Go SDK, after the part above)
     return
   }
   if !ok {
     // code for all SDKs (excluding the Go SDK)
     return
   }
   ```
   
   So I think it is better to keep the current code.







[GitHub] [beam] daria-malkova commented on a change in pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
daria-malkova commented on a change in pull request #16121:
URL: https://github.com/apache/beam/pull/16121#discussion_r762031955



##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -250,6 +308,55 @@ func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChannel chan b
 	}
 }
 
+// readLogFile reads logs from the log file and keeps it to the cache.
+// If context is done it means that the code processing was finished (successfully/with error/timeout). Write last logs to the cache.
+// If <-stopReadLogsChannel it means that the code processing was finished (canceled/timeout)
+// 	and it waits until the method stops the work to change status to the pb.Status_STATUS_FINISHED. Write last logs

Review comment:
       Why is a tab necessary here?







[GitHub] [beam] pabloem merged pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #16121:
URL: https://github.com/apache/beam/pull/16121


   





[GitHub] [beam] AydarZaynutdinov commented on a change in pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
AydarZaynutdinov commented on a change in pull request #16121:
URL: https://github.com/apache/beam/pull/16121#discussion_r761996512



Review comment:
       After the second `if`, there is logic that must run regardless of the SDK:
   ```
   ...
   if !ok {
     if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
       // code for the Go SDK only
     }
     // code for all SDKs (including the Go SDK, after the part above)
     return
   }
   ...
   ```
   
   If I join these two `if`s, it becomes something like:
   ```
   if !ok && sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
     // code for the Go SDK only
     // code for all SDKs (including the Go SDK, after the part above)
     return
   }
   if !ok {
     // code for all SDKs (excluding the Go SDK)
     return
   }
   ```
   
   So I think it is better to keep the current code.







[GitHub] [beam] AydarZaynutdinov commented on pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
AydarZaynutdinov commented on pull request #16121:
URL: https://github.com/apache/beam/pull/16121#issuecomment-985483347


   R: @KhaninArtur @ilya-kozyrev





[GitHub] [beam] KhaninArtur commented on a change in pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
KhaninArtur commented on a change in pull request #16121:
URL: https://github.com/apache/beam/pull/16121#discussion_r762076146



##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -250,6 +308,55 @@ func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChannel chan b
 	}
 }
 
+// readLogFile reads logs from the log file and keeps it to the cache.
+// If context is done it means that the code processing was finished (successfully/with error/timeout). Write last logs to the cache.
+// If <-stopReadLogsChannel it means that the code processing was finished (canceled/timeout)
+// 	and it waits until the method stops the work to change status to the pb.Status_STATUS_FINISHED. Write last logs

Review comment:
       Agree with Aydar, it provides better readability







[GitHub] [beam] AydarZaynutdinov commented on a change in pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
AydarZaynutdinov commented on a change in pull request #16121:
URL: https://github.com/apache/beam/pull/16121#discussion_r761996512



##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -91,42 +115,89 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
 		var compileOutput bytes.Buffer
 		runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel)
 
-		if err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, &compileOutput, &compileError, errorChannel, pb.Status_STATUS_COMPILE_ERROR, pb.Status_STATUS_EXECUTING); err != nil {
+		ok, err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel)
+		if err != nil {
+			return
+		}
+		if !ok {
+			_ = processCompileError(ctxWithTimeout, errorChannel, compileError.Bytes(), pipelineId, cacheService)
+			return
+		}
+		if err := processCompileSuccess(ctxWithTimeout, compileOutput.Bytes(), pipelineId, cacheService); err != nil {
 			return
 		}
 	case pb.Sdk_SDK_PYTHON:
-		processSuccess(ctx, []byte(""), pipelineId, cacheService, pb.Status_STATUS_EXECUTING)
+		if err := processCompileSuccess(ctxWithTimeout, []byte(""), pipelineId, cacheService); err != nil {
+			return
+		}
 	}
 
 	// Run
 	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_JAVA {
-		executor = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		executor, err = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		if err != nil {
+			return
+		}
 	}
 	logger.Infof("%s: Run() ...\n", pipelineId)
 	runCmd := executor.Run(ctxWithTimeout)
 	var runError bytes.Buffer
 	runOutput := streaming.RunOutputWriter{Ctx: ctxWithTimeout, CacheService: cacheService, PipelineId: pipelineId}
-	runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+	go readLogFile(ctxWithTimeout, cacheService, lc.GetAbsoluteLogFilePath(), pipelineId, stopReadLogsChannel, finishReadLogsChannel)
 
-	err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, nil, &runError, errorChannel, pb.Status_STATUS_RUN_ERROR, pb.Status_STATUS_FINISHED)
+	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
+		// For go SDK all logs are placed to stdErr.
+		file, err := os.Create(lc.GetAbsoluteLogFilePath())
+		if err != nil {
+			// If some error with creating a log file do the same as with other SDK.
+			logger.Errorf("%s: error during create log file (go sdk): %s", pipelineId, err.Error())
+			runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+		} else {
+			// Use the log file to write all stdErr into it.
+			runCmdWithOutput(runCmd, &runOutput, file, successChannel, errorChannel)
+		}
+	} else {
+		// Other SDKs write logs to the log file on their own.
+		runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+	}
+
+	ok, err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel)
 	if err != nil {
 		return
 	}
+	if !ok {
+		if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {

Review comment:
       After the second `if` there is logic that must run regardless of the SDK:
   ```
   ...
   if !ok {
     if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
       // code for the Go SDK only
     }
     // code for all SDK (including Go SDK after the top part)
     return
   }
   ...
   ```
   
   If I join these 2 `if`s it will be something like:
   ```
   if !ok && sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
     // code for the Go SDK only
     // code for all SDK (including Go SDK after the top part)
     return
   }
   if !ok {
     // code for all SDK (excluding Go SDK)
     return
   }
   ```
   
   So I think it's better to keep the current code.







[GitHub] [beam] KhaninArtur commented on a change in pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
KhaninArtur commented on a change in pull request #16121:
URL: https://github.com/apache/beam/pull/16121#discussion_r761999207



##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -91,42 +115,89 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
 		var compileOutput bytes.Buffer
 		runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel)
 
-		if err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, &compileOutput, &compileError, errorChannel, pb.Status_STATUS_COMPILE_ERROR, pb.Status_STATUS_EXECUTING); err != nil {
+		ok, err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel)
+		if err != nil {
+			return
+		}
+		if !ok {
+			_ = processCompileError(ctxWithTimeout, errorChannel, compileError.Bytes(), pipelineId, cacheService)
+			return
+		}
+		if err := processCompileSuccess(ctxWithTimeout, compileOutput.Bytes(), pipelineId, cacheService); err != nil {
 			return
 		}
 	case pb.Sdk_SDK_PYTHON:
-		processSuccess(ctx, []byte(""), pipelineId, cacheService, pb.Status_STATUS_EXECUTING)
+		if err := processCompileSuccess(ctxWithTimeout, []byte(""), pipelineId, cacheService); err != nil {
+			return
+		}
 	}
 
 	// Run
 	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_JAVA {
-		executor = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		executor, err = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		if err != nil {
+			return
+		}
 	}
 	logger.Infof("%s: Run() ...\n", pipelineId)
 	runCmd := executor.Run(ctxWithTimeout)
 	var runError bytes.Buffer
 	runOutput := streaming.RunOutputWriter{Ctx: ctxWithTimeout, CacheService: cacheService, PipelineId: pipelineId}
-	runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+	go readLogFile(ctxWithTimeout, cacheService, lc.GetAbsoluteLogFilePath(), pipelineId, stopReadLogsChannel, finishReadLogsChannel)
 
-	err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, nil, &runError, errorChannel, pb.Status_STATUS_RUN_ERROR, pb.Status_STATUS_FINISHED)
+	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
+		// For go SDK all logs are placed to stdErr.
+		file, err := os.Create(lc.GetAbsoluteLogFilePath())
+		if err != nil {
+			// If some error with creating a log file do the same as with other SDK.
+			logger.Errorf("%s: error during create log file (go sdk): %s", pipelineId, err.Error())
+			runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+		} else {
+			// Use the log file to write all stdErr into it.
+			runCmdWithOutput(runCmd, &runOutput, file, successChannel, errorChannel)
+		}
+	} else {
+		// Other SDKs write logs to the log file on their own.
+		runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+	}
+
+	ok, err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel)
 	if err != nil {
 		return
 	}
+	if !ok {
+		if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {

Review comment:
       Oh, I see, okay!







[GitHub] [beam] pavel-avilov commented on pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
pavel-avilov commented on pull request #16121:
URL: https://github.com/apache/beam/pull/16121#issuecomment-986640053


   LGTM





[GitHub] [beam] AydarZaynutdinov commented on a change in pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
AydarZaynutdinov commented on a change in pull request #16121:
URL: https://github.com/apache/beam/pull/16121#discussion_r761990473



##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -91,42 +115,89 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
 		var compileOutput bytes.Buffer
 		runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel)
 
-		if err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, &compileOutput, &compileError, errorChannel, pb.Status_STATUS_COMPILE_ERROR, pb.Status_STATUS_EXECUTING); err != nil {
+		ok, err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel)
+		if err != nil {
+			return
+		}
+		if !ok {
+			_ = processCompileError(ctxWithTimeout, errorChannel, compileError.Bytes(), pipelineId, cacheService)
+			return
+		}
+		if err := processCompileSuccess(ctxWithTimeout, compileOutput.Bytes(), pipelineId, cacheService); err != nil {
 			return
 		}
 	case pb.Sdk_SDK_PYTHON:
-		processSuccess(ctx, []byte(""), pipelineId, cacheService, pb.Status_STATUS_EXECUTING)
+		if err := processCompileSuccess(ctxWithTimeout, []byte(""), pipelineId, cacheService); err != nil {
+			return
+		}
 	}
 
 	// Run
 	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_JAVA {
-		executor = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		executor, err = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		if err != nil {
+			return
+		}
 	}
 	logger.Infof("%s: Run() ...\n", pipelineId)
 	runCmd := executor.Run(ctxWithTimeout)
 	var runError bytes.Buffer
 	runOutput := streaming.RunOutputWriter{Ctx: ctxWithTimeout, CacheService: cacheService, PipelineId: pipelineId}
-	runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+	go readLogFile(ctxWithTimeout, cacheService, lc.GetAbsoluteLogFilePath(), pipelineId, stopReadLogsChannel, finishReadLogsChannel)
 
-	err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, nil, &runError, errorChannel, pb.Status_STATUS_RUN_ERROR, pb.Status_STATUS_FINISHED)
+	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
+		// For go SDK all logs are placed to stdErr.
+		file, err := os.Create(lc.GetAbsoluteLogFilePath())
+		if err != nil {
+			// If some error with creating a log file do the same as with other SDK.
+			logger.Errorf("%s: error during create log file (go sdk): %s", pipelineId, err.Error())
+			runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)

Review comment:
       In that case the `fallthrough` would have to sit inside the `if/else` branch, and Go does not allow that: `fallthrough` must be the last statement of a `case` clause.







[GitHub] [beam] AydarZaynutdinov commented on a change in pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
AydarZaynutdinov commented on a change in pull request #16121:
URL: https://github.com/apache/beam/pull/16121#discussion_r762074820



##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -250,6 +308,55 @@ func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChannel chan b
 	}
 }
 
+// readLogFile reads logs from the log file and keeps it to the cache.
+// If context is done it means that the code processing was finished (successfully/with error/timeout). Write last logs to the cache.
+// If <-stopReadLogsChannel it means that the code processing was finished (canceled/timeout)
+// 	and it waits until the method stops the work to change status to the pb.Status_STATUS_FINISHED. Write last logs

Review comment:
       It is not necessary. Just wanted to separate it from other `If` cases.







[GitHub] [beam] AydarZaynutdinov commented on a change in pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
AydarZaynutdinov commented on a change in pull request #16121:
URL: https://github.com/apache/beam/pull/16121#discussion_r765813604



##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -91,42 +115,89 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
 		var compileOutput bytes.Buffer
 		runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel)
 
-		if err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, &compileOutput, &compileError, errorChannel, pb.Status_STATUS_COMPILE_ERROR, pb.Status_STATUS_EXECUTING); err != nil {
+		ok, err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel)
+		if err != nil {
+			return
+		}
+		if !ok {
+			_ = processCompileError(ctxWithTimeout, errorChannel, compileError.Bytes(), pipelineId, cacheService)
+			return
+		}
+		if err := processCompileSuccess(ctxWithTimeout, compileOutput.Bytes(), pipelineId, cacheService); err != nil {
 			return
 		}
 	case pb.Sdk_SDK_PYTHON:
-		processSuccess(ctx, []byte(""), pipelineId, cacheService, pb.Status_STATUS_EXECUTING)
+		if err := processCompileSuccess(ctxWithTimeout, []byte(""), pipelineId, cacheService); err != nil {
+			return
+		}
 	}
 
 	// Run
 	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_JAVA {
-		executor = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		executor, err = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		if err != nil {
+			return
+		}
 	}
 	logger.Infof("%s: Run() ...\n", pipelineId)
 	runCmd := executor.Run(ctxWithTimeout)
 	var runError bytes.Buffer
 	runOutput := streaming.RunOutputWriter{Ctx: ctxWithTimeout, CacheService: cacheService, PipelineId: pipelineId}
-	runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+	go readLogFile(ctxWithTimeout, cacheService, lc.GetAbsoluteLogFilePath(), pipelineId, stopReadLogsChannel, finishReadLogsChannel)
 
-	err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, nil, &runError, errorChannel, pb.Status_STATUS_RUN_ERROR, pb.Status_STATUS_FINISHED)
+	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
+		// For go SDK all logs are placed to stdErr.
+		file, err := os.Create(lc.GetAbsoluteLogFilePath())
+		if err != nil {
+			// If some error with creating a log file do the same as with other SDK.
+			logger.Errorf("%s: error during create log file (go sdk): %s", pipelineId, err.Error())
+			runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+		} else {
+			// Use the log file to write all stdErr into it.
+			runCmdWithOutput(runCmd, &runOutput, file, successChannel, errorChannel)
+		}
+	} else {
+		// Other SDKs write logs to the log file on their own.
+		runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+	}
+
+	ok, err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel)
 	if err != nil {
 		return
 	}
+	if !ok {

Review comment:
       If we extract this logic into a separate function, that function would need too many parameters, so I'd rather not.







[GitHub] [beam] KhaninArtur commented on a change in pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
KhaninArtur commented on a change in pull request #16121:
URL: https://github.com/apache/beam/pull/16121#discussion_r761999207



##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -91,42 +115,89 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
 		var compileOutput bytes.Buffer
 		runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel)
 
-		if err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, &compileOutput, &compileError, errorChannel, pb.Status_STATUS_COMPILE_ERROR, pb.Status_STATUS_EXECUTING); err != nil {
+		ok, err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel)
+		if err != nil {
+			return
+		}
+		if !ok {
+			_ = processCompileError(ctxWithTimeout, errorChannel, compileError.Bytes(), pipelineId, cacheService)
+			return
+		}
+		if err := processCompileSuccess(ctxWithTimeout, compileOutput.Bytes(), pipelineId, cacheService); err != nil {
 			return
 		}
 	case pb.Sdk_SDK_PYTHON:
-		processSuccess(ctx, []byte(""), pipelineId, cacheService, pb.Status_STATUS_EXECUTING)
+		if err := processCompileSuccess(ctxWithTimeout, []byte(""), pipelineId, cacheService); err != nil {
+			return
+		}
 	}
 
 	// Run
 	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_JAVA {
-		executor = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		executor, err = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		if err != nil {
+			return
+		}
 	}
 	logger.Infof("%s: Run() ...\n", pipelineId)
 	runCmd := executor.Run(ctxWithTimeout)
 	var runError bytes.Buffer
 	runOutput := streaming.RunOutputWriter{Ctx: ctxWithTimeout, CacheService: cacheService, PipelineId: pipelineId}
-	runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+	go readLogFile(ctxWithTimeout, cacheService, lc.GetAbsoluteLogFilePath(), pipelineId, stopReadLogsChannel, finishReadLogsChannel)
 
-	err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, nil, &runError, errorChannel, pb.Status_STATUS_RUN_ERROR, pb.Status_STATUS_FINISHED)
+	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
+		// For go SDK all logs are placed to stdErr.
+		file, err := os.Create(lc.GetAbsoluteLogFilePath())
+		if err != nil {
+			// If some error with creating a log file do the same as with other SDK.
+			logger.Errorf("%s: error during create log file (go sdk): %s", pipelineId, err.Error())
+			runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+		} else {
+			// Use the log file to write all stdErr into it.
+			runCmdWithOutput(runCmd, &runOutput, file, successChannel, errorChannel)
+		}
+	} else {
+		// Other SDKs write logs to the log file on their own.
+		runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+	}
+
+	ok, err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel)
 	if err != nil {
 		return
 	}
+	if !ok {
+		if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {

Review comment:
       Oh, I see, okay! Let's leave it as is.







[GitHub] [beam] AydarZaynutdinov commented on a change in pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
AydarZaynutdinov commented on a change in pull request #16121:
URL: https://github.com/apache/beam/pull/16121#discussion_r763027811



##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -261,69 +368,92 @@ func DeleteFolders(pipelineId uuid.UUID, lc *fs_tool.LifeCycle) {
 }
 
 // finishByTimeout is used in case of runCode method finished by timeout
-func finishByTimeout(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache) {
+func finishByTimeout(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache) error {
 	logger.Errorf("%s: code processing finishes because of timeout\n", pipelineId)
 
 	// set to cache pipelineId: cache.SubKey_Status: Status_STATUS_RUN_TIMEOUT
-	cacheService.SetValue(ctx, pipelineId, cache.Status, pb.Status_STATUS_RUN_TIMEOUT)
+	return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_TIMEOUT)
 }
 
-// processError processes error received during processing code via setting a corresponding status and output to cache
-func processError(ctx context.Context, err error, data []byte, pipelineId uuid.UUID, cacheService cache.Cache, status pb.Status) {
-	switch status {
-	case pb.Status_STATUS_VALIDATION_ERROR:
-		logger.Errorf("%s: Validate: %s\n", pipelineId, err.Error())
-
-		cacheService.SetValue(ctx, pipelineId, cache.Status, pb.Status_STATUS_VALIDATION_ERROR)
-	case pb.Status_STATUS_PREPARATION_ERROR:
-		logger.Errorf("%s: Prepare: %s\n", pipelineId, err.Error())
+// processError processes error received during processing validation or preparation steps.
+// This method sets corresponding status to the cache.
+func processError(ctx context.Context, errorChannel chan error, pipelineId uuid.UUID, cacheService cache.Cache, errorTitle string, newStatus pb.Status) error {
+	err := <-errorChannel
+	logger.Errorf("%s: %s(): %s\n", pipelineId, errorTitle, err.Error())
 
-		cacheService.SetValue(ctx, pipelineId, cache.Status, pb.Status_STATUS_PREPARATION_ERROR)
-	case pb.Status_STATUS_COMPILE_ERROR:
-		logger.Errorf("%s: Compile: err: %s, output: %s\n", pipelineId, err.Error(), data)
+	return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, newStatus)
+}
 
-		cacheService.SetValue(ctx, pipelineId, cache.CompileOutput, "error: "+err.Error()+", output: "+string(data))
+// processCompileError processes error received during processing compile step.
+// This method sets error output and corresponding status to the cache.
+func processCompileError(ctx context.Context, errorChannel chan error, errorOutput []byte, pipelineId uuid.UUID, cacheService cache.Cache) error {
+	err := <-errorChannel
+	logger.Errorf("%s: Compile(): err: %s, output: %s\n", pipelineId, err.Error(), errorOutput)
 
-		cacheService.SetValue(ctx, pipelineId, cache.Status, pb.Status_STATUS_COMPILE_ERROR)
-	case pb.Status_STATUS_RUN_ERROR:
-		logger.Errorf("%s: Run: err: %s, output: %s\n", pipelineId, err.Error(), data)
+	if err := utils.SetToCache(ctx, cacheService, pipelineId, cache.CompileOutput, "error: "+err.Error()+", output: "+string(errorOutput)); err != nil {
+		return err
+	}
+	return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_COMPILE_ERROR)
+}
 
-		cacheService.SetValue(ctx, pipelineId, cache.RunError, "error: "+err.Error()+", output: "+string(data))
+// processRunError processes error received during processing run step.
+// This method sets error output to the cache and after that sets value to channel to stop goroutine which writes logs.
+//	After receiving a signal that goroutine was finished (read value from finishReadLogsChannel) this method
+//	sets corresponding status to the cache.
+func processRunError(ctx context.Context, errorChannel chan error, errorOutput []byte, pipelineId uuid.UUID, cacheService cache.Cache, stopReadLogsChannel, finishReadLogsChannel chan bool) error {
+	err := <-errorChannel
+	logger.Errorf("%s: Run(): err: %s, output: %s\n", pipelineId, err.Error(), errorOutput)
 
-		cacheService.SetValue(ctx, pipelineId, cache.Status, pb.Status_STATUS_RUN_ERROR)
+	if err := utils.SetToCache(ctx, cacheService, pipelineId, cache.RunError, "error: "+err.Error()+", output: "+string(errorOutput)); err != nil {

Review comment:
       Changed.







[GitHub] [beam] KhaninArtur commented on a change in pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
KhaninArtur commented on a change in pull request #16121:
URL: https://github.com/apache/beam/pull/16121#discussion_r761968603



##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -91,42 +115,89 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
 		var compileOutput bytes.Buffer
 		runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel)
 
-		if err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, &compileOutput, &compileError, errorChannel, pb.Status_STATUS_COMPILE_ERROR, pb.Status_STATUS_EXECUTING); err != nil {
+		ok, err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel)
+		if err != nil {
+			return
+		}
+		if !ok {
+			_ = processCompileError(ctxWithTimeout, errorChannel, compileError.Bytes(), pipelineId, cacheService)
+			return
+		}
+		if err := processCompileSuccess(ctxWithTimeout, compileOutput.Bytes(), pipelineId, cacheService); err != nil {
 			return
 		}
 	case pb.Sdk_SDK_PYTHON:
-		processSuccess(ctx, []byte(""), pipelineId, cacheService, pb.Status_STATUS_EXECUTING)
+		if err := processCompileSuccess(ctxWithTimeout, []byte(""), pipelineId, cacheService); err != nil {
+			return
+		}
 	}
 
 	// Run
 	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_JAVA {
-		executor = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		executor, err = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		if err != nil {
+			return
+		}
 	}
 	logger.Infof("%s: Run() ...\n", pipelineId)
 	runCmd := executor.Run(ctxWithTimeout)
 	var runError bytes.Buffer
 	runOutput := streaming.RunOutputWriter{Ctx: ctxWithTimeout, CacheService: cacheService, PipelineId: pipelineId}
-	runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+	go readLogFile(ctxWithTimeout, cacheService, lc.GetAbsoluteLogFilePath(), pipelineId, stopReadLogsChannel, finishReadLogsChannel)
 
-	err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, nil, &runError, errorChannel, pb.Status_STATUS_RUN_ERROR, pb.Status_STATUS_FINISHED)
+	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
+		// For go SDK all logs are placed to stdErr.
+		file, err := os.Create(lc.GetAbsoluteLogFilePath())
+		if err != nil {
+			// If some error with creating a log file do the same as with other SDK.
+			logger.Errorf("%s: error during create log file (go sdk): %s", pipelineId, err.Error())
+			runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+		} else {
+			// Use the log file to write all stdErr into it.
+			runCmdWithOutput(runCmd, &runOutput, file, successChannel, errorChannel)
+		}
+	} else {
+		// Other SDKs write logs to the log file on their own.
+		runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+	}
+
+	ok, err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel)
 	if err != nil {
 		return
 	}
+	if !ok {
+		if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {

Review comment:
       Let's join these 2 `if`s

##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -91,42 +115,89 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
 		var compileOutput bytes.Buffer
 		runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel)
 
-		if err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, &compileOutput, &compileError, errorChannel, pb.Status_STATUS_COMPILE_ERROR, pb.Status_STATUS_EXECUTING); err != nil {
+		ok, err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel)
+		if err != nil {
+			return
+		}
+		if !ok {
+			_ = processCompileError(ctxWithTimeout, errorChannel, compileError.Bytes(), pipelineId, cacheService)
+			return
+		}
+		if err := processCompileSuccess(ctxWithTimeout, compileOutput.Bytes(), pipelineId, cacheService); err != nil {
 			return
 		}
 	case pb.Sdk_SDK_PYTHON:
-		processSuccess(ctx, []byte(""), pipelineId, cacheService, pb.Status_STATUS_EXECUTING)
+		if err := processCompileSuccess(ctxWithTimeout, []byte(""), pipelineId, cacheService); err != nil {
+			return
+		}
 	}
 
 	// Run
 	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_JAVA {
-		executor = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		executor, err = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		if err != nil {
+			return
+		}
 	}
 	logger.Infof("%s: Run() ...\n", pipelineId)
 	runCmd := executor.Run(ctxWithTimeout)
 	var runError bytes.Buffer
 	runOutput := streaming.RunOutputWriter{Ctx: ctxWithTimeout, CacheService: cacheService, PipelineId: pipelineId}
-	runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+	go readLogFile(ctxWithTimeout, cacheService, lc.GetAbsoluteLogFilePath(), pipelineId, stopReadLogsChannel, finishReadLogsChannel)
 
-	err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, nil, &runError, errorChannel, pb.Status_STATUS_RUN_ERROR, pb.Status_STATUS_FINISHED)
+	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
+		// For go SDK all logs are placed to stdErr.
+		file, err := os.Create(lc.GetAbsoluteLogFilePath())
+		if err != nil {
+			// If some error with creating a log file do the same as with other SDK.
+			logger.Errorf("%s: error during create log file (go sdk): %s", pipelineId, err.Error())
+			runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)

Review comment:
       If we replace `if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO` with a `switch` statement, we could use the `fallthrough` statement to execute the next case.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] ilya-kozyrev commented on a change in pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
ilya-kozyrev commented on a change in pull request #16121:
URL: https://github.com/apache/beam/pull/16121#discussion_r762899163



##########
File path: playground/backend/internal/setup_tools/builder/setup_builder_test.go
##########
@@ -30,6 +30,9 @@ func TestSetupExecutor(t *testing.T) {
 	pipelineId := uuid.New()
 	sdk := pb.Sdk_SDK_JAVA
 	lc, err := fs_tool.NewLifeCycle(sdk, pipelineId, "")
+	if err != nil {
+		panic(err)

Review comment:
       Could we use `t.Error` instead?
   ```suggestion
   		t.Error(err)
   ```

##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -91,42 +115,89 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
 		var compileOutput bytes.Buffer
 		runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel)
 
-		if err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, &compileOutput, &compileError, errorChannel, pb.Status_STATUS_COMPILE_ERROR, pb.Status_STATUS_EXECUTING); err != nil {
+		ok, err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel)
+		if err != nil {
+			return
+		}
+		if !ok {
+			_ = processCompileError(ctxWithTimeout, errorChannel, compileError.Bytes(), pipelineId, cacheService)
+			return
+		}
+		if err := processCompileSuccess(ctxWithTimeout, compileOutput.Bytes(), pipelineId, cacheService); err != nil {
 			return
 		}
 	case pb.Sdk_SDK_PYTHON:
-		processSuccess(ctx, []byte(""), pipelineId, cacheService, pb.Status_STATUS_EXECUTING)
+		if err := processCompileSuccess(ctxWithTimeout, []byte(""), pipelineId, cacheService); err != nil {
+			return
+		}
 	}
 
 	// Run
 	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_JAVA {
-		executor = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		executor, err = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		if err != nil {
+			return
+		}
 	}
 	logger.Infof("%s: Run() ...\n", pipelineId)
 	runCmd := executor.Run(ctxWithTimeout)
 	var runError bytes.Buffer
 	runOutput := streaming.RunOutputWriter{Ctx: ctxWithTimeout, CacheService: cacheService, PipelineId: pipelineId}
-	runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+	go readLogFile(ctxWithTimeout, cacheService, lc.GetAbsoluteLogFilePath(), pipelineId, stopReadLogsChannel, finishReadLogsChannel)
 
-	err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, nil, &runError, errorChannel, pb.Status_STATUS_RUN_ERROR, pb.Status_STATUS_FINISHED)
+	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
+		// For go SDK all logs are placed to stdErr.
+		file, err := os.Create(lc.GetAbsoluteLogFilePath())
+		if err != nil {
+			// If some error with creating a log file do the same as with other SDK.
+			logger.Errorf("%s: error during create log file (go sdk): %s", pipelineId, err.Error())
+			runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+		} else {
+			// Use the log file to write all stdErr into it.
+			runCmdWithOutput(runCmd, &runOutput, file, successChannel, errorChannel)
+		}
+	} else {
+		// Other SDKs write logs to the log file on their own.
+		runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+	}
+
+	ok, err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel)
 	if err != nil {
 		return
 	}
+	if !ok {

Review comment:
       Could we extract this logic into a separate function?

##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -261,69 +368,92 @@ func DeleteFolders(pipelineId uuid.UUID, lc *fs_tool.LifeCycle) {
 }
 
 // finishByTimeout is used in case of runCode method finished by timeout
-func finishByTimeout(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache) {
+func finishByTimeout(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache) error {
 	logger.Errorf("%s: code processing finishes because of timeout\n", pipelineId)
 
 	// set to cache pipelineId: cache.SubKey_Status: Status_STATUS_RUN_TIMEOUT
-	cacheService.SetValue(ctx, pipelineId, cache.Status, pb.Status_STATUS_RUN_TIMEOUT)
+	return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_TIMEOUT)
 }
 
-// processError processes error received during processing code via setting a corresponding status and output to cache
-func processError(ctx context.Context, err error, data []byte, pipelineId uuid.UUID, cacheService cache.Cache, status pb.Status) {
-	switch status {
-	case pb.Status_STATUS_VALIDATION_ERROR:
-		logger.Errorf("%s: Validate: %s\n", pipelineId, err.Error())
-
-		cacheService.SetValue(ctx, pipelineId, cache.Status, pb.Status_STATUS_VALIDATION_ERROR)
-	case pb.Status_STATUS_PREPARATION_ERROR:
-		logger.Errorf("%s: Prepare: %s\n", pipelineId, err.Error())
+// processError processes error received during processing validation or preparation steps.
+// This method sets corresponding status to the cache.
+func processError(ctx context.Context, errorChannel chan error, pipelineId uuid.UUID, cacheService cache.Cache, errorTitle string, newStatus pb.Status) error {
+	err := <-errorChannel
+	logger.Errorf("%s: %s(): %s\n", pipelineId, errorTitle, err.Error())
 
-		cacheService.SetValue(ctx, pipelineId, cache.Status, pb.Status_STATUS_PREPARATION_ERROR)
-	case pb.Status_STATUS_COMPILE_ERROR:
-		logger.Errorf("%s: Compile: err: %s, output: %s\n", pipelineId, err.Error(), data)
+	return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, newStatus)
+}
 
-		cacheService.SetValue(ctx, pipelineId, cache.CompileOutput, "error: "+err.Error()+", output: "+string(data))
+// processCompileError processes error received during processing compile step.
+// This method sets error output and corresponding status to the cache.
+func processCompileError(ctx context.Context, errorChannel chan error, errorOutput []byte, pipelineId uuid.UUID, cacheService cache.Cache) error {
+	err := <-errorChannel
+	logger.Errorf("%s: Compile(): err: %s, output: %s\n", pipelineId, err.Error(), errorOutput)
 
-		cacheService.SetValue(ctx, pipelineId, cache.Status, pb.Status_STATUS_COMPILE_ERROR)
-	case pb.Status_STATUS_RUN_ERROR:
-		logger.Errorf("%s: Run: err: %s, output: %s\n", pipelineId, err.Error(), data)
+	if err := utils.SetToCache(ctx, cacheService, pipelineId, cache.CompileOutput, "error: "+err.Error()+", output: "+string(errorOutput)); err != nil {
+		return err
+	}
+	return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_COMPILE_ERROR)
+}
 
-		cacheService.SetValue(ctx, pipelineId, cache.RunError, "error: "+err.Error()+", output: "+string(data))
+// processRunError processes error received during processing run step.
+// This method sets error output to the cache and after that sets value to channel to stop goroutine which writes logs.
+//	After receiving a signal that goroutine was finished (read value from finishReadLogsChannel) this method
+//	sets corresponding status to the cache.
+func processRunError(ctx context.Context, errorChannel chan error, errorOutput []byte, pipelineId uuid.UUID, cacheService cache.Cache, stopReadLogsChannel, finishReadLogsChannel chan bool) error {
+	err := <-errorChannel
+	logger.Errorf("%s: Run(): err: %s, output: %s\n", pipelineId, err.Error(), errorOutput)
 
-		cacheService.SetValue(ctx, pipelineId, cache.Status, pb.Status_STATUS_RUN_ERROR)
+	if err := utils.SetToCache(ctx, cacheService, pipelineId, cache.RunError, "error: "+err.Error()+", output: "+string(errorOutput)); err != nil {

Review comment:
       Could we avoid string concatenation and use string formatting instead? 







[GitHub] [beam] AydarZaynutdinov commented on a change in pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
AydarZaynutdinov commented on a change in pull request #16121:
URL: https://github.com/apache/beam/pull/16121#discussion_r763029960



##########
File path: playground/backend/internal/setup_tools/builder/setup_builder_test.go
##########
@@ -30,6 +30,9 @@ func TestSetupExecutor(t *testing.T) {
 	pipelineId := uuid.New()
 	sdk := pb.Sdk_SDK_JAVA
 	lc, err := fs_tool.NewLifeCycle(sdk, pipelineId, "")
+	if err != nil {
+		panic(err)

Review comment:
       Changed.







[GitHub] [beam] pabloem commented on pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #16121:
URL: https://github.com/apache/beam/pull/16121#issuecomment-992528253


   This LGTM. Can you resolve the merge conflicts?





[GitHub] [beam] KhaninArtur commented on a change in pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
KhaninArtur commented on a change in pull request #16121:
URL: https://github.com/apache/beam/pull/16121#discussion_r761995360



##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -91,42 +115,89 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
 		var compileOutput bytes.Buffer
 		runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel)
 
-		if err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, &compileOutput, &compileError, errorChannel, pb.Status_STATUS_COMPILE_ERROR, pb.Status_STATUS_EXECUTING); err != nil {
+		ok, err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel)
+		if err != nil {
+			return
+		}
+		if !ok {
+			_ = processCompileError(ctxWithTimeout, errorChannel, compileError.Bytes(), pipelineId, cacheService)
+			return
+		}
+		if err := processCompileSuccess(ctxWithTimeout, compileOutput.Bytes(), pipelineId, cacheService); err != nil {
 			return
 		}
 	case pb.Sdk_SDK_PYTHON:
-		processSuccess(ctx, []byte(""), pipelineId, cacheService, pb.Status_STATUS_EXECUTING)
+		if err := processCompileSuccess(ctxWithTimeout, []byte(""), pipelineId, cacheService); err != nil {
+			return
+		}
 	}
 
 	// Run
 	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_JAVA {
-		executor = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		executor, err = setJavaExecutableFile(lc, pipelineId, cacheService, ctxWithTimeout, executorBuilder, appEnv.WorkingDir())
+		if err != nil {
+			return
+		}
 	}
 	logger.Infof("%s: Run() ...\n", pipelineId)
 	runCmd := executor.Run(ctxWithTimeout)
 	var runError bytes.Buffer
 	runOutput := streaming.RunOutputWriter{Ctx: ctxWithTimeout, CacheService: cacheService, PipelineId: pipelineId}
-	runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)
+	go readLogFile(ctxWithTimeout, cacheService, lc.GetAbsoluteLogFilePath(), pipelineId, stopReadLogsChannel, finishReadLogsChannel)
 
-	err = processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, nil, &runError, errorChannel, pb.Status_STATUS_RUN_ERROR, pb.Status_STATUS_FINISHED)
+	if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
+		// For go SDK all logs are placed to stdErr.
+		file, err := os.Create(lc.GetAbsoluteLogFilePath())
+		if err != nil {
+			// If some error with creating a log file do the same as with other SDK.
+			logger.Errorf("%s: error during create log file (go sdk): %s", pipelineId, err.Error())
+			runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)

Review comment:
       Got it, thanks!







[GitHub] [beam] daria-malkova commented on a change in pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
daria-malkova commented on a change in pull request #16121:
URL: https://github.com/apache/beam/pull/16121#discussion_r762840355



##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -54,12 +60,14 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
 	errorChannel := make(chan error, 1)
 	successChannel := make(chan bool, 1)
 	cancelChannel := make(chan bool, 1)
+	stopReadLogsChannel := make(chan bool, 1)
+	finishReadLogsChannel := make(chan bool, 1)
 
 	go cancelCheck(ctxWithTimeout, pipelineId, cancelChannel, cacheService)
 
 	executorBuilder, err := builder.SetupExecutorBuilder(lc.GetAbsoluteSourceFilePath(), lc.GetAbsoluteBaseFolderPath(), lc.GetAbsoluteExecutableFilePath(), sdkEnv)
 	if err != nil {
-		processSetupError(err, pipelineId, cacheService, ctxWithTimeout)
+		_ = processSetupError(err, pipelineId, cacheService, ctxWithTimeout)

Review comment:
       Wouldn't it be better to handle the error instead of ignoring it?







[GitHub] [beam] AydarZaynutdinov commented on a change in pull request #16121: [BEAM-13334][Playground] Save Go logs to the cache

Posted by GitBox <gi...@apache.org>.
AydarZaynutdinov commented on a change in pull request #16121:
URL: https://github.com/apache/beam/pull/16121#discussion_r762998810



##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -54,12 +60,14 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
 	errorChannel := make(chan error, 1)
 	successChannel := make(chan bool, 1)
 	cancelChannel := make(chan bool, 1)
+	stopReadLogsChannel := make(chan bool, 1)
+	finishReadLogsChannel := make(chan bool, 1)
 
 	go cancelCheck(ctxWithTimeout, pipelineId, cancelChannel, cacheService)
 
 	executorBuilder, err := builder.SetupExecutorBuilder(lc.GetAbsoluteSourceFilePath(), lc.GetAbsoluteBaseFolderPath(), lc.GetAbsoluteExecutableFilePath(), sdkEnv)
 	if err != nil {
-		processSetupError(err, pipelineId, cacheService, ctxWithTimeout)
+		_ = processSetupError(err, pipelineId, cacheService, ctxWithTimeout)

Review comment:
       `processSetupError()` can return an error only if writing the status to the cache fails:
   ```
   func processSetupError(...) error {
   	logger.Errorf(...)
   	if err = utils.SetToCache(...); err != nil {
   		return err
   	}
   	return nil
   }
   ```
   All error processing happens inside `utils.SetToCache`, so we do not need this error and can discard it with `_` here.
   
   Moreover, the method `return`s on the next line (71).
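
A small sketch of the two options discussed in this thread, assuming hypothetical `setStatus` and `handleSetupError` helpers rather than the real `utils.SetToCache` and `processSetupError`. Both callers return right after the call; they differ only in whether the cache-write error is visible to the caller.

```go
package main

import (
	"errors"
	"fmt"
)

// setStatus is a hypothetical stand-in for a cache write that may fail.
func setStatus(fail bool) error {
	if fail {
		return errors.New("cache unavailable")
	}
	return nil
}

// handleSetupError mirrors the shape quoted above: its only possible error
// is the one returned by the cache write.
func handleSetupError(fail bool) error {
	return setStatus(fail)
}

// processIgnoring discards the error explicitly, as the PR does.
func processIgnoring(fail bool) string {
	_ = handleSetupError(fail)
	return "returned"
}

// processReporting surfaces the error to the caller instead of discarding it.
func processReporting(fail bool) string {
	if err := handleSetupError(fail); err != nil {
		return fmt.Sprintf("returned after cache error: %v", err)
	}
	return "returned"
}

func main() {
	fmt.Println(processIgnoring(true))
	fmt.Println(processReporting(true))
}
```

Writing `_ =` at least makes the decision to drop the error explicit to readers and linters, which is the trade-off being argued for here.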



