Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/08/05 22:02:59 UTC

[GitHub] [beam] lostluck opened a new pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

lostluck opened a new pull request #15289:
URL: https://github.com/apache/beam/pull/15289


   Restoring https://github.com/apache/beam/pull/10942 to narrow down where the postsubmits previously failed.
   
   -------
   
   This adds PCollection metrics to the Go SDK, specifically Element Count and Sampled Size.
   
   New exec.PCollection nodes are added between every processing node in the bundle execution graph.
   * The new metrics are only added as MonitoringInfos, not the legacy protos.
   * About 10ns is added per element per PCollection node, due to the atomic addition performed for every element.
   * Elements to be sized are selected randomly, then encoded to count their bytes (without window headers).
     * An initial index is selected from the first [0,1,2] at bundle start-up, and each subsequent index is pre-selected from somewhere later on, at a distance proportional to the number of elements seen in the bundle so far.
     * As currently set up, it takes around 200-300 samples for the first 1M elements, so encoding overhead is limited.
   * PCollections from a DataSource do 100% "sampling", since they're reading the bytes directly anyway. The PCollection node that would have been added after the DataSource is elided from the graph during construction, but is re-used to avoid duplicating the logic for concurrently manipulating the size distribution.
     * DataSources can properly handle CoGBKs as well, counting non-header bytes for iterables and state-backed iterables.
     * This still involves a mutex Lock for every update, so we may want to find a lighter-weight mechanism to handle the distribution samples from DataSources, or simply opt for the same random sampling.
     * A similar method could be used for DataSinks as well, but that is not handled in this PR.
     * It's important to note that the runner is already aware of the number of bytes sent to and received from the SDK side, so we may opt to remove this entirely.
   * Counts and samples are not yet made for SideInputs, which would better account for data consumed by DoFns.
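
The counting and pre-select sampling described in the list above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: the `sampler` and `sizeDist` types, the `observe` method, and the `cur/10+2` gap formula are all assumptions made for the example. The gap formula is only chosen so that the distance to the next sample grows in proportion to the elements seen so far.

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"sync"
	"sync/atomic"
)

// sizeDist is a hypothetical count/sum/min/max distribution of sampled sizes.
// It is guarded by a mutex because a progress reader may snapshot it concurrently.
type sizeDist struct {
	mu                   sync.Mutex
	count, sum, min, max int64
}

func (d *sizeDist) update(size int64) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.count++
	d.sum += size
	if size < d.min {
		d.min = size
	}
	if size > d.max {
		d.max = size
	}
}

// sampler counts every element and measures only pre-selected ones.
type sampler struct {
	rng        *rand.Rand
	elemCount  int64 // incremented atomically for every element
	nextSample int64 // zero-based index of the next element to measure
	dist       sizeDist
}

func newSampler(seed int64) *sampler {
	rng := rand.New(rand.NewSource(seed))
	return &sampler{
		rng:        rng,
		nextSample: rng.Int63n(3), // initial index drawn from [0,1,2]
		dist:       sizeDist{min: math.MaxInt64, max: math.MinInt64},
	}
}

// observe is called once per element. encodedSize is only invoked for
// sampled elements, so the encoding cost is paid rarely.
func (s *sampler) observe(encodedSize func() int64) {
	cur := atomic.AddInt64(&s.elemCount, 1) - 1
	if cur != s.nextSample {
		return
	}
	s.dist.update(encodedSize())
	// Pre-select the next index; the gap grows with the elements seen so far.
	// The divisor and offset here are assumptions, not values from the PR.
	s.nextSample = cur + 1 + s.rng.Int63n(cur/10+2)
}

func main() {
	s := newSampler(1)
	for i := 0; i < 1000000; i++ {
		s.observe(func() int64 { return 8 }) // pretend every element encodes to 8 bytes
	}
	fmt.Println("elements:", s.elemCount, "samples:", s.dist.count)
}
```

Because the gap is proportional to the current index, the number of samples grows roughly logarithmically with the element count, which is why the encoding overhead stays small on large bundles.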
   
   Thank you @ajamato for reminding me of the pre-select method for sampling, and @lukecwik for pointing out the DataSource can avoid separate additional encoding costs when measuring elements.
   
   Performance impact:
   I have two jobs I use for benchmarking this: Pipeline A uses int64s as elements and does simple passthroughs and sums, while Pipeline B uses large protocol buffers as elements and spends a fair amount of CPU time decoding them.
   
   For small "fast" elements, the overhead is about 19.5% of the Go side processing (which makes sense if elements are just being passed around or incremented).
   For large "heavy" elements, the overhead is about 0.125% of the Go side processing.
   
   Note that this only takes into account the Go SDK worker, and not any runner-side costs. This feels acceptable for the time being, though it's possible we can improve this later, especially for "lighter" jobs.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lostluck commented on a change in pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15289:
URL: https://github.com/apache/beam/pull/15289#discussion_r685387873



##########
File path: sdks/go/pkg/beam/core/runtime/harness/harness.go
##########
@@ -305,7 +313,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 		data.Close()
 		state.Close()
 
-		mons, pylds := monitoring(plan)
+		mons, pylds := monitoring(plan, store)

Review comment:
       I tried only having the payloads, but Dataflow doesn't produce the metrics at all then, so we'll keep the monitoring infos around until Dataflow handles only payloads properly. 
   
   There might be an experiment to toggle to fix this, but there's no harm in the SDK waiting for the default to switch (other than the wire cost of not being exclusively on the short id requests & payload duplication).
   
   See https://console.cloud.google.com/dataflow/jobs/us-central1/2021-08-09_10_24_28-7240084760069835414?project=google.com:clouddfe which has both, vs  https://console.cloud.google.com/dataflow/jobs/us-central1/2021-08-09_10_00_52-11618126016479762400?project=google.com:clouddfe which is just the payloads







[GitHub] [beam] lostluck commented on pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #15289:
URL: https://github.com/apache/beam/pull/15289#issuecomment-893907683


   R: @youngoli @jrmccluskey @riteshghorse 





[GitHub] [beam] lostluck commented on a change in pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15289:
URL: https://github.com/apache/beam/pull/15289#discussion_r685391106



##########
File path: sdks/go/pkg/beam/core/runtime/exec/pcollection.go
##########
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"math/rand"
+	"sync"
+	"sync/atomic"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+)
+
+// PCollection is a passthrough node to collect PCollection metrics, and
+// must be placed as the Out node of any producer of a PCollection.
+//
+// In particular, must not be placed after a Multiplex, and must be placed
+// after a Flatten.
+type PCollection struct {

Review comment:
       If you read the code in [ProcessElement](https://github.com/apache/beam/pull/15289/files/b3d4d1a90d0ccc36150a7ef50d497a0e793a3d9d#diff-a01d1e6315c7f0dc04c1148d3e17203dd25ff567b4f7c88a32b6e7cdc62e7920R83), we use the sampling technique I credited you for a year ago.
   
   I'm leaning towards the "always sample the first 3 elements" approach, since I went with the "random one of the first three" approach, which fails on single-element PCollections.







[GitHub] [beam] lostluck commented on a change in pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15289:
URL: https://github.com/apache/beam/pull/15289#discussion_r685395050



##########
File path: sdks/go/pkg/beam/core/runtime/exec/pcollection.go
##########
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"math/rand"
+	"sync"
+	"sync/atomic"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+)
+
+// PCollection is a passthrough node to collect PCollection metrics, and
+// must be placed as the Out node of any producer of a PCollection.
+//
+// In particular, must not be placed after a Multiplex, and must be placed
+// after a Flatten.
+type PCollection struct {

Review comment:
       IIRC, either I'm adding an object or duplicating all the code. Right now this was simplest, vs modifying each of the other node kinds.
   
   The main optimization I can do later is to make the Out nodes concretely the PCollection type, which will help the compiler inline the method calls and avoid function call overhead where possible.
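
To illustrate the difference, here is a simplified sketch with stand-in types; `Node`, `discard`, `viaInterface`, and `viaConcrete` are hypothetical, and this `PCollection` is a toy version that omits the atomic counter and sampling. With an interface-typed `Out` the call is dispatched dynamically, while a concrete `*PCollection` field gives the compiler a static call target it may choose to inline, depending on its inlining budget.

```go
package main

import "fmt"

// Node is a hypothetical stand-in for the exec package's processing node interface.
type Node interface {
	ProcessElement(v int64) error
}

// PCollection is a toy passthrough node that counts elements and forwards them.
type PCollection struct {
	count int64
	Out   Node
}

func (p *PCollection) ProcessElement(v int64) error {
	p.count++
	return p.Out.ProcessElement(v)
}

// discard is a terminal node that just counts what it receives.
type discard struct{ seen int64 }

func (d *discard) ProcessElement(int64) error { d.seen++; return nil }

// viaInterface holds its output as the interface type: every emit is a
// dynamic dispatch that the compiler generally cannot inline.
type viaInterface struct{ Out Node }

func (n *viaInterface) emit(v int64) error { return n.Out.ProcessElement(v) }

// viaConcrete holds its output as the concrete type: the call target is
// known statically, so the compiler is free to inline it.
type viaConcrete struct{ Out *PCollection }

func (n *viaConcrete) emit(v int64) error { return n.Out.ProcessElement(v) }

func main() {
	sink := &discard{}
	pc := &PCollection{Out: sink}
	a, b := &viaInterface{Out: pc}, &viaConcrete{Out: pc}
	for i := int64(0); i < 3; i++ {
		_ = a.emit(i)
		_ = b.emit(i)
	}
	fmt.Println("counted:", pc.count, "forwarded:", sink.seen)
}
```

Both producers behave identically; only the dispatch mechanism differs, so any gain would have to be confirmed with a benchmark.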







[GitHub] [beam] lostluck commented on a change in pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15289:
URL: https://github.com/apache/beam/pull/15289#discussion_r685527558



##########
File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go
##########
@@ -163,38 +162,65 @@ func monitoring(p *exec.Plan) ([]*pipepb.MonitoringInfo, map[string][]byte) {
 	}.ExtractFrom(store)
 
 	// Get the execution monitoring information from the bundle plan.
-	if snapshot, ok := p.Progress(); ok {
-		payload, err := metricsx.Int64Counter(snapshot.Count)
+
+	snapshot, ok := p.Progress()
+	if !ok {
+		return monitoringInfo, payloads
+	}
+	for _, pcol := range snapshot.PCols {
+		payload, err := metricsx.Int64Counter(pcol.ElementCount)
 		if err != nil {
 			panic(err)
 		}
 
 		// TODO(BEAM-9934): This metric should account for elements in multiple windows.
-		payloads[getShortID(metrics.PCollectionLabels(snapshot.PID), metricsx.UrnElementCount)] = payload
+		payloads[getShortID(metrics.PCollectionLabels(pcol.ID), metricsx.UrnElementCount)] = payload
+
 		monitoringInfo = append(monitoringInfo,
 			&pipepb.MonitoringInfo{
 				Urn:  metricsx.UrnToString(metricsx.UrnElementCount),
 				Type: metricsx.UrnToType(metricsx.UrnElementCount),
 				Labels: map[string]string{
-					"PCOLLECTION": snapshot.PID,
+					"PCOLLECTION": pcol.ID,
 				},
 				Payload: payload,
 			})
 
-		payloads[getShortID(metrics.PTransformLabels(snapshot.ID), metricsx.UrnDataChannelReadIndex)] = payload
-		monitoringInfo = append(monitoringInfo,
-			&pipepb.MonitoringInfo{
-				Urn:  metricsx.UrnToString(metricsx.UrnDataChannelReadIndex),
-				Type: metricsx.UrnToType(metricsx.UrnDataChannelReadIndex),
-				Labels: map[string]string{
-					"PTRANSFORM": snapshot.ID,
-				},
-				Payload: payload,
-			})
+		// Skip pcollections without size
+		if pcol.SizeCount != 0 {
+			payload, err := metricsx.Int64Distribution(pcol.SizeCount, pcol.SizeSum, pcol.SizeMin, pcol.SizeMax)
+			if err != nil {
+				panic(err)
+			}
+			monitoringInfo = append(monitoringInfo,
+				&pipepb.MonitoringInfo{
+					Urn:  "beam:metric:sampled_byte_size:v1",
+					Type: "beam:metrics:distribution_int_64",

Review comment:
       Good catch. Done.







[GitHub] [beam] lostluck commented on pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #15289:
URL: https://github.com/apache/beam/pull/15289#issuecomment-895551534


   R: @youngoli @jrmccluskey @riteshghorse





[GitHub] [beam] youngoli commented on a change in pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #15289:
URL: https://github.com/apache/beam/pull/15289#discussion_r685648169



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasink.go
##########
@@ -38,7 +35,6 @@ type DataSink struct {
 	wEnc  WindowEncoder
 	w     io.WriteCloser
 	count int64

Review comment:
       Do you still need `count` here now that you got rid of the lines using it?

##########
File path: sdks/go/pkg/beam/core/runtime/exec/pcollection_test.go
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+	"context"
+	"math"
+	"testing"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
+)
+
+// TestPCollection verifies that the PCollection node works correctly.
+// Seed is by default set to 0, so we have a "deterministic" set of
+// randomness for the samples.
+func TestPCollection(t *testing.T) {
+	a := &CaptureNode{UID: 1}
+	pcol := &PCollection{UID: 2, Out: a, Coder: coder.NewVarInt()}
+	// The "large" 2nd value is to ensure the values are encoded properly,
+	// and that Min & Max are behaving.
+	inputs := []interface{}{int64(1), int64(2000000000), int64(3)}
+	in := &FixedRoot{UID: 3, Elements: makeInput(inputs...), Out: pcol}
+
+	p, err := NewPlan("a", []Unit{a, pcol, in})
+	if err != nil {
+		t.Fatalf("failed to construct plan: %v", err)
+	}
+
+	if err := p.Execute(context.Background(), "1", DataContext{}); err != nil {
+		t.Fatalf("execute failed: %v", err)
+	}
+	if err := p.Down(context.Background()); err != nil {
+		t.Fatalf("down failed: %v", err)
+	}
+
+	expected := makeValues(inputs...)
+	if !equalList(a.Elements, expected) {
+		t.Errorf("multiplex returned %v for a, want %v", extractValues(a.Elements...), extractValues(expected...))
+	}
+	snap := pcol.snapshot()
+	if want, got := int64(len(expected)), snap.ElementCount; got != want {
+		t.Errorf("snapshot miscounted: got %v, want %v", got, want)
+	}
+	checkPCollectionSizeSample(t, snap, 3, 7, 1, 5)
+}
+
+func TestPCollection_sizeReset(t *testing.T) {
+	// Check the initial values after resetting.
+	var pcol PCollection
+	pcol.resetSize()
+	snap := pcol.snapshot()
+	checkPCollectionSizeSample(t, snap, 0, 0, math.MaxInt64, math.MinInt64)
+}
+
+func checkPCollectionSizeSample(t *testing.T, snap PCollectionSnapshot, count, sum, min, max int64) {
+	t.Helper()
+	if want, got := int64(count), snap.SizeCount; got != want {
+		t.Errorf("sample count incorrect: got %v, want %v", got, want)
+	}
+	if want, got := int64(sum), snap.SizeSum; got != want {
+		t.Errorf("sample sum incorrect: got %v, want %v", got, want)
+	}
+	if want, got := int64(min), snap.SizeMin; got != want {
+		t.Errorf("sample min incorrect: got %v, want %v", got, want)
+	}
+	if want, got := int64(max), snap.SizeMax; got != want {
+		t.Errorf("sample max incorrect: got %v, want %v", got, want)
+	}
+}
+
+// BenchmarkPCollection measures the overhead of invoking a ParDo in a plan.
+//
+// On @lostluck's desktop (2020/02/20):
+// BenchmarkPCollection-12                 44699806                24.8 ns/op             0 B/op          0 allocs/op
+func BenchmarkPCollection(b *testing.B) {
+	// Pre allocate the capture buffer and process buffer to avoid
+	// unnecessary overhead.
+	out := &CaptureNode{UID: 1, Elements: make([]FullValue, 0, b.N)}
+	process := make([]MainInput, 0, b.N)
+	for i := 0; i < b.N; i++ {
+		process = append(process, MainInput{Key: FullValue{
+			Windows:   window.SingleGlobalWindow,
+			Timestamp: mtime.ZeroTimestamp,
+			Elm:       int64(1),
+		}})
+	}
+	pcol := &PCollection{UID: 2, Out: out, Coder: coder.NewVarInt()}
+	n := &FixedRoot{UID: 3, Elements: process, Out: pcol}
+	p, err := NewPlan("a", []Unit{n, pcol, out})
+	if err != nil {
+		b.Fatalf("failed to construct plan: %v", err)
+	}
+	b.ResetTimer()
+	if err := p.Execute(context.Background(), "1", DataContext{}); err != nil {
+		b.Fatalf("execute failed: %v", err)
+	}
+	if err := p.Down(context.Background()); err != nil {
+		b.Fatalf("down failed: %v", err)
+	}
+	if got, want := pcol.snapshot().ElementCount, int64(b.N); got != want {
+		b.Errorf("did not process all elements: got %v, want %v", got, want)
+	}
+	if got, want := len(out.Elements), b.N; got != want {
+		b.Errorf("did not process all elements: got %v, want %v", got, want)
+	}
+}
+
+// BenchmarkPCollection_Baseline measures the baseline of the node benchmarking scaffold.
+//
+// On @lostluck's desktop (2020/02/20):

Review comment:
       Nit: Looks like you forgot to update the date.







[GitHub] [beam] lostluck commented on pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #15289:
URL: https://github.com/apache/beam/pull/15289#issuecomment-893867957


   Run Go Samza ValidatesRunner





[GitHub] [beam] lostluck commented on a change in pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15289:
URL: https://github.com/apache/beam/pull/15289#discussion_r685272069



##########
File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go
##########
@@ -163,38 +162,65 @@ func monitoring(p *exec.Plan) ([]*pipepb.MonitoringInfo, map[string][]byte) {
 	}.ExtractFrom(store)
 
 	// Get the execution monitoring information from the bundle plan.
-	if snapshot, ok := p.Progress(); ok {
-		payload, err := metricsx.Int64Counter(snapshot.Count)
+
+	snapshot, ok := p.Progress()
+	if !ok {
+		return monitoringInfo, payloads
+	}
+	for _, pcol := range snapshot.PCols {
+		payload, err := metricsx.Int64Counter(pcol.ElementCount)
 		if err != nil {
 			panic(err)
 		}
 
 		// TODO(BEAM-9934): This metric should account for elements in multiple windows.
-		payloads[getShortID(metrics.PCollectionLabels(snapshot.PID), metricsx.UrnElementCount)] = payload
+		payloads[getShortID(metrics.PCollectionLabels(pcol.ID), metricsx.UrnElementCount)] = payload
+
 		monitoringInfo = append(monitoringInfo,
 			&pipepb.MonitoringInfo{
 				Urn:  metricsx.UrnToString(metricsx.UrnElementCount),
 				Type: metricsx.UrnToType(metricsx.UrnElementCount),
 				Labels: map[string]string{
-					"PCOLLECTION": snapshot.PID,
+					"PCOLLECTION": pcol.ID,
 				},
 				Payload: payload,
 			})
 
-		payloads[getShortID(metrics.PTransformLabels(snapshot.ID), metricsx.UrnDataChannelReadIndex)] = payload
-		monitoringInfo = append(monitoringInfo,
-			&pipepb.MonitoringInfo{
-				Urn:  metricsx.UrnToString(metricsx.UrnDataChannelReadIndex),
-				Type: metricsx.UrnToType(metricsx.UrnDataChannelReadIndex),
-				Labels: map[string]string{
-					"PTRANSFORM": snapshot.ID,
-				},
-				Payload: payload,
-			})
+		// Skip pcollections without size
+		if pcol.SizeCount != 0 {
+			payload, err := metricsx.Int64Distribution(pcol.SizeCount, pcol.SizeSum, pcol.SizeMin, pcol.SizeMax)
+			if err != nil {
+				panic(err)
+			}
+			monitoringInfo = append(monitoringInfo,
+				&pipepb.MonitoringInfo{
+					Urn:  "beam:metric:sampled_byte_size:v1",
+					Type: "beam:metrics:distribution_int_64",
+					Labels: map[string]string{
+						"PCOLLECTION": pcol.ID,

Review comment:
       That's what those IDs are. (They're taken from the bundle graph and populated in [exec/translate.go](https://github.com/apache/beam/pull/15289/files/b3d4d1a90d0ccc36150a7ef50d497a0e793a3d9d#diff-5f1959624a48fea7877e2dd71f648ea3fe8029b75a84c0ac6f5220888176464aR315) )
   
   Good to validate beyond "it works on Dataflow" :)







[GitHub] [beam] lostluck commented on pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #15289:
URL: https://github.com/apache/beam/pull/15289#issuecomment-897236662


   Run Go Postcommit





[GitHub] [beam] lostluck commented on a change in pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15289:
URL: https://github.com/apache/beam/pull/15289#discussion_r687043611



##########
File path: sdks/go/pkg/beam/core/runtime/exec/pcollection_test.go
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+	"context"
+	"math"
+	"testing"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
+)
+
+// TestPCollection verifies that the PCollection node works correctly.
+// Seed is by default set to 0, so we have a "deterministic" set of
+// randomness for the samples.
+func TestPCollection(t *testing.T) {
+	a := &CaptureNode{UID: 1}
+	pcol := &PCollection{UID: 2, Out: a, Coder: coder.NewVarInt()}
+	// The "large" 2nd value is to ensure the values are encoded properly,
+	// and that Min & Max are behaving.
+	inputs := []interface{}{int64(1), int64(2000000000), int64(3)}
+	in := &FixedRoot{UID: 3, Elements: makeInput(inputs...), Out: pcol}
+
+	p, err := NewPlan("a", []Unit{a, pcol, in})
+	if err != nil {
+		t.Fatalf("failed to construct plan: %v", err)
+	}
+
+	if err := p.Execute(context.Background(), "1", DataContext{}); err != nil {
+		t.Fatalf("execute failed: %v", err)
+	}
+	if err := p.Down(context.Background()); err != nil {
+		t.Fatalf("down failed: %v", err)
+	}
+
+	expected := makeValues(inputs...)
+	if !equalList(a.Elements, expected) {
+		t.Errorf("multiplex returned %v for a, want %v", extractValues(a.Elements...), extractValues(expected...))
+	}
+	snap := pcol.snapshot()
+	if want, got := int64(len(expected)), snap.ElementCount; got != want {
+		t.Errorf("snapshot miscounted: got %v, want %v", got, want)
+	}
+	checkPCollectionSizeSample(t, snap, 3, 7, 1, 5)
+}
+
+func TestPCollection_sizeReset(t *testing.T) {
+	// Check the initial values after resetting.
+	var pcol PCollection
+	pcol.resetSize()
+	snap := pcol.snapshot()
+	checkPCollectionSizeSample(t, snap, 0, 0, math.MaxInt64, math.MinInt64)
+}
+
+func checkPCollectionSizeSample(t *testing.T, snap PCollectionSnapshot, count, sum, min, max int64) {
+	t.Helper()
+	if want, got := int64(count), snap.SizeCount; got != want {
+		t.Errorf("sample count incorrect: got %v, want %v", got, want)
+	}
+	if want, got := int64(sum), snap.SizeSum; got != want {
+		t.Errorf("sample sum incorrect: got %v, want %v", got, want)
+	}
+	if want, got := int64(min), snap.SizeMin; got != want {
+		t.Errorf("sample min incorrect: got %v, want %v", got, want)
+	}
+	if want, got := int64(max), snap.SizeMax; got != want {
+		t.Errorf("sample max incorrect: got %v, want %v", got, want)
+	}
+}
+
+// BenchmarkPCollection measures the overhead of invoking a ParDo in a plan.
+//
+// On @lostluck's desktop (2020/02/20):
+// BenchmarkPCollection-12                 44699806                24.8 ns/op             0 B/op          0 allocs/op
+func BenchmarkPCollection(b *testing.B) {
+	// Pre allocate the capture buffer and process buffer to avoid
+	// unnecessary overhead.
+	out := &CaptureNode{UID: 1, Elements: make([]FullValue, 0, b.N)}
+	process := make([]MainInput, 0, b.N)
+	for i := 0; i < b.N; i++ {
+		process = append(process, MainInput{Key: FullValue{
+			Windows:   window.SingleGlobalWindow,
+			Timestamp: mtime.ZeroTimestamp,
+			Elm:       int64(1),
+		}})
+	}
+	pcol := &PCollection{UID: 2, Out: out, Coder: coder.NewVarInt()}
+	n := &FixedRoot{UID: 3, Elements: process, Out: pcol}
+	p, err := NewPlan("a", []Unit{n, pcol, out})
+	if err != nil {
+		b.Fatalf("failed to construct plan: %v", err)
+	}
+	b.ResetTimer()
+	if err := p.Execute(context.Background(), "1", DataContext{}); err != nil {
+		b.Fatalf("execute failed: %v", err)
+	}
+	if err := p.Down(context.Background()); err != nil {
+		b.Fatalf("down failed: %v", err)
+	}
+	if got, want := pcol.snapshot().ElementCount, int64(b.N); got != want {
+		b.Errorf("did not process all elements: got %v, want %v", got, want)
+	}
+	if got, want := len(out.Elements), b.N; got != want {
+		b.Errorf("did not process all elements: got %v, want %v", got, want)
+	}
+}
+
+// BenchmarkPCollection_Baseline measures the baseline of the node benchmarking scaffold.
+//
+// On @lostluck's desktop (2020/02/20):

Review comment:
       I didn't forget; I didn't re-run the benchmark, so there wasn't anything to update. My laptop isn't currently the best benchmarking environment, so I'm opting not to update it for now.







[GitHub] [beam] lostluck commented on pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #15289:
URL: https://github.com/apache/beam/pull/15289#issuecomment-895556871









[GitHub] [beam] lostluck commented on pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #15289:
URL: https://github.com/apache/beam/pull/15289#issuecomment-893868018


   Run Go Spark ValidatesRunner





[GitHub] [beam] lostluck commented on pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #15289:
URL: https://github.com/apache/beam/pull/15289#issuecomment-893867804









[GitHub] [beam] lostluck commented on pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #15289:
URL: https://github.com/apache/beam/pull/15289#issuecomment-894371204


   @ajamato You got pinged because you were cc'd on the first version of this PR, which I linked at the top of the description. Thank you for pointing out some areas I can probably clean up; some of the documentation also needs updating due to this drift.





[GitHub] [beam] lostluck commented on a change in pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15289:
URL: https://github.com/apache/beam/pull/15289#discussion_r687042489



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasink.go
##########
@@ -38,7 +35,6 @@ type DataSink struct {
 	wEnc  WindowEncoder
 	w     io.WriteCloser
 	count int64

Review comment:
       Ah! Good catch. Missed that.







[GitHub] [beam] lostluck commented on pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #15289:
URL: https://github.com/apache/beam/pull/15289#issuecomment-897693463


   Run GoPortable PreCommit





[GitHub] [beam] lostluck merged pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

Posted by GitBox <gi...@apache.org>.
lostluck merged pull request #15289:
URL: https://github.com/apache/beam/pull/15289


   





[GitHub] [beam] ajamato commented on a change in pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #15289:
URL: https://github.com/apache/beam/pull/15289#discussion_r683867941



##########
File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go
##########
@@ -163,38 +162,65 @@ func monitoring(p *exec.Plan) ([]*pipepb.MonitoringInfo, map[string][]byte) {
 	}.ExtractFrom(store)
 
 	// Get the execution monitoring information from the bundle plan.
-	if snapshot, ok := p.Progress(); ok {
-		payload, err := metricsx.Int64Counter(snapshot.Count)
+
+	snapshot, ok := p.Progress()
+	if !ok {
+		return monitoringInfo, payloads
+	}
+	for _, pcol := range snapshot.PCols {
+		payload, err := metricsx.Int64Counter(pcol.ElementCount)
 		if err != nil {
 			panic(err)
 		}
 
 		// TODO(BEAM-9934): This metric should account for elements in multiple windows.
-		payloads[getShortID(metrics.PCollectionLabels(snapshot.PID), metricsx.UrnElementCount)] = payload
+		payloads[getShortID(metrics.PCollectionLabels(pcol.ID), metricsx.UrnElementCount)] = payload
+
 		monitoringInfo = append(monitoringInfo,
 			&pipepb.MonitoringInfo{
 				Urn:  metricsx.UrnToString(metricsx.UrnElementCount),
 				Type: metricsx.UrnToType(metricsx.UrnElementCount),
 				Labels: map[string]string{
-					"PCOLLECTION": snapshot.PID,
+					"PCOLLECTION": pcol.ID,
 				},
 				Payload: payload,
 			})
 
-		payloads[getShortID(metrics.PTransformLabels(snapshot.ID), metricsx.UrnDataChannelReadIndex)] = payload
-		monitoringInfo = append(monitoringInfo,
-			&pipepb.MonitoringInfo{
-				Urn:  metricsx.UrnToString(metricsx.UrnDataChannelReadIndex),
-				Type: metricsx.UrnToType(metricsx.UrnDataChannelReadIndex),
-				Labels: map[string]string{
-					"PTRANSFORM": snapshot.ID,
-				},
-				Payload: payload,
-			})
+		// Skip pcollections without size
+		if pcol.SizeCount != 0 {
+			payload, err := metricsx.Int64Distribution(pcol.SizeCount, pcol.SizeSum, pcol.SizeMin, pcol.SizeMax)
+			if err != nil {
+				panic(err)
+			}
+			monitoringInfo = append(monitoringInfo,
+				&pipepb.MonitoringInfo{
+					Urn:  "beam:metric:sampled_byte_size:v1",
+					Type: "beam:metrics:distribution_int_64",
+					Labels: map[string]string{
+						"PCOLLECTION": pcol.ID,

Review comment:
       This must be the exact same string as the pcollection id passed into the ProcessBundleDescriptor
   
   https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/model/fn-execution/src/main/proto/beam_fn_api.proto#L198

##########
File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go
##########
@@ -163,38 +162,65 @@ func monitoring(p *exec.Plan) ([]*pipepb.MonitoringInfo, map[string][]byte) {
 	}.ExtractFrom(store)
 
 	// Get the execution monitoring information from the bundle plan.
-	if snapshot, ok := p.Progress(); ok {
-		payload, err := metricsx.Int64Counter(snapshot.Count)
+
+	snapshot, ok := p.Progress()
+	if !ok {
+		return monitoringInfo, payloads
+	}
+	for _, pcol := range snapshot.PCols {
+		payload, err := metricsx.Int64Counter(pcol.ElementCount)
 		if err != nil {
 			panic(err)
 		}
 
 		// TODO(BEAM-9934): This metric should account for elements in multiple windows.
-		payloads[getShortID(metrics.PCollectionLabels(snapshot.PID), metricsx.UrnElementCount)] = payload
+		payloads[getShortID(metrics.PCollectionLabels(pcol.ID), metricsx.UrnElementCount)] = payload
+
 		monitoringInfo = append(monitoringInfo,
 			&pipepb.MonitoringInfo{
 				Urn:  metricsx.UrnToString(metricsx.UrnElementCount),
 				Type: metricsx.UrnToType(metricsx.UrnElementCount),
 				Labels: map[string]string{
-					"PCOLLECTION": snapshot.PID,
+					"PCOLLECTION": pcol.ID,
 				},
 				Payload: payload,
 			})
 
-		payloads[getShortID(metrics.PTransformLabels(snapshot.ID), metricsx.UrnDataChannelReadIndex)] = payload
-		monitoringInfo = append(monitoringInfo,
-			&pipepb.MonitoringInfo{
-				Urn:  metricsx.UrnToString(metricsx.UrnDataChannelReadIndex),
-				Type: metricsx.UrnToType(metricsx.UrnDataChannelReadIndex),
-				Labels: map[string]string{
-					"PTRANSFORM": snapshot.ID,
-				},
-				Payload: payload,
-			})
+		// Skip pcollections without size
+		if pcol.SizeCount != 0 {
+			payload, err := metricsx.Int64Distribution(pcol.SizeCount, pcol.SizeSum, pcol.SizeMin, pcol.SizeMax)
+			if err != nil {
+				panic(err)
+			}
+			monitoringInfo = append(monitoringInfo,
+				&pipepb.MonitoringInfo{
+					Urn:  "beam:metric:sampled_byte_size:v1",
+					Type: "beam:metrics:distribution_int_64",

Review comment:
       Maybe use the same pattern as above
   
   Urn:  metricsx.UrnToString(metricsx.UrnSampledByteSize),
   Type: metricsx.UrnToType(metricsx.UrnSampledByteSize),

##########
File path: sdks/go/pkg/beam/core/runtime/harness/harness.go
##########
@@ -305,7 +313,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 		data.Close()
 		state.Close()
 
-		mons, pylds := monitoring(plan)
+		mons, pylds := monitoring(plan, store)

Review comment:
       Just an FYI:
   You should only populate ProcessBundleResponse's monitoring_data, and should NOT populate monitoring_infos, which is deprecated.
   https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/model/fn-execution/src/main/proto/beam_fn_api.proto#L328
   
   You will need to implement the MonitoringInfosMetadataResponse instruction for the InstructionRequest/Response.
   https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/model/fn-execution/src/main/proto/beam_fn_api.proto#L138
   In the MonitoringInfosMetadataResponse you provide the full MonitoringInfos.
   This is the optimization that avoids sending all that data on every ProcessBundle[Progress]Response.
   
   
   
   
   (I'm a bit unclear on what's being done with short ids here. You may already be doing this correctly)
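
   To make the short ID idea concrete, here is a minimal sketch assuming a harness-side cache that maps a metric's URN and labels to a compact ID. Bundle responses would then carry only short ID to payload pairs, and the full metadata would be returned on request. The types and functions here (metricInfo, shortIDCache, canonicalKey, lookup) are hypothetical, are not the Go SDK's API, and ignore concurrency.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// metricInfo holds the descriptive part of a metric (hypothetical type).
type metricInfo struct {
	URN    string
	Labels map[string]string
}

// shortIDCache assigns a stable short ID to each distinct metricInfo.
// It is not safe for concurrent use; a real harness would need locking.
type shortIDCache struct {
	ids   map[string]string     // canonical key -> short ID
	infos map[string]metricInfo // short ID -> full metadata
}

func newShortIDCache() *shortIDCache {
	return &shortIDCache{
		ids:   make(map[string]string),
		infos: make(map[string]metricInfo),
	}
}

// canonicalKey builds a deterministic key from the URN and sorted labels.
func canonicalKey(urn string, labels map[string]string) string {
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var b strings.Builder
	b.WriteString(urn)
	for _, k := range keys {
		b.WriteString("\x00" + k + "\x00" + labels[k])
	}
	return b.String()
}

// shortID returns the existing ID for info, or assigns the next one.
func (c *shortIDCache) shortID(info metricInfo) string {
	key := canonicalKey(info.URN, info.Labels)
	if id, ok := c.ids[key]; ok {
		return id
	}
	id := strconv.FormatInt(int64(len(c.ids)), 36)
	c.ids[key] = id
	c.infos[id] = info
	return id
}

// lookup returns the full metadata for the requested short IDs, which is
// what a metadata response would carry.
func (c *shortIDCache) lookup(ids ...string) map[string]metricInfo {
	out := make(map[string]metricInfo, len(ids))
	for _, id := range ids {
		if info, ok := c.infos[id]; ok {
			out[id] = info
		}
	}
	return out
}

func main() {
	c := newShortIDCache()
	count := metricInfo{
		URN:    "beam:metric:element_count:v1",
		Labels: map[string]string{"PCOLLECTION": "n1"},
	}
	id := c.shortID(count)
	// Only the short ID and an encoded payload would go in the bundle response.
	payloads := map[string][]byte{id: {0x03}}
	fmt.Println(len(payloads), c.lookup(id)[id].URN)
}
```

   The design choice is that the labels and URN are sent once per distinct metric rather than on every progress or bundle response.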

##########
File path: sdks/go/pkg/beam/core/runtime/exec/pcollection.go
##########
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"math/rand"
+	"sync"
+	"sync/atomic"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+)
+
+// PCollection is a passthrough node to collect PCollection metrics, and
+// must be placed as the Out node of any producer of a PCollection.
+//
+// In particular, must not be placed after a Multiplex, and must be placed
+// after a Flatten.
+type PCollection struct {

Review comment:
       FWIW here are the implementations in java and python
   https://github.com/apache/beam/pull/8416
   
   https://github.com/apache/beam/commit/d807210c3aa28f34c13b89f3f16bc104051532b0
   
   I dunno if you really need to introduce a PCollection abstraction for it. There might be simpler ways.
   
   I also will point out that it may not be wise to run the coder on every element. Instead run it on a random sample of elements only. Doing it on every element will degrade performance.
   
   I believe the algorithm in java was to record the first N elements. Then randomly record elements with smaller and smaller frequency based on the number of elements which were already sampled.
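
   A minimal sketch of that kind of decaying sampler follows, assuming the first sampled index is drawn from the first three elements and each later sample is scheduled a random gap ahead, with the gap bound growing with the number of elements seen. The sampler type and the growth rule (seen/10+2) are assumptions for illustration; they are neither the Java algorithm nor the one in this PR.

```go
package main

import (
	"fmt"
	"math/rand"
)

// sampler decides which element indexes get their encoded size measured.
// Hypothetical type for illustration only.
type sampler struct {
	rng  *rand.Rand
	seen int64 // number of elements observed so far
	next int64 // index of the next element to sample
}

// newSampler picks the first sample index from [0, 3).
func newSampler(seed int64) *sampler {
	rng := rand.New(rand.NewSource(seed))
	return &sampler{rng: rng, next: rng.Int63n(3)}
}

// observe reports whether the current element should be sampled.
func (s *sampler) observe() bool {
	idx := s.seen
	s.seen++
	if idx != s.next {
		return false
	}
	// Assumed growth rule: the gap to the next sample is drawn from
	// [1, seen/10+2], so sampling gets sparser as the bundle grows.
	s.next = idx + 1 + s.rng.Int63n(s.seen/10+2)
	return true
}

// countSamples runs the sampler over n elements and returns how many
// of them were selected.
func countSamples(n int, seed int64) int {
	s := newSampler(seed)
	samples := 0
	for i := 0; i < n; i++ {
		if s.observe() {
			samples++
		}
	}
	return samples
}

func main() {
	fmt.Println("samples in 1M elements:", countSamples(1000000, 0))
}
```

   Because only the selected elements are encoded, the per-element cost on the common path is a counter increment and one comparison.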

##########
File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go
##########
@@ -163,38 +162,65 @@ func monitoring(p *exec.Plan) ([]*pipepb.MonitoringInfo, map[string][]byte) {
 	}.ExtractFrom(store)
 
 	// Get the execution monitoring information from the bundle plan.
-	if snapshot, ok := p.Progress(); ok {
-		payload, err := metricsx.Int64Counter(snapshot.Count)
+
+	snapshot, ok := p.Progress()
+	if !ok {
+		return monitoringInfo, payloads
+	}
+	for _, pcol := range snapshot.PCols {
+		payload, err := metricsx.Int64Counter(pcol.ElementCount)
 		if err != nil {
 			panic(err)
 		}
 
 		// TODO(BEAM-9934): This metric should account for elements in multiple windows.
-		payloads[getShortID(metrics.PCollectionLabels(snapshot.PID), metricsx.UrnElementCount)] = payload
+		payloads[getShortID(metrics.PCollectionLabels(pcol.ID), metricsx.UrnElementCount)] = payload
+
 		monitoringInfo = append(monitoringInfo,
 			&pipepb.MonitoringInfo{
 				Urn:  metricsx.UrnToString(metricsx.UrnElementCount),
 				Type: metricsx.UrnToType(metricsx.UrnElementCount),
 				Labels: map[string]string{
-					"PCOLLECTION": snapshot.PID,
+					"PCOLLECTION": pcol.ID,
 				},
 				Payload: payload,
 			})
 
-		payloads[getShortID(metrics.PTransformLabels(snapshot.ID), metricsx.UrnDataChannelReadIndex)] = payload
-		monitoringInfo = append(monitoringInfo,
-			&pipepb.MonitoringInfo{
-				Urn:  metricsx.UrnToString(metricsx.UrnDataChannelReadIndex),
-				Type: metricsx.UrnToType(metricsx.UrnDataChannelReadIndex),
-				Labels: map[string]string{
-					"PTRANSFORM": snapshot.ID,
-				},
-				Payload: payload,
-			})
+		// Skip pcollections without size
+		if pcol.SizeCount != 0 {
+			payload, err := metricsx.Int64Distribution(pcol.SizeCount, pcol.SizeSum, pcol.SizeMin, pcol.SizeMax)
+			if err != nil {
+				panic(err)
+			}
+			monitoringInfo = append(monitoringInfo,
+				&pipepb.MonitoringInfo{
+					Urn:  "beam:metric:sampled_byte_size:v1",
+					Type: "beam:metrics:distribution_int_64",
+					Labels: map[string]string{
+						"PCOLLECTION": pcol.ID,

Review comment:
       Otherwise looks like it matches the spec :)
   
   https://github.com/apache/beam/blob/4a78a81f1e9f2f9f73eda34c9eb5651eb9dad885/model/pipeline/src/main/proto/metrics.proto#L216
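
   For reference, the count, sum, min, and max passed to Int64Distribution can be accumulated as in the sketch below. The sizeDist type is hypothetical, and varintSize assumes the same base-128 varint encoding that Go's encoding/binary uses for non-negative values. The reset values mirror what TestPCollection_sizeReset checks, and the three inputs mirror TestPCollection, where the encoded sizes are 1, 5, and 1 bytes.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
	"sync"
)

// sizeDist accumulates a size distribution under a mutex (hypothetical type).
type sizeDist struct {
	mu                   sync.Mutex
	count, sum, min, max int64
}

// reset restores the empty state: min and max start at the extremes so
// the first sample always replaces them.
func (d *sizeDist) reset() {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.count, d.sum = 0, 0
	d.min, d.max = math.MaxInt64, math.MinInt64
}

// add records one sampled size.
func (d *sizeDist) add(size int64) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.count++
	d.sum += size
	if size < d.min {
		d.min = size
	}
	if size > d.max {
		d.max = size
	}
}

// snapshot returns a consistent copy of the four values.
func (d *sizeDist) snapshot() (count, sum, lo, hi int64) {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.count, d.sum, d.min, d.max
}

// varintSize returns the number of bytes v occupies as a base-128 varint.
func varintSize(v uint64) int64 {
	var buf [binary.MaxVarintLen64]byte
	return int64(binary.PutUvarint(buf[:], v))
}

func main() {
	var d sizeDist
	d.reset()
	for _, v := range []uint64{1, 2000000000, 3} {
		d.add(varintSize(v))
	}
	count, sum, lo, hi := d.snapshot()
	fmt.Println(count, sum, lo, hi)
}
```

   A distribution with a zero count still carries the extreme min and max values, which is one reason to skip emitting it, as the `pcol.SizeCount != 0` check does.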







[GitHub] [beam] lostluck commented on pull request #15289: [BEAM-6374] Emit PCollection metrics from GoSDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #15289:
URL: https://github.com/apache/beam/pull/15289#issuecomment-893969502


   @ajamato I must have accidentally clicked the suggested reviewer button, but I didn't tag you for review.
   Much appreciated though, thank you!

