Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/26 16:04:27 UTC

[GitHub] [beam] AydarZaynutdinov opened a new pull request #15804: [BEAM-13109][Playground] Add processing of timeout for RunCode API method

AydarZaynutdinov opened a new pull request #15804:
URL: https://github.com/apache/beam/pull/15804


   add `setupCache()` method;
   implement `RunCode` API method with context;
   change `ExecutableName` to a variable of `LifeCycle` instead of method;
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem merged pull request #15804: [BEAM-13109][Playground] Add processing of timeout for RunCode API method

Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #15804:
URL: https://github.com/apache/beam/pull/15804


   





[GitHub] [beam] AydarZaynutdinov commented on a change in pull request #15804: [BEAM-13109][Playground] Add processing of timeout for RunCode API method

Posted by GitBox <gi...@apache.org>.
AydarZaynutdinov commented on a change in pull request #15804:
URL: https://github.com/apache/beam/pull/15804#discussion_r746538754



##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -66,7 +66,7 @@ func (controller *playgroundController) RunCode(ctx context.Context, info *pb.Ru
 		return nil, errors.InternalError("Run code()", "Error during set expiration to cache: "+err.Error())
 	}
 
-	go processCode(ctx, controller.cacheService, lc, compileBuilder, pipelineId, controller.env, info.Sdk)
+	go processCode(context.TODO(), controller.cacheService, lc, compileBuilder, pipelineId, controller.env, info.Sdk)

Review comment:
       Yes, this will be changed later. Most likely `context.Background()` will be used instead.
   I added a TODO comment and a JIRA ticket for this one.







[GitHub] [beam] pabloem commented on a change in pull request #15804: [BEAM-13109][Playground] Add processing of timeout for RunCode API method

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #15804:
URL: https://github.com/apache/beam/pull/15804#discussion_r745041551



##########
File path: playground/api/v1/api.proto
##########
@@ -33,12 +33,14 @@ enum Status {
   STATUS_UNSPECIFIED = 0;
   STATUS_VALIDATING = 1;
   STATUS_VALIDATION_ERROR = 2;
-  STATUS_COMPILING = 3;
-  STATUS_COMPILE_ERROR = 4;
-  STATUS_EXECUTING = 5;
-  STATUS_FINISHED = 6;
-  STATUS_ERROR = 7;
-  STATUS_RUN_TIMEOUT = 8;
+  STATUS_PREPARING = 3;
+  STATUS_PREPARATION_ERROR = 4;
+  STATUS_COMPILING = 5;
+  STATUS_COMPILE_ERROR = 6;
+  STATUS_EXECUTING = 7;
+  STATUS_FINISHED = 8;
+  STATUS_ERROR = 9;

Review comment:
       This error kind is specifically for runtime errors, right? Should we name it `STATUS_RUN_ERROR` or `STATUS_RUNTIME_ERROR` or something like that?

##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -237,58 +237,138 @@ func setupValidators(sdk pb.Sdk, filepath string) *[]validators.Validator {
 
 // processCode validates, compiles and runs code by pipelineId.
 // During each operation updates status of execution and saves it into cache:
+// - In case of processing works more that timeout duration saves playground.Status_STATUS_RUN_TIMEOUT as cache.Status into cache.
 // - In case of validation step is failed saves playground.Status_STATUS_ERROR as cache.Status into cache.
 // - In case of compile step is failed saves playground.Status_STATUS_COMPILE_ERROR as cache.Status and compile logs as cache.CompileOutput into cache.
 // - In case of compile step is completed with no errors saves empty string ("") as cache.CompileOutput into cache.
 // - In case of run step is failed saves playground.Status_STATUS_ERROR as cache.Status and run logs as cache.RunOutput into cache.
 // - In case of run step is completed with no errors saves playground.Status_STATUS_FINISHED as cache.Status and run output as cache.RunOutput into cache.
 // At the end of this method deletes all created folders.
 func processCode(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycle, compileBuilder *executors.CompileBuilder, pipelineId uuid.UUID, env *environment.Environment, sdk pb.Sdk) {
-	defer cleanUp(pipelineId, lc)
+	ctxWithTimeout, cancelByTimeoutFunc := context.WithTimeout(ctx, env.ApplicationEnvs.PipelineExecuteTimeout())
+	defer func(lc *fs_tool.LifeCycle) {
+		cancelByTimeoutFunc()
+		cleanUp(pipelineId, lc)
+	}(lc)
+
+	errorChannel := make(chan error, 1)
+	dataChannel := make(chan interface{}, 1)
+	doneChannel := make(chan bool, 1)

Review comment:
       Should we call it `successChannel`? It seems to be used to signal success or failure.

##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -237,58 +237,138 @@ func setupValidators(sdk pb.Sdk, filepath string) *[]validators.Validator {
 
 // processCode validates, compiles and runs code by pipelineId.
 // During each operation updates status of execution and saves it into cache:
+// - In case of processing works more that timeout duration saves playground.Status_STATUS_RUN_TIMEOUT as cache.Status into cache.
 // - In case of validation step is failed saves playground.Status_STATUS_ERROR as cache.Status into cache.
 // - In case of compile step is failed saves playground.Status_STATUS_COMPILE_ERROR as cache.Status and compile logs as cache.CompileOutput into cache.
 // - In case of compile step is completed with no errors saves empty string ("") as cache.CompileOutput into cache.
 // - In case of run step is failed saves playground.Status_STATUS_ERROR as cache.Status and run logs as cache.RunOutput into cache.
 // - In case of run step is completed with no errors saves playground.Status_STATUS_FINISHED as cache.Status and run output as cache.RunOutput into cache.
 // At the end of this method deletes all created folders.
 func processCode(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycle, compileBuilder *executors.CompileBuilder, pipelineId uuid.UUID, env *environment.Environment, sdk pb.Sdk) {
-	defer cleanUp(pipelineId, lc)
+	ctxWithTimeout, cancelByTimeoutFunc := context.WithTimeout(ctx, env.ApplicationEnvs.PipelineExecuteTimeout())
+	defer func(lc *fs_tool.LifeCycle) {
+		cancelByTimeoutFunc()
+		cleanUp(pipelineId, lc)
+	}(lc)
+
+	errorChannel := make(chan error, 1)
+	dataChannel := make(chan interface{}, 1)
+	doneChannel := make(chan bool, 1)
 
 	// build executor for validate and compile steps
 	exec := compileBuilder.Build()
 
 	// validate
 	logger.Infof("%s: Validate() ...\n", pipelineId)
 	validateFunc := exec.Validate()
-	if err := validateFunc(); err != nil {
-		processError(ctx, err, nil, pipelineId, cacheService, pb.Status_STATUS_VALIDATION_ERROR)
+	go validateFunc(doneChannel, errorChannel)
+
+	select {
+	case <-ctxWithTimeout.Done():
+		finishByContext(ctxWithTimeout, pipelineId, cacheService)
+		return
+	case ok := <-doneChannel:
+		if !ok {
+			err := <-errorChannel
+			processError(ctx, err, nil, pipelineId, cacheService, pb.Status_STATUS_VALIDATION_ERROR)
+			return
+		}
+		processSuccess(ctx, nil, pipelineId, cacheService, pb.Status_STATUS_PREPARING)
+	}
+
+	// prepare
+	logger.Info("%s: Prepare() ...\n", pipelineId)
+	prepareFunc := exec.Prepare()
+	go prepareFunc(doneChannel, errorChannel)
+
+	select {
+	case <-ctxWithTimeout.Done():
+		finishByContext(ctxWithTimeout, pipelineId, cacheService)
 		return
-	} else {
+	case ok := <-doneChannel:
+		if !ok {
+			err := <-errorChannel
+			processError(ctx, err, nil, pipelineId, cacheService, pb.Status_STATUS_PREPARATION_ERROR)
+			return
+		}
 		processSuccess(ctx, nil, pipelineId, cacheService, pb.Status_STATUS_COMPILING)
 	}
 
 	// compile
 	logger.Infof("%s: Compile() ...\n", pipelineId)
-	compileCmd := exec.Compile()
-	if data, err := compileCmd.CombinedOutput(); err != nil {
-		processError(ctx, err, data, pipelineId, cacheService, pb.Status_STATUS_COMPILE_ERROR)
+	compileCmd := exec.Compile(ctxWithTimeout)
+	go func(doneCh chan bool, errCh chan error, dataCh chan interface{}) {
+		data, err := compileCmd.CombinedOutput()

Review comment:
       Can we add a TODO+JIRA issue here to separate stderr from stdout? stderr will end up showing all sorts of logs, and stdout may show user-printed output which they may like to see separated at some point : )

##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -237,58 +237,138 @@ func setupValidators(sdk pb.Sdk, filepath string) *[]validators.Validator {
 
 // processCode validates, compiles and runs code by pipelineId.
 // During each operation updates status of execution and saves it into cache:
+// - In case of processing works more that timeout duration saves playground.Status_STATUS_RUN_TIMEOUT as cache.Status into cache.
 // - In case of validation step is failed saves playground.Status_STATUS_ERROR as cache.Status into cache.
 // - In case of compile step is failed saves playground.Status_STATUS_COMPILE_ERROR as cache.Status and compile logs as cache.CompileOutput into cache.
 // - In case of compile step is completed with no errors saves empty string ("") as cache.CompileOutput into cache.
 // - In case of run step is failed saves playground.Status_STATUS_ERROR as cache.Status and run logs as cache.RunOutput into cache.
 // - In case of run step is completed with no errors saves playground.Status_STATUS_FINISHED as cache.Status and run output as cache.RunOutput into cache.
 // At the end of this method deletes all created folders.
 func processCode(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycle, compileBuilder *executors.CompileBuilder, pipelineId uuid.UUID, env *environment.Environment, sdk pb.Sdk) {
-	defer cleanUp(pipelineId, lc)
+	ctxWithTimeout, cancelByTimeoutFunc := context.WithTimeout(ctx, env.ApplicationEnvs.PipelineExecuteTimeout())
+	defer func(lc *fs_tool.LifeCycle) {
+		cancelByTimeoutFunc()
+		cleanUp(pipelineId, lc)
+	}(lc)
+
+	errorChannel := make(chan error, 1)
+	dataChannel := make(chan interface{}, 1)
+	doneChannel := make(chan bool, 1)
 
 	// build executor for validate and compile steps
 	exec := compileBuilder.Build()
 
 	// validate
 	logger.Infof("%s: Validate() ...\n", pipelineId)
 	validateFunc := exec.Validate()
-	if err := validateFunc(); err != nil {
-		processError(ctx, err, nil, pipelineId, cacheService, pb.Status_STATUS_VALIDATION_ERROR)
+	go validateFunc(doneChannel, errorChannel)
+
+	select {
+	case <-ctxWithTimeout.Done():
+		finishByContext(ctxWithTimeout, pipelineId, cacheService)
+		return
+	case ok := <-doneChannel:
+		if !ok {
+			err := <-errorChannel
+			processError(ctx, err, nil, pipelineId, cacheService, pb.Status_STATUS_VALIDATION_ERROR)
+			return
+		}
+		processSuccess(ctx, nil, pipelineId, cacheService, pb.Status_STATUS_PREPARING)
+	}
+
+	// prepare
+	logger.Info("%s: Prepare() ...\n", pipelineId)
+	prepareFunc := exec.Prepare()
+	go prepareFunc(doneChannel, errorChannel)
+
+	select {
+	case <-ctxWithTimeout.Done():
+		finishByContext(ctxWithTimeout, pipelineId, cacheService)
 		return
-	} else {
+	case ok := <-doneChannel:
+		if !ok {
+			err := <-errorChannel
+			processError(ctx, err, nil, pipelineId, cacheService, pb.Status_STATUS_PREPARATION_ERROR)
+			return
+		}
 		processSuccess(ctx, nil, pipelineId, cacheService, pb.Status_STATUS_COMPILING)
 	}
 
 	// compile
 	logger.Infof("%s: Compile() ...\n", pipelineId)
-	compileCmd := exec.Compile()
-	if data, err := compileCmd.CombinedOutput(); err != nil {
-		processError(ctx, err, data, pipelineId, cacheService, pb.Status_STATUS_COMPILE_ERROR)
+	compileCmd := exec.Compile(ctxWithTimeout)
+	go func(doneCh chan bool, errCh chan error, dataCh chan interface{}) {
+		data, err := compileCmd.CombinedOutput()
+		dataCh <- data
+		if err != nil {
+			errCh <- err
+			doneCh <- false
+		} else {
+			doneCh <- true
+		}
+	}(doneChannel, errorChannel, dataChannel)
+
+	select {
+	case <-ctxWithTimeout.Done():
+		finishByContext(ctxWithTimeout, pipelineId, cacheService)
 		return
-	} else {
-		processSuccess(ctx, data, pipelineId, cacheService, pb.Status_STATUS_EXECUTING)
+	case ok := <-doneChannel:
+		data := <-dataChannel
+		if !ok {
+			err := <-errorChannel
+			processError(ctxWithTimeout, err, data.([]byte), pipelineId, cacheService, pb.Status_STATUS_COMPILE_ERROR)
+			return
+		}
+		processSuccess(ctxWithTimeout, data.([]byte), pipelineId, cacheService, pb.Status_STATUS_EXECUTING)
 	}
 
 	runBuilder, err := setupRunBuilder(pipelineId, lc, sdk, env, compileBuilder)
 	if err != nil {
 		logger.Errorf("%s: error during setup runBuilder: %s\n", pipelineId, err.Error())
-		setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_ERROR)
+		setToCache(ctxWithTimeout, cacheService, pipelineId, cache.Status, pb.Status_STATUS_ERROR)
 		return
 	}
 
 	// build executor for run step
 	exec = runBuilder.Build()
 
+	// run
 	logger.Infof("%s: Run() ...\n", pipelineId)
-	runCmd := exec.Run()
-	if data, err := runCmd.CombinedOutput(); err != nil {
-		processError(ctx, err, data, pipelineId, cacheService, pb.Status_STATUS_ERROR)
+	runCmd := exec.Run(ctxWithTimeout)
+	go func(doneCh chan bool, errCh chan error, dataCh chan interface{}) {
+		data, err := runCmd.CombinedOutput()
+		dataCh <- data
+		if err != nil {
+			errCh <- err
+			doneCh <- false
+		} else {
+			doneCh <- true
+		}
+	}(doneChannel, errorChannel, dataChannel)
+
+	select {
+	case <-ctxWithTimeout.Done():
+		finishByContext(ctxWithTimeout, pipelineId, cacheService)
 		return
-	} else {
-		processSuccess(ctx, data, pipelineId, cacheService, pb.Status_STATUS_FINISHED)
+	case ok := <-doneChannel:
+		data := <-dataChannel
+		if !ok {
+			err := <-errorChannel
+			processError(ctxWithTimeout, err.(error), data.([]byte), pipelineId, cacheService, pb.Status_STATUS_ERROR)
+			return
+		}
+		processSuccess(ctxWithTimeout, data.([]byte), pipelineId, cacheService, pb.Status_STATUS_FINISHED)
 	}
 }
 
+// finishByContext is used in case of runCode method finished by timeout
+func finishByContext(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache) {
+	logger.Errorf("%s: processCode finish because of timeout\n", pipelineId)

Review comment:
       Can we make sure the log shows the step where the timeout happened (validate / prepare / compile / run)?

##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -66,7 +66,7 @@ func (controller *playgroundController) RunCode(ctx context.Context, info *pb.Ru
 		return nil, errors.InternalError("Run code()", "Error during set expiration to cache: "+err.Error())
 	}
 
-	go processCode(ctx, controller.cacheService, lc, compileBuilder, pipelineId, controller.env, info.Sdk)
+	go processCode(context.TODO(), controller.cacheService, lc, compileBuilder, pipelineId, controller.env, info.Sdk)

Review comment:
       What's the long-term plan for this context? Will we use the background context or something like that?

##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -66,7 +66,7 @@ func (controller *playgroundController) RunCode(ctx context.Context, info *pb.Ru
 		return nil, errors.InternalError("Run code()", "Error during set expiration to cache: "+err.Error())
 	}
 
-	go processCode(ctx, controller.cacheService, lc, compileBuilder, pipelineId, controller.env, info.Sdk)
+	go processCode(context.TODO(), controller.cacheService, lc, compileBuilder, pipelineId, controller.env, info.Sdk)

Review comment:
       No need to update this for now; this is just a note for later.

##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -237,58 +237,138 @@ func setupValidators(sdk pb.Sdk, filepath string) *[]validators.Validator {
 
 // processCode validates, compiles and runs code by pipelineId.
 // During each operation updates status of execution and saves it into cache:
+// - In case of processing works more than timeout duration saves playground.Status_STATUS_RUN_TIMEOUT as cache.Status into cache.
 // - In case of validation step is failed saves playground.Status_STATUS_ERROR as cache.Status into cache.
 // - In case of compile step is failed saves playground.Status_STATUS_COMPILE_ERROR as cache.Status and compile logs as cache.CompileOutput into cache.
 // - In case of compile step is completed with no errors saves empty string ("") as cache.CompileOutput into cache.
 // - In case of run step is failed saves playground.Status_STATUS_ERROR as cache.Status and run logs as cache.RunOutput into cache.
 // - In case of run step is completed with no errors saves playground.Status_STATUS_FINISHED as cache.Status and run output as cache.RunOutput into cache.
 // At the end of this method deletes all created folders.
 func processCode(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycle, compileBuilder *executors.CompileBuilder, pipelineId uuid.UUID, env *environment.Environment, sdk pb.Sdk) {
-	defer cleanUp(pipelineId, lc)
+	ctxWithTimeout, cancelByTimeoutFunc := context.WithTimeout(ctx, env.ApplicationEnvs.PipelineExecuteTimeout())
+	defer func(lc *fs_tool.LifeCycle) {
+		cancelByTimeoutFunc()
+		cleanUp(pipelineId, lc)
+	}(lc)
+
+	errorChannel := make(chan error, 1)
+	dataChannel := make(chan interface{}, 1)
+	doneChannel := make(chan bool, 1)
 
 	// build executor for validate and compile steps
 	exec := compileBuilder.Build()
 
 	// validate
 	logger.Infof("%s: Validate() ...\n", pipelineId)
 	validateFunc := exec.Validate()
-	if err := validateFunc(); err != nil {
-		processError(ctx, err, nil, pipelineId, cacheService, pb.Status_STATUS_VALIDATION_ERROR)
+	go validateFunc(doneChannel, errorChannel)
+
+	select {
+	case <-ctxWithTimeout.Done():
+		finishByContext(ctxWithTimeout, pipelineId, cacheService)
+		return
+	case ok := <-doneChannel:
+		if !ok {
+			err := <-errorChannel
+			processError(ctx, err, nil, pipelineId, cacheService, pb.Status_STATUS_VALIDATION_ERROR)
+			return
+		}
+		processSuccess(ctx, nil, pipelineId, cacheService, pb.Status_STATUS_PREPARING)
+	}
+
+	// prepare
+	logger.Infof("%s: Prepare() ...\n", pipelineId)
+	prepareFunc := exec.Prepare()
+	go prepareFunc(doneChannel, errorChannel)
+
+	select {
+	case <-ctxWithTimeout.Done():
+		finishByContext(ctxWithTimeout, pipelineId, cacheService)
 		return
-	} else {
+	case ok := <-doneChannel:
+		if !ok {
+			err := <-errorChannel
+			processError(ctx, err, nil, pipelineId, cacheService, pb.Status_STATUS_PREPARATION_ERROR)
+			return
+		}
 		processSuccess(ctx, nil, pipelineId, cacheService, pb.Status_STATUS_COMPILING)
 	}
 
 	// compile
 	logger.Infof("%s: Compile() ...\n", pipelineId)
-	compileCmd := exec.Compile()
-	if data, err := compileCmd.CombinedOutput(); err != nil {
-		processError(ctx, err, data, pipelineId, cacheService, pb.Status_STATUS_COMPILE_ERROR)
+	compileCmd := exec.Compile(ctxWithTimeout)
+	go func(doneCh chan bool, errCh chan error, dataCh chan interface{}) {
+		data, err := compileCmd.CombinedOutput()
+		dataCh <- data
+		if err != nil {
+			errCh <- err
+			doneCh <- false
+		} else {
+			doneCh <- true
+		}
+	}(doneChannel, errorChannel, dataChannel)
+
+	select {
+	case <-ctxWithTimeout.Done():
+		finishByContext(ctxWithTimeout, pipelineId, cacheService)
 		return
-	} else {
-		processSuccess(ctx, data, pipelineId, cacheService, pb.Status_STATUS_EXECUTING)
+	case ok := <-doneChannel:
+		data := <-dataChannel
+		if !ok {
+			err := <-errorChannel
+			processError(ctxWithTimeout, err, data.([]byte), pipelineId, cacheService, pb.Status_STATUS_COMPILE_ERROR)
+			return
+		}
+		processSuccess(ctxWithTimeout, data.([]byte), pipelineId, cacheService, pb.Status_STATUS_EXECUTING)
 	}
 
 	runBuilder, err := setupRunBuilder(pipelineId, lc, sdk, env, compileBuilder)
 	if err != nil {
 		logger.Errorf("%s: error during setup runBuilder: %s\n", pipelineId, err.Error())
-		setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_ERROR)
+		setToCache(ctxWithTimeout, cacheService, pipelineId, cache.Status, pb.Status_STATUS_ERROR)
 		return
 	}
 
 	// build executor for run step
 	exec = runBuilder.Build()
 
+	// run
 	logger.Infof("%s: Run() ...\n", pipelineId)
-	runCmd := exec.Run()
-	if data, err := runCmd.CombinedOutput(); err != nil {
-		processError(ctx, err, data, pipelineId, cacheService, pb.Status_STATUS_ERROR)
+	runCmd := exec.Run(ctxWithTimeout)
+	go func(doneCh chan bool, errCh chan error, dataCh chan interface{}) {
+		data, err := runCmd.CombinedOutput()
+		dataCh <- data
+		if err != nil {
+			errCh <- err
+			doneCh <- false
+		} else {
+			doneCh <- true
+		}
+	}(doneChannel, errorChannel, dataChannel)
+
+	select {
+	case <-ctxWithTimeout.Done():
+		finishByContext(ctxWithTimeout, pipelineId, cacheService)
 		return
-	} else {
-		processSuccess(ctx, data, pipelineId, cacheService, pb.Status_STATUS_FINISHED)
+	case ok := <-doneChannel:
+		data := <-dataChannel
+		if !ok {
+			err := <-errorChannel
+			processError(ctxWithTimeout, err.(error), data.([]byte), pipelineId, cacheService, pb.Status_STATUS_ERROR)
+			return
+		}
+		processSuccess(ctxWithTimeout, data.([]byte), pipelineId, cacheService, pb.Status_STATUS_FINISHED)
 	}
 }
 
+// finishByContext is used in case of runCode method finished by timeout
+func finishByContext(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache) {
+	logger.Errorf("%s: processCode finish because of timeout\n", pipelineId)

Review comment:
       Maybe even print the whole context.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] AydarZaynutdinov commented on a change in pull request #15804: [BEAM-13109][Playground] Add processing of timeout for RunCode API method

Posted by GitBox <gi...@apache.org>.
AydarZaynutdinov commented on a change in pull request #15804:
URL: https://github.com/apache/beam/pull/15804#discussion_r746537654



##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -237,58 +237,138 @@ func setupValidators(sdk pb.Sdk, filepath string) *[]validators.Validator {
 
 // processCode validates, compiles and runs code by pipelineId.
 // During each operation updates status of execution and saves it into cache:
+// - In case of processing works more than timeout duration saves playground.Status_STATUS_RUN_TIMEOUT as cache.Status into cache.
 // - In case of validation step is failed saves playground.Status_STATUS_ERROR as cache.Status into cache.
 // - In case of compile step is failed saves playground.Status_STATUS_COMPILE_ERROR as cache.Status and compile logs as cache.CompileOutput into cache.
 // - In case of compile step is completed with no errors saves empty string ("") as cache.CompileOutput into cache.
 // - In case of run step is failed saves playground.Status_STATUS_ERROR as cache.Status and run logs as cache.RunOutput into cache.
 // - In case of run step is completed with no errors saves playground.Status_STATUS_FINISHED as cache.Status and run output as cache.RunOutput into cache.
 // At the end of this method deletes all created folders.
 func processCode(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycle, compileBuilder *executors.CompileBuilder, pipelineId uuid.UUID, env *environment.Environment, sdk pb.Sdk) {
-	defer cleanUp(pipelineId, lc)
+	ctxWithTimeout, cancelByTimeoutFunc := context.WithTimeout(ctx, env.ApplicationEnvs.PipelineExecuteTimeout())
+	defer func(lc *fs_tool.LifeCycle) {
+		cancelByTimeoutFunc()
+		cleanUp(pipelineId, lc)
+	}(lc)
+
+	errorChannel := make(chan error, 1)
+	dataChannel := make(chan interface{}, 1)
+	doneChannel := make(chan bool, 1)
 
 	// build executor for validate and compile steps
 	exec := compileBuilder.Build()
 
 	// validate
 	logger.Infof("%s: Validate() ...\n", pipelineId)
 	validateFunc := exec.Validate()
-	if err := validateFunc(); err != nil {
-		processError(ctx, err, nil, pipelineId, cacheService, pb.Status_STATUS_VALIDATION_ERROR)
+	go validateFunc(doneChannel, errorChannel)
+
+	select {
+	case <-ctxWithTimeout.Done():
+		finishByContext(ctxWithTimeout, pipelineId, cacheService)
+		return
+	case ok := <-doneChannel:
+		if !ok {
+			err := <-errorChannel
+			processError(ctx, err, nil, pipelineId, cacheService, pb.Status_STATUS_VALIDATION_ERROR)
+			return
+		}
+		processSuccess(ctx, nil, pipelineId, cacheService, pb.Status_STATUS_PREPARING)
+	}
+
+	// prepare
+	logger.Infof("%s: Prepare() ...\n", pipelineId)
+	prepareFunc := exec.Prepare()
+	go prepareFunc(doneChannel, errorChannel)
+
+	select {
+	case <-ctxWithTimeout.Done():
+		finishByContext(ctxWithTimeout, pipelineId, cacheService)
 		return
-	} else {
+	case ok := <-doneChannel:
+		if !ok {
+			err := <-errorChannel
+			processError(ctx, err, nil, pipelineId, cacheService, pb.Status_STATUS_PREPARATION_ERROR)
+			return
+		}
 		processSuccess(ctx, nil, pipelineId, cacheService, pb.Status_STATUS_COMPILING)
 	}
 
 	// compile
 	logger.Infof("%s: Compile() ...\n", pipelineId)
-	compileCmd := exec.Compile()
-	if data, err := compileCmd.CombinedOutput(); err != nil {
-		processError(ctx, err, data, pipelineId, cacheService, pb.Status_STATUS_COMPILE_ERROR)
+	compileCmd := exec.Compile(ctxWithTimeout)
+	go func(doneCh chan bool, errCh chan error, dataCh chan interface{}) {
+		data, err := compileCmd.CombinedOutput()
+		dataCh <- data
+		if err != nil {
+			errCh <- err
+			doneCh <- false
+		} else {
+			doneCh <- true
+		}
+	}(doneChannel, errorChannel, dataChannel)
+
+	select {
+	case <-ctxWithTimeout.Done():
+		finishByContext(ctxWithTimeout, pipelineId, cacheService)
 		return
-	} else {
-		processSuccess(ctx, data, pipelineId, cacheService, pb.Status_STATUS_EXECUTING)
+	case ok := <-doneChannel:
+		data := <-dataChannel
+		if !ok {
+			err := <-errorChannel
+			processError(ctxWithTimeout, err, data.([]byte), pipelineId, cacheService, pb.Status_STATUS_COMPILE_ERROR)
+			return
+		}
+		processSuccess(ctxWithTimeout, data.([]byte), pipelineId, cacheService, pb.Status_STATUS_EXECUTING)
 	}
 
 	runBuilder, err := setupRunBuilder(pipelineId, lc, sdk, env, compileBuilder)
 	if err != nil {
 		logger.Errorf("%s: error during setup runBuilder: %s\n", pipelineId, err.Error())
-		setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_ERROR)
+		setToCache(ctxWithTimeout, cacheService, pipelineId, cache.Status, pb.Status_STATUS_ERROR)
 		return
 	}
 
 	// build executor for run step
 	exec = runBuilder.Build()
 
+	// run
 	logger.Infof("%s: Run() ...\n", pipelineId)
-	runCmd := exec.Run()
-	if data, err := runCmd.CombinedOutput(); err != nil {
-		processError(ctx, err, data, pipelineId, cacheService, pb.Status_STATUS_ERROR)
+	runCmd := exec.Run(ctxWithTimeout)
+	go func(doneCh chan bool, errCh chan error, dataCh chan interface{}) {
+		data, err := runCmd.CombinedOutput()
+		dataCh <- data
+		if err != nil {
+			errCh <- err
+			doneCh <- false
+		} else {
+			doneCh <- true
+		}
+	}(doneChannel, errorChannel, dataChannel)
+
+	select {
+	case <-ctxWithTimeout.Done():
+		finishByContext(ctxWithTimeout, pipelineId, cacheService)
 		return
-	} else {
-		processSuccess(ctx, data, pipelineId, cacheService, pb.Status_STATUS_FINISHED)
+	case ok := <-doneChannel:
+		data := <-dataChannel
+		if !ok {
+			err := <-errorChannel
+			processError(ctxWithTimeout, err.(error), data.([]byte), pipelineId, cacheService, pb.Status_STATUS_ERROR)
+			return
+		}
+		processSuccess(ctxWithTimeout, data.([]byte), pipelineId, cacheService, pb.Status_STATUS_FINISHED)
 	}
 }
 
+// finishByContext is used in case of runCode method finished by timeout
+func finishByContext(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache) {
+	logger.Errorf("%s: processCode finish because of timeout\n", pipelineId)

Review comment:
       Currently, before each step (validate/prepare/…), the `processCode()` method writes a log line like
   `logger.Infof("%s: Validate() ...\n", pipelineId)`.
   At the end of each step, the `processSuccess()` method also writes a log line like
   `logger.Infof("%s: Validate() finish\n", pipelineId)`.
   For example:
   ```
   ...
   {pipelineId}: Validate() ...
   {pipelineId}: Validate() finish
   {pipelineId}: Prepare() ...
   {pipelineId}: processCode finish because of timeout
   ...
   ```
   So I think these logs already show at which step the timeout occurred: it is the last step that was started but has no matching `finish` line (`Prepare()` in the example above).







[GitHub] [beam] AydarZaynutdinov commented on a change in pull request #15804: [BEAM-13109][Playground] Add processing of timeout for RunCode API method

Posted by GitBox <gi...@apache.org>.
AydarZaynutdinov commented on a change in pull request #15804:
URL: https://github.com/apache/beam/pull/15804#discussion_r746539136



##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -237,58 +237,138 @@ func setupValidators(sdk pb.Sdk, filepath string) *[]validators.Validator {
 
 // processCode validates, compiles and runs code by pipelineId.
 // During each operation updates status of execution and saves it into cache:
+// - In case of processing works more than timeout duration saves playground.Status_STATUS_RUN_TIMEOUT as cache.Status into cache.
 // - In case of validation step is failed saves playground.Status_STATUS_ERROR as cache.Status into cache.
 // - In case of compile step is failed saves playground.Status_STATUS_COMPILE_ERROR as cache.Status and compile logs as cache.CompileOutput into cache.
 // - In case of compile step is completed with no errors saves empty string ("") as cache.CompileOutput into cache.
 // - In case of run step is failed saves playground.Status_STATUS_ERROR as cache.Status and run logs as cache.RunOutput into cache.
 // - In case of run step is completed with no errors saves playground.Status_STATUS_FINISHED as cache.Status and run output as cache.RunOutput into cache.
 // At the end of this method deletes all created folders.
 func processCode(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycle, compileBuilder *executors.CompileBuilder, pipelineId uuid.UUID, env *environment.Environment, sdk pb.Sdk) {
-	defer cleanUp(pipelineId, lc)
+	ctxWithTimeout, cancelByTimeoutFunc := context.WithTimeout(ctx, env.ApplicationEnvs.PipelineExecuteTimeout())
+	defer func(lc *fs_tool.LifeCycle) {
+		cancelByTimeoutFunc()
+		cleanUp(pipelineId, lc)
+	}(lc)
+
+	errorChannel := make(chan error, 1)
+	dataChannel := make(chan interface{}, 1)
+	doneChannel := make(chan bool, 1)
 
 	// build executor for validate and compile steps
 	exec := compileBuilder.Build()
 
 	// validate
 	logger.Infof("%s: Validate() ...\n", pipelineId)
 	validateFunc := exec.Validate()
-	if err := validateFunc(); err != nil {
-		processError(ctx, err, nil, pipelineId, cacheService, pb.Status_STATUS_VALIDATION_ERROR)
+	go validateFunc(doneChannel, errorChannel)
+
+	select {
+	case <-ctxWithTimeout.Done():
+		finishByContext(ctxWithTimeout, pipelineId, cacheService)
+		return
+	case ok := <-doneChannel:
+		if !ok {
+			err := <-errorChannel
+			processError(ctx, err, nil, pipelineId, cacheService, pb.Status_STATUS_VALIDATION_ERROR)
+			return
+		}
+		processSuccess(ctx, nil, pipelineId, cacheService, pb.Status_STATUS_PREPARING)
+	}
+
+	// prepare
+	logger.Infof("%s: Prepare() ...\n", pipelineId)
+	prepareFunc := exec.Prepare()
+	go prepareFunc(doneChannel, errorChannel)
+
+	select {
+	case <-ctxWithTimeout.Done():
+		finishByContext(ctxWithTimeout, pipelineId, cacheService)
 		return
-	} else {
+	case ok := <-doneChannel:
+		if !ok {
+			err := <-errorChannel
+			processError(ctx, err, nil, pipelineId, cacheService, pb.Status_STATUS_PREPARATION_ERROR)
+			return
+		}
 		processSuccess(ctx, nil, pipelineId, cacheService, pb.Status_STATUS_COMPILING)
 	}
 
 	// compile
 	logger.Infof("%s: Compile() ...\n", pipelineId)
-	compileCmd := exec.Compile()
-	if data, err := compileCmd.CombinedOutput(); err != nil {
-		processError(ctx, err, data, pipelineId, cacheService, pb.Status_STATUS_COMPILE_ERROR)
+	compileCmd := exec.Compile(ctxWithTimeout)
+	go func(doneCh chan bool, errCh chan error, dataCh chan interface{}) {
+		data, err := compileCmd.CombinedOutput()

Review comment:
       That's a good idea. I added a TODO comment and a JIRA ticket for this.







[GitHub] [beam] pabloem commented on a change in pull request #15804: [BEAM-13109][Playground] Add processing of timeout for RunCode API method

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #15804:
URL: https://github.com/apache/beam/pull/15804#discussion_r745187941



##########
File path: playground/api/v1/api.proto
##########
@@ -33,12 +33,14 @@ enum Status {
   STATUS_UNSPECIFIED = 0;
   STATUS_VALIDATING = 1;
   STATUS_VALIDATION_ERROR = 2;
-  STATUS_COMPILING = 3;
-  STATUS_COMPILE_ERROR = 4;
-  STATUS_EXECUTING = 5;
-  STATUS_FINISHED = 6;
-  STATUS_ERROR = 7;
-  STATUS_RUN_TIMEOUT = 8;
+  STATUS_PREPARING = 3;
+  STATUS_PREPARATION_ERROR = 4;
+  STATUS_COMPILING = 5;
+  STATUS_COMPILE_ERROR = 6;
+  STATUS_EXECUTING = 7;
+  STATUS_FINISHED = 8;
+  STATUS_ERROR = 9;

Review comment:
       I see that this was changed in a different PR, so LGTM.







[GitHub] [beam] AydarZaynutdinov commented on a change in pull request #15804: [BEAM-13109][Playground] Add processing of timeout for RunCode API method

Posted by GitBox <gi...@apache.org>.
AydarZaynutdinov commented on a change in pull request #15804:
URL: https://github.com/apache/beam/pull/15804#discussion_r745467640



##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -237,58 +237,138 @@ func setupValidators(sdk pb.Sdk, filepath string) *[]validators.Validator {
 
 // processCode validates, compiles and runs code by pipelineId.
 // During each operation updates status of execution and saves it into cache:
+// - In case of processing works more than timeout duration saves playground.Status_STATUS_RUN_TIMEOUT as cache.Status into cache.
 // - In case of validation step is failed saves playground.Status_STATUS_ERROR as cache.Status into cache.
 // - In case of compile step is failed saves playground.Status_STATUS_COMPILE_ERROR as cache.Status and compile logs as cache.CompileOutput into cache.
 // - In case of compile step is completed with no errors saves empty string ("") as cache.CompileOutput into cache.
 // - In case of run step is failed saves playground.Status_STATUS_ERROR as cache.Status and run logs as cache.RunOutput into cache.
 // - In case of run step is completed with no errors saves playground.Status_STATUS_FINISHED as cache.Status and run output as cache.RunOutput into cache.
 // At the end of this method deletes all created folders.
 func processCode(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycle, compileBuilder *executors.CompileBuilder, pipelineId uuid.UUID, env *environment.Environment, sdk pb.Sdk) {
-	defer cleanUp(pipelineId, lc)
+	ctxWithTimeout, cancelByTimeoutFunc := context.WithTimeout(ctx, env.ApplicationEnvs.PipelineExecuteTimeout())
+	defer func(lc *fs_tool.LifeCycle) {
+		cancelByTimeoutFunc()
+		cleanUp(pipelineId, lc)
+	}(lc)
+
+	errorChannel := make(chan error, 1)
+	dataChannel := make(chan interface{}, 1)
+	doneChannel := make(chan bool, 1)

Review comment:
       Renamed.







[GitHub] [beam] AydarZaynutdinov commented on pull request #15804: [BEAM-13109][Playground] Add processing of timeout for RunCode API method

Posted by GitBox <gi...@apache.org>.
AydarZaynutdinov commented on pull request #15804:
URL: https://github.com/apache/beam/pull/15804#issuecomment-953995945


   R: @pabloem
   R: @damondouglas

