Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2020/03/20 20:30:16 UTC

[GitHub] [beam] lukecwik opened a new pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

lukecwik opened a new pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184
 
 
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
   XLang | --- | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) 
   Portable | --- | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398795863
 
 

 ##########
 File path: runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoMatchers.java
 ##########
 @@ -28,30 +31,17 @@
    * Matches a {@link MonitoringInfo} with that has the set fields in the provide MonitoringInfo.
    *
    * <p>This is useful for tests which do not want to match the specific value (execution times).
-   * Currently this will only check for URNs, labels, type URNs and int64Values.
+   * Currently this will only check for URNs, labels, type URNs and payloads.
    */
   public static TypeSafeMatcher<MonitoringInfo> matchSetFields(final MonitoringInfo mi) {
     return new TypeSafeMatcher<MonitoringInfo>() {
 
       @Override
       protected boolean matchesSafely(MonitoringInfo item) {
-        if (!item.getUrn().equals(mi.getUrn())) {
-          return false;
-        }
-        if (!item.getLabels().equals(mi.getLabels())) {
-          return false;
-        }
-        if (!item.getType().equals(mi.getType())) {
-          return false;
-        }
-
-        if (mi.getMetric().hasCounterData()) {
-          long valueToMatch = mi.getMetric().getCounterData().getInt64Value();
-          if (valueToMatch != item.getMetric().getCounterData().getInt64Value()) {
-            return false;
-          }
-        }
-        return true;
+        return (mi.getUrn().isEmpty() || mi.getUrn().equals(item.getUrn()))
 
 Review comment:
   Why did you add the isEmpty() checks? It looks like the old matcher didn't have this logic, and it would match if you initialized the matcher with an empty MonitoringInfo.
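A minimal sketch of the two behaviours under discussion, assuming a simplified stand-in for the `MonitoringInfo` proto (the hypothetical `Info` class below models only `urn`, `type` and `labels`). The diff only shows the `isEmpty()` guard on the URN; applying the same guard to `type` and `labels` here is an assumption for illustration. With the guards, an empty expected field acts as a wildcard, so an entirely empty expected value matches any item; the strict form requires every field to be equal.

```java
import java.util.Map;

// Hypothetical, simplified stand-in for the MonitoringInfo proto: only the
// fields compared by the matcher are modelled.
final class Info {
  final String urn;
  final String type;
  final Map<String, String> labels;

  Info(String urn, String type, Map<String, String> labels) {
    this.urn = urn;
    this.type = type;
    this.labels = labels;
  }
}

final class SetFieldsMatching {
  // Strict form: every field of the expected value must equal the item's.
  static boolean strictMatch(Info expected, Info item) {
    return expected.urn.equals(item.urn)
        && expected.type.equals(item.type)
        && expected.labels.equals(item.labels);
  }

  // Wildcard form: an empty field in the expected value means "don't care",
  // so an entirely empty expected value matches every item.
  static boolean wildcardMatch(Info expected, Info item) {
    return (expected.urn.isEmpty() || expected.urn.equals(item.urn))
        && (expected.type.isEmpty() || expected.type.equals(item.type))
        && (expected.labels.isEmpty() || expected.labels.equals(item.labels));
  }
}
```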

[GitHub] [beam] ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398798580
 
 

 ##########
 File path: runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoMatchers.java
 ##########
 @@ -76,17 +63,20 @@ public void describeTo(Description description) {
    * Matches a {@link MonitoringInfo} with that has the set fields in the provide MonitoringInfo.
    *
    * <p>This is useful for tests which do not want to match the specific value (execution times).
-   * Currently this will only check for URNs, labels, type URNs and int64Values.
+   * Currently this will only check for URNs, labels, type URNs and {@code beam:coder:varint:v1}
+   * encoded values.
    */
-  public static TypeSafeMatcher<MonitoringInfo> valueGreaterThan(final long value) {
+  public static TypeSafeMatcher<MonitoringInfo> valueGreaterThanOrEqualTo(final long value) {
 
 Review comment:
   Would this only work with certain types? Should this be renamed?

[GitHub] [beam] lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398942909
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -52,38 +55,157 @@ message Annotation {
   string value = 2;
 }
 
-// Populated MonitoringInfoSpecs for specific URNs.
-// Indicating the required fields to be set.
-// SDKs and RunnerHarnesses can load these instances into memory and write a
-// validator or code generator to assist with populating and validating
-// MonitoringInfo protos.
+// A set of well known MonitoringInfo specifications.
 message MonitoringInfoSpecs {
   enum Enum {
-    // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
-    // upgrading the python SDK.
-    USER_COUNTER = 0 [(monitoring_info_spec) = {
-      urn: "beam:metric:user",
-      type_urn: "beam:metrics:sum_int_64",
+    // Represents an integer counter where values are summed across bundles.
+    USER_SUM_INT64 = 0 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:sum_int64:v1",
       required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
       annotations: [{
         key: "description",
-        value: "URN utilized to report user numeric counters."
+        value: "URN utilized to report user metric."
       }]
     }];
 
-    ELEMENT_COUNT = 1 [(monitoring_info_spec) = {
+    // Represents a double counter where values are summed across bundles.
+    USER_SUM_DOUBLE = 1 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
 
 Review comment:
   Made all the URNs unique and added a test to make sure that they remain unique.
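The uniqueness test mentioned above could look roughly like the following sketch. It assumes the spec URNs have already been collected into a map from spec name to URN (a real test would presumably read them from the `monitoring_info_spec` option on each `MonitoringInfoSpecs.Enum` value); `UrnUniqueness` and `duplicateUrns` are hypothetical names. The check reports any URN claimed by more than one spec, such as `beam:metric:user:v1` being shared by `USER_SUM_INT64` and `USER_SUM_DOUBLE` in the quoted diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class UrnUniqueness {
  // Returns every URN that is claimed by more than one spec name.
  static Set<String> duplicateUrns(Map<String, String> urnBySpecName) {
    // Count how many specs use each URN.
    Map<String, Integer> counts = new HashMap<>();
    for (String urn : urnBySpecName.values()) {
      counts.merge(urn, 1, Integer::sum);
    }
    // Collect the URNs used more than once, in sorted order for stable output.
    Set<String> duplicates = new TreeSet<>();
    for (Map.Entry<String, Integer> entry : counts.entrySet()) {
      if (entry.getValue() > 1) {
        duplicates.add(entry.getKey());
      }
    }
    return duplicates;
  }
}
```

A test would then simply assert that `duplicateUrns(...)` is empty for the full set of specs.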

[GitHub] [beam] lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398856669
 
 

 ##########
 File path: runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoMatchers.java
 ##########
 @@ -76,17 +63,20 @@ public void describeTo(Description description) {
    * Matches a {@link MonitoringInfo} with that has the set fields in the provide MonitoringInfo.
    *
    * <p>This is useful for tests which do not want to match the specific value (execution times).
-   * Currently this will only check for URNs, labels, type URNs and int64Values.
+   * Currently this will only check for URNs, labels, type URNs and {@code beam:coder:varint:v1}
+   * encoded values.
    */
-  public static TypeSafeMatcher<MonitoringInfo> valueGreaterThan(final long value) {
+  public static TypeSafeMatcher<MonitoringInfo> valueGreaterThanOrEqualTo(final long value) {
 
 Review comment:
   Renamed to `counterValueGreaterThanOrEqualTo`
   
   Also fixed comment.
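As the review question suggests, a matcher like `counterValueGreaterThanOrEqualTo` only makes sense for types whose payload is a single `beam:coder:varint:v1` value, such as `beam:metrics:sum_int64:v1`. A minimal sketch of the check, assuming the usual base-128 varint layout (7 bits per byte, least significant group first, continuation bit set on all but the last byte, negative values written as their 64-bit two's complement). `VarIntCounterCheck` and its methods are hypothetical and are not the matcher from the PR.

```java
import java.io.ByteArrayOutputStream;

final class VarIntCounterCheck {
  // Encodes value as an unsigned base-128 varint (assumed layout).
  static byte[] encode(long value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    while ((value & ~0x7FL) != 0) {
      out.write((int) ((value & 0x7F) | 0x80)); // low 7 bits + continuation bit
      value >>>= 7; // unsigned shift so negative values terminate
    }
    out.write((int) value);
    return out.toByteArray();
  }

  // Decodes a payload that must contain exactly one varint.
  static long decodeSingle(byte[] payload) {
    long result = 0;
    int shift = 0;
    for (int i = 0; i < payload.length; i++) {
      if (shift >= 64) {
        throw new IllegalArgumentException("varint longer than 10 bytes");
      }
      int b = payload[i] & 0xFF;
      result |= (long) (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        if (i != payload.length - 1) {
          throw new IllegalArgumentException("trailing bytes after varint");
        }
        return result;
      }
      shift += 7;
    }
    throw new IllegalArgumentException("truncated varint");
  }

  // Hypothetical matcher body: valid only for single-varint payloads.
  static boolean counterValueAtLeast(byte[] payload, long threshold) {
    return decodeSingle(payload) >= threshold;
  }
}
```

For example, `encode(300L)` produces the two bytes `0xAC 0x02`.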

[GitHub] [beam] ajamato commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r395893496
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -229,101 +215,127 @@ message MonitoringInfo {
     NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
     NAME = 6 [(label_props) = { name: "NAME" }];
   }
+
   // A set of key+value labels which define the scope of the metric.
   // Either a well defined entity id for matching the enum names in
   // the MonitoringInfoLabels enum or any arbitrary label
   // set by a custom metric or user metric.
+  //
   // A monitoring system is expected to be able to aggregate the metrics
   // together for all updates having the same URN and labels. Some systems such
   // as Stackdriver will be able to aggregate the metrics using a subset of the
   // provided labels
-  map<string, string> labels = 5;
-
-  // The walltime of the most recent update.
-  // Useful for aggregation for latest types such as LatestInt64.
-  google.protobuf.Timestamp timestamp = 6;
+  map<string, string> labels = 4;
 }
 
 message MonitoringInfoTypeUrns {
   enum Enum {
+    // Represents an integer counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:varint:v1
     SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                            "beam:metrics:sum_int_64"];
-
-    DISTRIBUTION_INT64_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                                     "beam:metrics:distribution_int_64"];
-
-    LATEST_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                               "beam:metrics:latest_int_64"];
+                            "beam:metrics:sum_int64:v1"];
+
+    // Represents a double counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   value: beam:coder:double:v1
+    SUM_DOUBLE_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                        "beam:metrics:sum_int64:v1"];
+
+    // Represents a distribution of an integer value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    //
+    // Encoding: <count><sum><min><max>
+    //   - count: beam:coder:varint:v1
+    //   - sum:   beam:coder:varint:v1
+    //   - min:   beam:coder:varint:v1
+    //   - max:   beam:coder:varint:v1
+    DISTRIBUTION_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                     "beam:metrics:distribution_int64:v1"];
+
+    // Represents a distribution of a double value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    //
+    // Encoding: <count><sum><min><max>
+    //   - count: beam:coder:varint:v1
+    //   - sum:   beam:coder:double:v1
+    //   - min:   beam:coder:double:v1
+    //   - max:   beam:coder:double:v1
+    DISTRIBUTION_DOUBLE_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                 "beam:metrics:distribution_int64:v1"];
+
+    // Represents the latest seen integer value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    //
+    // Encoding: <timestamp><value>
+    //   - timestamp: beam:coder:varint:v1     (milliseconds since epoch)
+    //   - value:     beam:coder:varint:v1
+    LATEST_INT64_TYPE = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                               "beam:metrics:latest_int64:v1"];
+
+    // Represents the latest seen integer value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    //
+    // Encoding: <timestamp><value>
+    //   - timestamp: beam:coder:varint:v1     (milliseconds since epoch)
+    //   - value:     beam:coder:double:v1
+    LATEST_DOUBLE_TYPE = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metrics:latest_int64:v1"];
+
+    // Represents the largest set of integer values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:varint:v1
+    TOP_N_INT64_TYPE = 6 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metrics:top_n_int64:v1"];
+
+    // Represents the largest set of double values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:double:v1
+    TOP_N_DOUBLE_TYPE = 7 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                            "beam:metrics:top_n_int64:v1"];
+
+    // Represents the smallest set of integer values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:varint:v1
+    BOTTOM_N_INT64_TYPE = 8 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                          "beam:metrics:bottom_n_int64:v1"];
+
+    // Represents the smallest set of double values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:double:v1
+    BOTTOM_N_DOUBLE_TYPE = 9 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metrics:bottom_n_int64:v1"];
+
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:double:v1
+    PROGRESS_TYPE = 10 [(org.apache.beam.model.pipeline.v1.beam_urn) =
 
 Review comment:
   I think progress should be a MonitoringInfoSpec URN and it should use either SUM_DOUBLE_TYPE or X_DOUBLE_TYPE.
   
   It should be something which describes what each update is supposed to be, and how they should be aggregated together. I think that's required for each type, in order to allow the use case of "worker just forwards these to a service to aggregate", as it may need to do pre-aggregation of its own?
   
   GAUGE_DOUBLE_TYPE? I am not sure what's appropriate here. But it probably doesn't make sense for progress to be its own type.
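For reference, the aggregation rules that the proto comments in this diff spell out for two of the types can be sketched as merge functions on decoded values: a distribution merges by adding `count` and `sum` and keeping the smaller `min` and larger `max`, and a latest value keeps whichever update has the later timestamp. The classes below are hypothetical decoded forms, not Beam APIs, and keeping the existing value when timestamps are equal is an arbitrary assumption. A `SUM_*` type would merge by plain addition.

```java
// Hypothetical decoded form of a beam:metrics:distribution_int64:v1 value.
final class DistributionInt64 {
  final long count;
  final long sum;
  final long min;
  final long max;

  DistributionInt64(long count, long sum, long min, long max) {
    this.count = count;
    this.sum = sum;
    this.min = min;
    this.max = max;
  }

  // count and sum add up across updates; min and max keep the extremes.
  DistributionInt64 merge(DistributionInt64 other) {
    return new DistributionInt64(
        count + other.count,
        sum + other.sum,
        Math.min(min, other.min),
        Math.max(max, other.max));
  }
}

// Hypothetical decoded form of a beam:metrics:latest_int64:v1 value.
final class LatestInt64 {
  final long timestampMillis; // milliseconds since epoch
  final long value;

  LatestInt64(long timestampMillis, long value) {
    this.timestampMillis = timestampMillis;
    this.value = value;
  }

  // The later timestamp wins; on a tie the existing value is kept.
  LatestInt64 merge(LatestInt64 other) {
    return other.timestampMillis > timestampMillis ? other : this;
  }
}
```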

[GitHub] [beam] ajamato commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r395888786
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -194,33 +188,25 @@ extend google.protobuf.EnumValueOptions {
 }
 
 message MonitoringInfo {
-  // The name defining the metric or monitored state.
+  // The name defining the semantic meaning of the metric or monitored state.
+  //
+  // See MonitoringInfoSpecs.Enum for the set of well known metrics/monitored
+  // state.
   string urn = 1;
 
-  // This is specified as a URN that implies:
-  // A message class: (Distribution, Counter, Extrema, MonitoringDataTable).
-  // Sub types like field formats - int64, double, string.
-  // Aggregation methods - SUM, LATEST, TOP-N, BOTTOM-N, DISTRIBUTION
-  // valid values are:
-  // beam:metrics:[sum_int_64|latest_int_64|top_n_int_64|bottom_n_int_64|
-  //     sum_double|latest_double|top_n_double|bottom_n_double|
-  //     distribution_int_64|distribution_double|monitoring_data_table|
-  //     latest_doubles
+  // This is specified as a URN that implies the encoding and aggregation
+  // method. See MonitoringInfoTypeUrns.Enum for the set of well known types.
   string type = 2;
 
-  // The Metric or monitored state.
-  oneof data {
-    MonitoringTableData monitoring_table_data = 3;
-    Metric metric = 4;
-    bytes payload = 7;
-  }
+  // The monitored state encoded as per the specification defined by the type.
+  bytes payload = 3;
 
 Review comment:
   Probably you are already planning on doing this. But having helper functions to easily encode/decode these would be great.
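A sketch of what such helpers might look like for `beam:metrics:distribution_int64:v1`, whose payload the diff defines as `<count><sum><min><max>` with each field a `beam:coder:varint:v1` value. `DistributionInt64Payload` is a hypothetical name, and the varint layout (base-128, least significant group first, negatives as 64-bit two's complement) is assumed rather than taken from the PR.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical encode/decode helper for distribution_int64 payloads:
// <count><sum><min><max>, each written as a varint.
final class DistributionInt64Payload {
  final long count;
  final long sum;
  final long min;
  final long max;

  DistributionInt64Payload(long count, long sum, long min, long max) {
    this.count = count;
    this.sum = sum;
    this.min = min;
    this.max = max;
  }

  byte[] encode() {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeVarLong(out, count);
    writeVarLong(out, sum);
    writeVarLong(out, min);
    writeVarLong(out, max);
    return out.toByteArray();
  }

  static DistributionInt64Payload decode(byte[] payload) {
    int[] pos = {0}; // read cursor shared by the four field reads
    long count = readVarLong(payload, pos);
    long sum = readVarLong(payload, pos);
    long min = readVarLong(payload, pos);
    long max = readVarLong(payload, pos);
    if (pos[0] != payload.length) {
      throw new IllegalArgumentException("trailing bytes after distribution");
    }
    return new DistributionInt64Payload(count, sum, min, max);
  }

  private static void writeVarLong(ByteArrayOutputStream out, long value) {
    while ((value & ~0x7FL) != 0) {
      out.write((int) ((value & 0x7F) | 0x80));
      value >>>= 7;
    }
    out.write((int) value);
  }

  private static long readVarLong(byte[] bytes, int[] pos) {
    long result = 0;
    for (int shift = 0; shift < 64; shift += 7) {
      if (pos[0] >= bytes.length) {
        throw new IllegalArgumentException("truncated varint");
      }
      int b = bytes[pos[0]++] & 0xFF;
      result |= (long) (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
    }
    throw new IllegalArgumentException("varint longer than 10 bytes");
  }
}
```

Small non-negative values encode to one byte each, so `new DistributionInt64Payload(3, 11, 1, 7).encode()` yields four bytes.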

[GitHub] [beam] lukecwik removed a comment on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik removed a comment on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-604196167
 
 
   Run PythonLint PreCommit

[GitHub] [beam] lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r395900505
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -229,101 +215,127 @@ message MonitoringInfo {
     NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
     NAME = 6 [(label_props) = { name: "NAME" }];
   }
+
   // A set of key+value labels which define the scope of the metric.
   // Either a well defined entity id for matching the enum names in
   // the MonitoringInfoLabels enum or any arbitrary label
   // set by a custom metric or user metric.
+  //
   // A monitoring system is expected to be able to aggregate the metrics
   // together for all updates having the same URN and labels. Some systems such
   // as Stackdriver will be able to aggregate the metrics using a subset of the
   // provided labels
-  map<string, string> labels = 5;
-
-  // The walltime of the most recent update.
-  // Useful for aggregation for latest types such as LatestInt64.
-  google.protobuf.Timestamp timestamp = 6;
+  map<string, string> labels = 4;
 }
 
 message MonitoringInfoTypeUrns {
   enum Enum {
+    // Represents an integer counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:varint:v1
     SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                            "beam:metrics:sum_int_64"];
-
-    DISTRIBUTION_INT64_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                                     "beam:metrics:distribution_int_64"];
-
-    LATEST_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                               "beam:metrics:latest_int_64"];
+                            "beam:metrics:sum_int64:v1"];
+
+    // Represents a double counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   value: beam:coder:double:v1
+    SUM_DOUBLE_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                        "beam:metrics:sum_int64:v1"];
+
+    // Represents a distribution of an integer value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    //
+    // Encoding: <count><sum><min><max>
+    //   - count: beam:coder:varint:v1
+    //   - sum:   beam:coder:varint:v1
+    //   - min:   beam:coder:varint:v1
+    //   - max:   beam:coder:varint:v1
+    DISTRIBUTION_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                     "beam:metrics:distribution_int64:v1"];
+
+    // Represents a distribution of a double value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    //
+    // Encoding: <count><sum><min><max>
+    //   - count: beam:coder:varint:v1
+    //   - sum:   beam:coder:double:v1
+    //   - min:   beam:coder:double:v1
+    //   - max:   beam:coder:double:v1
+    DISTRIBUTION_DOUBLE_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                 "beam:metrics:distribution_int64:v1"];
+
+    // Represents the latest seen integer value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    //
+    // Encoding: <timestamp><value>
+    //   - timestamp: beam:coder:varint:v1     (milliseconds since epoch)
+    //   - value:     beam:coder:varint:v1
+    LATEST_INT64_TYPE = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                               "beam:metrics:latest_int64:v1"];
+
+    // Represents the latest seen integer value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    //
+    // Encoding: <timestamp><value>
+    //   - timestamp: beam:coder:varint:v1     (milliseconds since epoch)
+    //   - value:     beam:coder:double:v1
+    LATEST_DOUBLE_TYPE = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metrics:latest_int64:v1"];
+
+    // Represents the largest set of integer values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:varint:v1
+    TOP_N_INT64_TYPE = 6 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metrics:top_n_int64:v1"];
+
+    // Represents the largest set of double values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:double:v1
+    TOP_N_DOUBLE_TYPE = 7 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                            "beam:metrics:top_n_int64:v1"];
+
+    // Represents the smallest set of integer values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:varint:v1
+    BOTTOM_N_INT64_TYPE = 8 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                          "beam:metrics:bottom_n_int64:v1"];
+
+    // Represents the smallest set of double values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:double:v1
+    BOTTOM_N_DOUBLE_TYPE = 9 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metrics:bottom_n_double:v1"];
+
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:double:v1
+    PROGRESS_TYPE = 10 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                       "beam:metrics:progress:v1"];
+
+    // General MonitoredState information which contains
+    // structured information which does not fit into a typical
+    // metric format. See MonitoringTableData for more details.
+    //
+    // Encoding: MonitoringTableData proto encoded as bytes
+    MONITORING_TABLE_TYPE = 11 [(org.apache.beam.model.pipeline.v1.beam_urn) =
 
 Review comment:
   I'll see what I can do when I'm mucking around with this in the code.
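   For reference, here is a minimal Java sketch of the payload layouts that the type URN comments above describe. It assumes `beam:coder:varint:v1` writes the 64-bit two's complement value as an unsigned LEB128 varint (so negative values take ten bytes) and that `beam:coder:double:v1` writes an 8-byte big-endian IEEE 754 double. `MetricPayloads` and its methods are hypothetical names for illustration, not Beam APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

/** Hypothetical helpers mirroring the payload layouts documented in metrics.proto. */
final class MetricPayloads {
  private MetricPayloads() {}

  /** Appends a varint: 7 bits per byte, least significant group first, high bit means "more". */
  static void writeVarLong(long value, ByteArrayOutputStream out) {
    while ((value & ~0x7FL) != 0) {
      out.write((int) ((value & 0x7F) | 0x80));
      value >>>= 7; // unsigned shift so negative values terminate after 10 bytes
    }
    out.write((int) value);
  }

  /** Reads one varint written by writeVarLong. */
  static long readVarLong(ByteArrayInputStream in) {
    long result = 0;
    for (int shift = 0; shift < 64; shift += 7) {
      int b = in.read();
      if (b < 0) {
        throw new IllegalArgumentException("Truncated varint");
      }
      result |= (long) (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
    }
    throw new IllegalArgumentException("Varint longer than 10 bytes");
  }

  /** beam:metrics:sum_int64:v1 payload: <value> as a varint. */
  static byte[] encodeSumInt64(long value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeVarLong(value, out);
    return out.toByteArray();
  }

  /** Double-valued payload field: 8 bytes, big-endian (ByteBuffer's default byte order). */
  static byte[] encodeSumDouble(double value) {
    return ByteBuffer.allocate(8).putDouble(value).array();
  }

  /** beam:metrics:distribution_int64:v1 payload: <count><sum><min><max>, all varints. */
  static byte[] encodeDistributionInt64(long count, long sum, long min, long max) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (long field : new long[] {count, sum, min, max}) {
      writeVarLong(field, out);
    }
    return out.toByteArray();
  }

  /** Inverse of encodeDistributionInt64; returns {count, sum, min, max}. */
  static long[] decodeDistributionInt64(byte[] payload) {
    ByteArrayInputStream in = new ByteArrayInputStream(payload);
    long[] fields = new long[4];
    for (int i = 0; i < fields.length; i++) {
      fields[i] = readVarLong(in);
    }
    return fields;
  }
}
```

   The double-valued distribution would follow the same pattern, with a varint count followed by three 8-byte doubles.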

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398273798
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -52,38 +55,157 @@ message Annotation {
   string value = 2;
 }
 
-// Populated MonitoringInfoSpecs for specific URNs.
-// Indicating the required fields to be set.
-// SDKs and RunnerHarnesses can load these instances into memory and write a
-// validator or code generator to assist with populating and validating
-// MonitoringInfo protos.
+// A set of well known MonitoringInfo specifications.
 message MonitoringInfoSpecs {
   enum Enum {
-    // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
-    // upgrading the python SDK.
-    USER_COUNTER = 0 [(monitoring_info_spec) = {
-      urn: "beam:metric:user",
-      type_urn: "beam:metrics:sum_int_64",
+    // Represents an integer counter where values are summed across bundles.
+    USER_SUM_INT64 = 0 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:sum_int64:v1",
       required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
       annotations: [{
         key: "description",
-        value: "URN utilized to report user numeric counters."
+        value: "URN utilized to report user metric."
       }]
     }];
 
-    ELEMENT_COUNT = 1 [(monitoring_info_spec) = {
+    // Represents a double counter where values are summed across bundles.
+    USER_SUM_DOUBLE = 1 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
 
 Review comment:
   I enumerate them so that it's easier to write validation logic in the future that uses the monitoring specs.
   
   Also, I don't believe user counters can have any type, since the progress type doesn't make much sense for them.
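   A minimal sketch of the kind of validation an enumerated list of specs enables, assuming each spec reduces to a (URN, type) pair plus its required labels. `SpecValidator` is a hypothetical class; a real validator would presumably read the `monitoring_info_spec` options from the generated proto descriptors rather than call `register` by hand.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical validator driven by enumerated (urn, type, required labels) specs. */
final class SpecValidator {
  // Keyed by [urn, type]; java.util.List implements value-based equals/hashCode.
  private final Map<List<String>, List<String>> requiredLabels = new HashMap<>();

  void register(String urn, String type, String... labels) {
    requiredLabels.put(Arrays.asList(urn, type), Arrays.asList(labels));
  }

  /** Returns a list of problems; an empty list means the MonitoringInfo matches a known spec. */
  List<String> validate(String urn, String type, Map<String, String> labels) {
    List<String> problems = new ArrayList<>();
    List<String> required = requiredLabels.get(Arrays.asList(urn, type));
    if (required == null) {
      problems.add("no spec for urn=" + urn + " type=" + type);
      return problems;
    }
    for (String label : required) {
      if (!labels.containsKey(label)) {
        problems.add("missing label " + label);
      }
    }
    return problems;
  }
}
```

   Because each spec is keyed by both URN and type, a user metric reported with a type that was never enumerated (such as the progress type) is rejected rather than silently accepted.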


[GitHub] [beam] lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-604780915
 
 
   Run Java PreCommit


[GitHub] [beam] ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398794269
 
 

 ##########
 File path: runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
 ##########
 @@ -86,46 +89,97 @@ public SimpleMonitoringInfoBuilder setUrn(String urn) {
     return this;
   }
 
-  /** Sets the timestamp of the MonitoringInfo to the current time. */
-  public SimpleMonitoringInfoBuilder setTimestampToNow() {
-    Instant time = Instant.now();
-    this.builder.getTimestampBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano());
+  /**
+   * Sets the type of the MonitoringInfo.
+   *
+   * @param type The type of the MonitoringInfo
+   */
+  public SimpleMonitoringInfoBuilder setType(String type) {
+    this.builder.setType(type);
+    return this;
+  }
+
+  /**
+   * Encodes the value and sets the type to {@link MonitoringInfoConstants.TypeUrns#SUM_INT64_TYPE}.
+   */
+  public SimpleMonitoringInfoBuilder setInt64SumValue(long value) {
+    ByteString.Output output = ByteString.newOutput();
+    try {
+      VARINT_CODER.encode(value, output);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    this.builder.setPayload(output.toByteString());
+    this.builder.setType(MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE);
     return this;
   }
 
-  /** Sets the int64Value of the CounterData in the MonitoringInfo, and the appropriate type URN. */
-  public SimpleMonitoringInfoBuilder setInt64Value(long value) {
-    this.builder.getMetricBuilder().getCounterDataBuilder().setInt64Value(value);
-    this.setInt64TypeUrn();
+  public SimpleMonitoringInfoBuilder setDoubleSumValue(double value) {
+    ByteString.Output output = ByteString.newOutput();
+    try {
+      DOUBLE_CODER.encode(value, output);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    this.builder.setPayload(output.toByteString());
+    this.builder.setType(MonitoringInfoConstants.TypeUrns.SUM_DOUBLE_TYPE);
     return this;
   }
 
   /**
-   * Sets the IntDistributionData of the DistributionData in the MonitoringInfo, and the appropriate
-   * type URN.
+   * Encodes the value and sets the type to {@link
+   * MonitoringInfoConstants.TypeUrns#LATEST_INT64_TYPE}.
    */
-  public SimpleMonitoringInfoBuilder setInt64DistributionValue(DistributionData data) {
-    this.builder
-        .getMetricBuilder()
-        .getDistributionDataBuilder()
-        .getIntDistributionDataBuilder()
-        .setCount(data.count())
-        .setSum(data.sum())
-        .setMin(data.min())
-        .setMax(data.max());
-    this.setInt64DistributionTypeUrn();
+  public SimpleMonitoringInfoBuilder setInt64LatestValue(GaugeData data) {
+    checkArgument(GaugeData.empty() != data, "Cannot encode empty gauge data");
+    ByteString.Output output = ByteString.newOutput();
+    try {
+      VARINT_CODER.encode(data.timestamp().getMillis(), output);
+      VARINT_CODER.encode(data.value(), output);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    this.builder.setPayload(output.toByteString());
+    this.builder.setType(MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE);
     return this;
   }
 
-  /** Sets the the appropriate type URN for int64 distribution tuples. */
-  public SimpleMonitoringInfoBuilder setInt64DistributionTypeUrn() {
-    this.builder.setType(MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64);
+  /**
+   * Encodes the value and sets the type to {@link
+   * MonitoringInfoConstants.TypeUrns#DISTRIBUTION_INT64_TYPE}.
+   */
+  public SimpleMonitoringInfoBuilder setInt64DistributionValue(DistributionData data) {
+    ByteString.Output output = ByteString.newOutput();
+    try {
+      VARINT_CODER.encode(data.count(), output);
+      VARINT_CODER.encode(data.sum(), output);
+      VARINT_CODER.encode(data.min(), output);
+      VARINT_CODER.encode(data.max(), output);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    this.builder.setPayload(output.toByteString());
+    this.builder.setType(MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE);
     return this;
   }
 
-  /** Sets the the appropriate type URN for sum int64 counters. */
-  public SimpleMonitoringInfoBuilder setInt64TypeUrn() {
-    this.builder.setType(MonitoringInfoConstants.TypeUrns.SUM_INT64);
+  /**
+   * Encodes the value and sets the type to {@link
+   * MonitoringInfoConstants.TypeUrns#DISTRIBUTION_DOUBLE_TYPE}.
+   */
+  public SimpleMonitoringInfoBuilder setDoubleDistributionValue(
 
 Review comment:
   Looks like SimpleMonitoringInfoBuilder helped a lot with this refactor :)
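   The new builder methods pair each payload with its type URN. Here is a sketch of what `setInt64LatestValue` produces and how a runner might combine two such updates, assuming the `<timestamp><value>` varint layout from `metrics.proto` and that the later timestamp wins (the tie-break is an arbitrary choice here). `LatestInt64` is a hypothetical class, not part of Beam.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

/** Hypothetical sketch of the beam:metrics:latest_int64:v1 payload and its merge rule. */
final class LatestInt64 {
  final long timestampMillis;
  final long value;

  LatestInt64(long timestampMillis, long value) {
    this.timestampMillis = timestampMillis;
    this.value = value;
  }

  /** Payload layout: <timestamp millis since epoch as varint><value as varint>. */
  byte[] encode() {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeVarLong(timestampMillis, out);
    writeVarLong(value, out);
    return out.toByteArray();
  }

  static LatestInt64 decode(byte[] payload) {
    ByteArrayInputStream in = new ByteArrayInputStream(payload);
    long timestamp = readVarLong(in); // read in payload order: timestamp first
    return new LatestInt64(timestamp, readVarLong(in));
  }

  /** The timestamp only orders updates: keep the later one (ties keep 'a', an arbitrary choice). */
  static LatestInt64 merge(LatestInt64 a, LatestInt64 b) {
    return b.timestampMillis > a.timestampMillis ? b : a;
  }

  private static void writeVarLong(long v, ByteArrayOutputStream out) {
    while ((v & ~0x7FL) != 0) {
      out.write((int) ((v & 0x7F) | 0x80));
      v >>>= 7;
    }
    out.write((int) v);
  }

  private static long readVarLong(ByteArrayInputStream in) {
    long result = 0;
    for (int shift = 0; shift < 64; shift += 7) {
      int b = in.read();
      if (b < 0) {
        throw new IllegalArgumentException("Truncated varint");
      }
      result |= (long) (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
    }
    throw new IllegalArgumentException("Varint longer than 10 bytes");
  }
}
```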


[GitHub] [beam] lostluck commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398239033
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go
 ##########
 @@ -80,36 +84,40 @@ func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
 	var monitoringInfo []*ppb.MonitoringInfo
 	metrics.Extractor{
 		SumInt64: func(l metrics.Labels, v int64) {
-			monitoringInfo = append(monitoringInfo,
-				&ppb.MonitoringInfo{
-					Urn:    "beam:metric:user",
-					Type:   "beam:metrics:sum_int_64",
-					Labels: userLabels(l),
-					Data:   int64Counter(v),
-				})
+			payload, err := int64Counter(v)
+			if err != nil {
+				monitoringInfo = append(monitoringInfo,
+					&ppb.MonitoringInfo{
+						Urn:     "beam:metric:user:v1",
+						Type:    "beam:metrics:sum_int64:v1",
+						Labels:  userLabels(l),
+						Payload: payload,
+					})
+			}
 		},
 		DistributionInt64: func(l metrics.Labels, count, sum, min, max int64) {
-			monitoringInfo = append(monitoringInfo,
-				&ppb.MonitoringInfo{
-					Urn:    "beam:metric:user_distribution",
-					Type:   "beam:metrics:distribution_int_64",
-					Labels: userLabels(l),
-					Data:   int64Distribution(count, sum, min, max),
-				})
+			payload, err := int64Distribution(count, sum, min, max)
+			if err != nil {
+				monitoringInfo = append(monitoringInfo,
+					&ppb.MonitoringInfo{
+						Urn:     "beam:metric:user:v1",
+						Type:    "beam:metrics:distribution_int64:v1",
+						Labels:  userLabels(l),
+						Payload: payload,
+					})
+			}
 		},
 		GaugeInt64: func(l metrics.Labels, v int64, t time.Time) {
-			ts, err := ptypes.TimestampProto(t)
+			payload, err := int64Latest(t, v)
 			if err != nil {
 
 Review comment:
   Error case reversal. 
   Consider 
   ```
   if err != nil {
     panic(err)
   }
   // ...actual contents of the block. 
   ```


[GitHub] [beam] robertwb commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r395936249
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -52,38 +55,157 @@ message Annotation {
   string value = 2;
 }
 
-// Populated MonitoringInfoSpecs for specific URNs.
-// Indicating the required fields to be set.
-// SDKs and RunnerHarnesses can load these instances into memory and write a
-// validator or code generator to assist with populating and validating
-// MonitoringInfo protos.
+// A set of well known MonitoringInfo specifications.
 message MonitoringInfoSpecs {
   enum Enum {
-    // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
-    // upgrading the python SDK.
-    USER_COUNTER = 0 [(monitoring_info_spec) = {
-      urn: "beam:metric:user",
-      type_urn: "beam:metrics:sum_int_64",
+    // Represents an integer counter where values are summed across bundles.
+    USER_SUM_INT64 = 0 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:sum_int64:v1",
       required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
       annotations: [{
         key: "description",
-        value: "URN utilized to report user numeric counters."
+        value: "URN utilized to report user metric."
       }]
     }];
 
-    ELEMENT_COUNT = 1 [(monitoring_info_spec) = {
+    // Represents a double counter where values are summed across bundles.
+    USER_SUM_DOUBLE = 1 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
 
 Review comment:
   Should it be legal to have two counters with the same URN but different types? (This seems to fly against the idea of a URN being a unique identifier.)
   
   Seeing this explosion of types, however, makes it feel like we should not be manually enumerating them (or at least I'm struggling to see the value in that over just saying that user counters may have any type).


[GitHub] [beam] lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398852450
 
 

 ##########
 File path: runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
 ##########
 @@ -86,46 +89,97 @@ public SimpleMonitoringInfoBuilder setUrn(String urn) {
     return this;
   }
 
-  /** Sets the timestamp of the MonitoringInfo to the current time. */
-  public SimpleMonitoringInfoBuilder setTimestampToNow() {
-    Instant time = Instant.now();
-    this.builder.getTimestampBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano());
+  /**
+   * Sets the type of the MonitoringInfo.
+   *
+   * @param type The type of the MonitoringInfo
+   */
+  public SimpleMonitoringInfoBuilder setType(String type) {
+    this.builder.setType(type);
+    return this;
+  }
+
+  /**
+   * Encodes the value and sets the type to {@link MonitoringInfoConstants.TypeUrns#SUM_INT64_TYPE}.
+   */
+  public SimpleMonitoringInfoBuilder setInt64SumValue(long value) {
+    ByteString.Output output = ByteString.newOutput();
+    try {
+      VARINT_CODER.encode(value, output);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    this.builder.setPayload(output.toByteString());
+    this.builder.setType(MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE);
     return this;
   }
 
-  /** Sets the int64Value of the CounterData in the MonitoringInfo, and the appropriate type URN. */
-  public SimpleMonitoringInfoBuilder setInt64Value(long value) {
-    this.builder.getMetricBuilder().getCounterDataBuilder().setInt64Value(value);
-    this.setInt64TypeUrn();
+  public SimpleMonitoringInfoBuilder setDoubleSumValue(double value) {
+    ByteString.Output output = ByteString.newOutput();
+    try {
+      DOUBLE_CODER.encode(value, output);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    this.builder.setPayload(output.toByteString());
+    this.builder.setType(MonitoringInfoConstants.TypeUrns.SUM_DOUBLE_TYPE);
     return this;
   }
 
   /**
-   * Sets the IntDistributionData of the DistributionData in the MonitoringInfo, and the appropriate
-   * type URN.
+   * Encodes the value and sets the type to {@link
+   * MonitoringInfoConstants.TypeUrns#LATEST_INT64_TYPE}.
    */
-  public SimpleMonitoringInfoBuilder setInt64DistributionValue(DistributionData data) {
-    this.builder
-        .getMetricBuilder()
-        .getDistributionDataBuilder()
-        .getIntDistributionDataBuilder()
-        .setCount(data.count())
-        .setSum(data.sum())
-        .setMin(data.min())
-        .setMax(data.max());
-    this.setInt64DistributionTypeUrn();
+  public SimpleMonitoringInfoBuilder setInt64LatestValue(GaugeData data) {
+    checkArgument(GaugeData.empty() != data, "Cannot encode empty gauge data");
+    ByteString.Output output = ByteString.newOutput();
+    try {
+      VARINT_CODER.encode(data.timestamp().getMillis(), output);
+      VARINT_CODER.encode(data.value(), output);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    this.builder.setPayload(output.toByteString());
+    this.builder.setType(MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE);
     return this;
   }
 
-  /** Sets the the appropriate type URN for int64 distribution tuples. */
-  public SimpleMonitoringInfoBuilder setInt64DistributionTypeUrn() {
-    this.builder.setType(MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64);
+  /**
+   * Encodes the value and sets the type to {@link
+   * MonitoringInfoConstants.TypeUrns#DISTRIBUTION_INT64_TYPE}.
+   */
+  public SimpleMonitoringInfoBuilder setInt64DistributionValue(DistributionData data) {
+    ByteString.Output output = ByteString.newOutput();
+    try {
+      VARINT_CODER.encode(data.count(), output);
+      VARINT_CODER.encode(data.sum(), output);
+      VARINT_CODER.encode(data.min(), output);
+      VARINT_CODER.encode(data.max(), output);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    this.builder.setPayload(output.toByteString());
+    this.builder.setType(MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE);
     return this;
   }
 
-  /** Sets the the appropriate type URN for sum int64 counters. */
-  public SimpleMonitoringInfoBuilder setInt64TypeUrn() {
-    this.builder.setType(MonitoringInfoConstants.TypeUrns.SUM_INT64);
+  /**
+   * Encodes the value and sets the type to {@link
+   * MonitoringInfoConstants.TypeUrns#DISTRIBUTION_DOUBLE_TYPE}.
+   */
+  public SimpleMonitoringInfoBuilder setDoubleDistributionValue(
 
 Review comment:
   Yup


[GitHub] [beam] ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398805578
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
 ##########
 @@ -446,7 +447,11 @@ long getInputElementsConsumed(final Iterable<MonitoringInfo> monitoringInfos) {
         String pcollection =
             mi.getLabelsOrDefault(MonitoringInfoConstants.Labels.PCOLLECTION, null);
         if (pcollection != null && pcollection.equals(grpcReadTransformOutputPCollectionName)) {
-          return mi.getMetric().getCounterData().getInt64Value();
+          try {
+            return VARINT_CODER.decode(mi.getPayload().newInput());
 
 Review comment:
   Ditto: this is also too low-level; please use a helper here. Let's try to encapsulate the bytes payload format of MonitoringInfos.
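   One way to follow this suggestion, sketched under the assumption that callers only need the decoded `long`: a hypothetical helper (`MonitoringInfoDecoding.decodeInt64Counter`, not an existing Beam method) that checks the type URN, decodes the varint, and throws an unchecked exception, so call sites such as `getInputElementsConsumed` no longer need their own `try`/`catch (IOException)`.

```java
import java.io.ByteArrayInputStream;

/** Hypothetical helper that hides the payload format from callers. */
final class MonitoringInfoDecoding {
  static final String SUM_INT64_TYPE = "beam:metrics:sum_int64:v1";

  private MonitoringInfoDecoding() {}

  /** Decodes a sum_int64 payload, failing loudly on a type mismatch or malformed bytes. */
  static long decodeInt64Counter(String type, byte[] payload) {
    if (!SUM_INT64_TYPE.equals(type)) {
      throw new IllegalArgumentException("Expected " + SUM_INT64_TYPE + " but got " + type);
    }
    ByteArrayInputStream in = new ByteArrayInputStream(payload);
    long result = 0;
    for (int shift = 0; shift < 64; shift += 7) {
      int b = in.read();
      if (b < 0) {
        throw new IllegalArgumentException("Truncated varint payload");
      }
      result |= (long) (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        if (in.available() != 0) {
          throw new IllegalArgumentException("Trailing bytes after varint");
        }
        return result;
      }
    }
    throw new IllegalArgumentException("Varint longer than 10 bytes");
  }
}
```

   Rejecting trailing bytes is a deliberate strictness choice in this sketch; a lenient decoder could ignore them.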


[GitHub] [beam] ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398802911
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
 ##########
 @@ -35,50 +35,82 @@
 public class FnApiMonitoringInfoToCounterUpdateTransformer
     implements MonitoringInfoToCounterUpdateTransformer {
 
-  final Map<String, MonitoringInfoToCounterUpdateTransformer> counterTransformers = new HashMap<>();
+  final Map<String, Map<String, MonitoringInfoToCounterUpdateTransformer>>
 
 Review comment:
   Can you key this by a tuple instead of a two-layer dictionary? That might reduce some of the boilerplate below. I believe the Pair class would work here.
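   A sketch of the single-level map, using a small hypothetical `UrnAndType` key class in place of whichever `Pair` type the codebase provides; `T` stands in for `MonitoringInfoToCounterUpdateTransformer`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical single-level registry keyed by (urn, type) instead of Map<urn, Map<type, T>>. */
final class TransformerRegistry<T> {
  /** Immutable (urn, type) key; equals/hashCode make it usable as a HashMap key. */
  static final class UrnAndType {
    final String urn;
    final String type;

    UrnAndType(String urn, String type) {
      this.urn = urn;
      this.type = type;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof UrnAndType)) {
        return false;
      }
      UrnAndType other = (UrnAndType) o;
      return urn.equals(other.urn) && type.equals(other.type);
    }

    @Override
    public int hashCode() {
      return Objects.hash(urn, type);
    }
  }

  private final Map<UrnAndType, T> transformers = new HashMap<>();

  void register(String urn, String type, T transformer) {
    transformers.put(new UrnAndType(urn, type), transformer);
  }

  /** One lookup replaces two nested get() calls and their null checks; null if unregistered. */
  T lookup(String urn, String type) {
    return transformers.get(new UrnAndType(urn, type));
  }
}
```

   The trade-off is a little key boilerplate in exchange for removing the nested lookups and null checks at every call site.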


[GitHub] [beam] lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-605020500
 
 
   Run Java PreCommit


[GitHub] [beam] ajamato commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r395888153
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -229,101 +215,127 @@ message MonitoringInfo {
     NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
     NAME = 6 [(label_props) = { name: "NAME" }];
   }
+
   // A set of key+value labels which define the scope of the metric.
   // Either a well defined entity id for matching the enum names in
   // the MonitoringInfoLabels enum or any arbitrary label
   // set by a custom metric or user metric.
+  //
   // A monitoring system is expected to be able to aggregate the metrics
   // together for all updates having the same URN and labels. Some systems such
   // as Stackdriver will be able to aggregate the metrics using a subset of the
   // provided labels
-  map<string, string> labels = 5;
-
-  // The walltime of the most recent update.
-  // Useful for aggregation for latest types such as LatestInt64.
-  google.protobuf.Timestamp timestamp = 6;
+  map<string, string> labels = 4;
 }
 
 message MonitoringInfoTypeUrns {
   enum Enum {
+    // Represents an integer counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:varint:v1
     SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                            "beam:metrics:sum_int_64"];
-
-    DISTRIBUTION_INT64_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                                     "beam:metrics:distribution_int_64"];
-
-    LATEST_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                               "beam:metrics:latest_int_64"];
+                            "beam:metrics:sum_int64:v1"];
+
+    // Represents a double counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:double:v1
+    SUM_DOUBLE_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                        "beam:metrics:sum_double:v1"];
+
+    // Represents a distribution of an integer value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    //
+    // Encoding: <count><sum><min><max>
+    //   - count: beam:coder:varint:v1
+    //   - sum:   beam:coder:varint:v1
+    //   - min:   beam:coder:varint:v1
+    //   - max:   beam:coder:varint:v1
+    DISTRIBUTION_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                     "beam:metrics:distribution_int64:v1"];
+
+    // Represents a distribution of a double value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    //
+    // Encoding: <count><sum><min><max>
+    //   - count: beam:coder:varint:v1
+    //   - sum:   beam:coder:double:v1
+    //   - min:   beam:coder:double:v1
+    //   - max:   beam:coder:double:v1
+    DISTRIBUTION_DOUBLE_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                 "beam:metrics:distribution_double:v1"];
+
+    // Represents the latest seen integer value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    //
+    // Encoding: <timestamp><value>
+    //   - timestamp: beam:coder:varint:v1     (milliseconds since epoch)
+    //   - value:     beam:coder:varint:v1
+    LATEST_INT64_TYPE = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                               "beam:metrics:latest_int64:v1"];
+
+    // Represents the latest seen double value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    //
+    // Encoding: <timestamp><value>
+    //   - timestamp: beam:coder:varint:v1     (milliseconds since epoch)
+    //   - value:     beam:coder:double:v1
+    LATEST_DOUBLE_TYPE = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metrics:latest_double:v1"];
+
+    // Represents the largest set of integer values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:varint:v1
+    TOP_N_INT64_TYPE = 6 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metrics:top_n_int64:v1"];
+
+    // Represents the largest set of double values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:double:v1
+    TOP_N_DOUBLE_TYPE = 7 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                            "beam:metrics:top_n_double:v1"];
+
+    // Represents the smallest set of integer values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:varint:v1
+    BOTTOM_N_INT64_TYPE = 8 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                          "beam:metrics:bottom_n_int64:v1"];
+
+    // Represents the smallest set of double values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:double:v1
+    BOTTOM_N_DOUBLE_TYPE = 9 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metrics:bottom_n_double:v1"];
+
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:double:v1
+    PROGRESS_TYPE = 10 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                       "beam:metrics:progress:v1"];
+
+    // General MonitoredState information which contains
+    // structured information which does not fit into a typical
+    // metric format. See MonitoringTableData for more details.
+    //
+    // Encoding: MonitoringTableData proto encoded as bytes
+    MONITORING_TABLE_TYPE = 11 [(org.apache.beam.model.pipeline.v1.beam_urn) =
 
 Review comment:
   Hopefully you can "flatten" out the proto a bit. A common complaint was that it was complicated to navigate the nested hierarchy here.
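   A related detail of the flattened encodings: the `TOP_N`/`BOTTOM_N` payloads are a bare `<value1>...<valueN>` sequence with no length prefix, so N has to be inferred from the payload itself. Here is a minimal Java sketch for the double variant, assuming `beam:coder:double:v1` is 8 bytes big-endian; `TopNDoublePayload` is a hypothetical class.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical codec for a top_n/bottom_n double payload: <value1><value2>...<valueN>. */
final class TopNDoublePayload {
  private TopNDoublePayload() {}

  static byte[] encode(List<Double> values) {
    ByteBuffer buffer = ByteBuffer.allocate(8 * values.size()); // big-endian by default
    for (double v : values) {
      buffer.putDouble(v);
    }
    return buffer.array();
  }

  /** With no length prefix, N is implied by the payload size: each double takes 8 bytes. */
  static List<Double> decode(byte[] payload) {
    if (payload.length % 8 != 0) {
      throw new IllegalArgumentException(
          "Payload length " + payload.length + " is not a multiple of 8");
    }
    ByteBuffer buffer = ByteBuffer.wrap(payload);
    List<Double> values = new ArrayList<>();
    while (buffer.hasRemaining()) {
      values.add(buffer.getDouble());
    }
    return values;
  }
}
```

   For the int64 variants the decoder would instead read varints until the payload is exhausted, since varint lengths differ per value.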


[GitHub] [beam] lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r396575954
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -52,38 +55,157 @@ message Annotation {
   string value = 2;
 }
 
-// Populated MonitoringInfoSpecs for specific URNs.
-// Indicating the required fields to be set.
-// SDKs and RunnerHarnesses can load these instances into memory and write a
-// validator or code generator to assist with populating and validating
-// MonitoringInfo protos.
+// A set of well known MonitoringInfo specifications.
 message MonitoringInfoSpecs {
   enum Enum {
-    // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
-    // upgrading the python SDK.
-    USER_COUNTER = 0 [(monitoring_info_spec) = {
-      urn: "beam:metric:user",
-      type_urn: "beam:metrics:sum_int_64",
+    // Represents an integer counter where values are summed across bundles.
+    USER_SUM_INT64 = 0 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:sum_int64:v1",
       required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
       annotations: [{
         key: "description",
-        value: "URN utilized to report user numeric counters."
+        value: "URN utilized to report user metric."
       }]
     }];
 
-    ELEMENT_COUNT = 1 [(monitoring_info_spec) = {
+    // Represents a double counter where values are summed across bundles.
+    USER_SUM_DOUBLE = 1 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
 
 Review comment:
   The URN represents the unique identifier in the "semantic meaning" space of the counter. So saying that this is a user counter and that there are many types makes sense to me.
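A minimal Go sketch of what this implies for anyone validating against the specs. When several user metric types share one semantic URN, as `beam:metric:user:v1` does in this revision of the diff, a spec lookup has to key on the (URN, type) pair rather than on the URN alone. `specKey`, `specs`, and `validate` are hypothetical illustrations, not Beam SDK APIs.

```go
package main

import "fmt"

// specKey identifies a MonitoringInfoSpec. Several user metric types share
// one semantic URN here, so the URN alone does not pick out a spec.
type specKey struct{ urn, typ string }

// specs is a hypothetical in-memory copy of two MonitoringInfoSpecs entries,
// mapping (urn, type) to the labels each spec requires.
var specs = map[specKey][]string{
	{"beam:metric:user:v1", "beam:metrics:sum_int64:v1"}:  {"PTRANSFORM", "NAMESPACE", "NAME"},
	{"beam:metric:user:v1", "beam:metrics:sum_double:v1"}: {"PTRANSFORM", "NAMESPACE", "NAME"},
}

// validate checks that the (urn, type) pair is known and that every
// required label is present.
func validate(urn, typ string, labels map[string]string) error {
	required, ok := specs[specKey{urn, typ}]
	if !ok {
		return fmt.Errorf("no spec for urn %q with type %q", urn, typ)
	}
	for _, l := range required {
		if _, ok := labels[l]; !ok {
			return fmt.Errorf("missing required label %q", l)
		}
	}
	return nil
}

func main() {
	labels := map[string]string{"PTRANSFORM": "ParDo(Count)", "NAMESPACE": "my.ns", "NAME": "elements"}
	fmt.Println(validate("beam:metric:user:v1", "beam:metrics:sum_double:v1", labels)) // <nil>
	fmt.Println(validate("beam:metric:user:v1", "beam:metrics:sum_int64:v1", map[string]string{"NAME": "elements"}))
}
```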

[GitHub] [beam] lostluck commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398253068
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -52,38 +55,157 @@ message Annotation {
   string value = 2;
 }
 
-// Populated MonitoringInfoSpecs for specific URNs.
-// Indicating the required fields to be set.
-// SDKs and RunnerHarnesses can load these instances into memory and write a
-// validator or code generator to assist with populating and validating
-// MonitoringInfo protos.
+// A set of well known MonitoringInfo specifications.
 message MonitoringInfoSpecs {
   enum Enum {
-    // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
-    // upgrading the python SDK.
-    USER_COUNTER = 0 [(monitoring_info_spec) = {
-      urn: "beam:metric:user",
-      type_urn: "beam:metrics:sum_int_64",
+    // Represents an integer counter where values are summed across bundles.
+    USER_SUM_INT64 = 0 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:sum_int64:v1",
       required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
       annotations: [{
         key: "description",
-        value: "URN utilized to report user numeric counters."
+        value: "URN utilized to report user metric."
       }]
     }];
 
-    ELEMENT_COUNT = 1 [(monitoring_info_spec) = {
+    // Represents a double counter where values are summed across bundles.
+    USER_SUM_DOUBLE = 1 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
 
 Review comment:
  For the Go short id generation, I'm relying on the label set being unique and mapping to a single semantic URN and Type. That is, I'm assuming that a counter with a given namespace, name, and ptransform will always have the same URN and Type, and should use the same short id.
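A minimal Go sketch of short id assignment under the assumption stated above. Ids are keyed only on the label set, and a label set that reappears with a different URN or type is reported as an error instead of silently sharing an id. `shortIDCache`, `labelKey`, and `ShortID` are hypothetical names, not the Go SDK's actual implementation.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// metricKey records which URN and type first claimed a label set.
type metricKey struct{ urn, typ string }

// shortIDCache hands out short ids keyed only on the label set.
type shortIDCache struct {
	next   int
	ids    map[string]string
	owners map[string]metricKey
}

func newShortIDCache() *shortIDCache {
	return &shortIDCache{ids: map[string]string{}, owners: map[string]metricKey{}}
}

// labelKey builds a canonical, insertion-order-independent key; %q quoting
// keeps keys and values containing '=' or ';' from colliding.
func labelKey(labels map[string]string) string {
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var b strings.Builder
	for _, k := range keys {
		fmt.Fprintf(&b, "%q=%q;", k, labels[k])
	}
	return b.String()
}

// ShortID returns the id for labels. It errors if the labels were already
// seen with a different URN or type, i.e. the uniqueness assumption fails.
func (c *shortIDCache) ShortID(urn, typ string, labels map[string]string) (string, error) {
	k := labelKey(labels)
	if id, ok := c.ids[k]; ok {
		if owner := c.owners[k]; owner != (metricKey{urn, typ}) {
			return "", fmt.Errorf("labels %s already used by %s/%s", k, owner.urn, owner.typ)
		}
		return id, nil
	}
	c.next++
	id := fmt.Sprintf("%d", c.next)
	c.ids[k] = id
	c.owners[k] = metricKey{urn, typ}
	return id, nil
}

func main() {
	c := newShortIDCache()
	a, _ := c.ShortID("beam:metric:user:v1", "beam:metrics:sum_int64:v1",
		map[string]string{"PTRANSFORM": "p1", "NAMESPACE": "ns", "NAME": "count"})
	b, _ := c.ShortID("beam:metric:user:v1", "beam:metrics:sum_int64:v1",
		map[string]string{"NAME": "count", "NAMESPACE": "ns", "PTRANSFORM": "p1"})
	fmt.Println(a == b) // true: same label set, same short id
	_, err := c.ShortID("beam:metric:user:v1", "beam:metrics:sum_double:v1",
		map[string]string{"PTRANSFORM": "p1", "NAMESPACE": "ns", "NAME": "count"})
	fmt.Println(err != nil) // true: same labels reported with a different type
}
```

The error branch is where the assumption would break. It would fire if the same namespace, name, and PTransform were ever reported under two different metric types while sharing the `beam:metric:user:v1` URN.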

[GitHub] [beam] lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398275235
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go
 ##########
 @@ -126,15 +134,18 @@ func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
 			},
 		}
 		// Monitoring info version.
-		monitoringInfo = append(monitoringInfo,
-			&ppb.MonitoringInfo{
-				Urn:  "beam:metric:element_count:v1",
-				Type: "beam:metrics:sum_int_64",
-				Labels: map[string]string{
-					"PCOLLECTION": snapshot.PID,
-				},
-				Data: int64Counter(snapshot.Count),
-			})
+		payload, err := int64Counter(snapshot.Count)
+		if err != nil {
 
 Review comment:
   Done

[GitHub] [beam] ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398810999
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformerTest.java
 ##########
 @@ -54,39 +58,33 @@ public void setUp() throws Exception {
 
   @Test
   public void testTransformReturnsNullIfSpecValidationFails() {
-    Map<String, String> counterNameMapping = new HashMap<>();
-    counterNameMapping.put("beam:counter:supported", "supportedCounter");
-
     Map<String, DataflowStepContext> stepContextMapping = new HashMap<>();
 
     MSecMonitoringInfoToCounterUpdateTransformer testObject =
 
 Review comment:
  Can you add a TODO to rename this to ExecutionTimeMonitoringInfoToCounterUpdateTransformer?

[GitHub] [beam] lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-604732007
 
 
   This is ready for the next review.

[GitHub] [beam] ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398801376
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ElementCountMonitoringInfoToCounterUpdateTransformer.java
 ##########
 @@ -95,7 +99,12 @@ public CounterUpdate transform(MonitoringInfo monitoringInfo) {
       return null;
     }
 
-    long value = monitoringInfo.getMetric().getCounterData().getInt64Value();
+    long value;
+    try {
+      value = VARINT_CODER.decode(monitoringInfo.getPayload().newInput());
 
 Review comment:
  Can we encapsulate this so that it's still one line to get the value out? I think that would be a nicer interface for consumers. Ideally it would be as easy to use on the receiving side as SimpleMonitoringInfoBuilder.
   
  e.g.
   long value = MonitoringInfos.getSumInt64Value(monitoringInfo)
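In Go terms, a one-call accessor of this kind could look like the minimal sketch below. It assumes `beam:coder:varint:v1` is an unsigned base-128 varint over the int64's two's-complement bits, which means negative values always take ten bytes. `encodeSumInt64` and `sumInt64Value` are hypothetical names, not the Java `MonitoringInfos` method proposed above.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// encodeSumInt64 writes v as a varint over its two's-complement bits
// (assumed encoding for beam:coder:varint:v1).
func encodeSumInt64(v int64) []byte {
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, uint64(v))
	return buf[:n]
}

// sumInt64Value hides the decoding so callers get the value in one call,
// rejecting truncated, overflowing, or over-long payloads.
func sumInt64Value(payload []byte) (int64, error) {
	u, n := binary.Uvarint(payload)
	if n <= 0 {
		return 0, errors.New("malformed varint payload")
	}
	if n != len(payload) {
		return 0, fmt.Errorf("unexpected %d trailing bytes", len(payload)-n)
	}
	return int64(u), nil
}

func main() {
	payload := encodeSumInt64(300)
	v, err := sumInt64Value(payload)
	fmt.Println(v, len(payload), err) // 300 2 <nil>
}
```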

[GitHub] [beam] lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-604194840
 
 
   While this is getting reviewed, I'm going to start working on testing/updating the Google codebase to reflect this backwards incompatible change.

[GitHub] [beam] lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-604808223
 
 
   Run Java PreCommit

[GitHub] [beam] lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398957458
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -52,38 +62,160 @@ message Annotation {
   string value = 2;
 }
 
-// Populated MonitoringInfoSpecs for specific URNs.
-// Indicating the required fields to be set.
-// SDKs and RunnerHarnesses can load these instances into memory and write a
-// validator or code generator to assist with populating and validating
-// MonitoringInfo protos.
+// A set of well known MonitoringInfo specifications.
 message MonitoringInfoSpecs {
   enum Enum {
-    // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
-    // upgrading the python SDK.
-    USER_COUNTER = 0 [(monitoring_info_spec) = {
-      urn: "beam:metric:user",
-      type_urn: "beam:metrics:sum_int_64",
+    // Represents an integer counter where values are summed across bundles.
+    USER_SUM_INT64 = 0 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:sum_int64:v1",
+      type: "beam:metrics:sum_int64:v1",
       required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
       annotations: [{
         key: "description",
-        value: "URN utilized to report user numeric counters."
+        value: "URN utilized to report user metric."
       }]
     }];
 
-    ELEMENT_COUNT = 1 [(monitoring_info_spec) = {
+    // Represents a double counter where values are summed across bundles.
+    USER_SUM_DOUBLE = 1 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:sum_double:v1",
+      type: "beam:metrics:sum_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents a distribution of an integer value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    USER_DISTRIBUTION_INT64 = 2 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:distribution_int64:v1",
+      type: "beam:metrics:distribution_int64:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents a distribution of a double value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    USER_DISTRIBUTION_DOUBLE = 3 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:distribution_double:v1",
+      type: "beam:metrics:distribution_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the latest seen integer value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    USER_LATEST_INT64 = 4 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:latest_int64:v1",
+      type: "beam:metrics:latest_int64:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the latest seen double value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    USER_LATEST_DOUBLE = 5 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:latest_double:v1",
+      type: "beam:metrics:latest_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the largest set of integer values seen across bundles.
+    USER_TOP_N_INT64 = 6 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:top_n_int64:v1",
+      type: "beam:metrics:top_n_int64:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the largest set of double values seen across bundles.
+    USER_TOP_N_DOUBLE = 7 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:top_n_double:v1",
+      type: "beam:metrics:top_n_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the smallest set of integer values seen across bundles.
+    USER_BOTTOM_N_INT64 = 8 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:bottom_n_int64:v1",
+      type: "beam:metrics:bottom_n_int64:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the smallest set of double values seen across bundles.
+    USER_BOTTOM_N_DOUBLE = 9 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:bottom_n_double:v1",
+      type: "beam:metrics:bottom_n_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // General monitored state information which contains structured information
+    // which does not fit into a typical metric format. See MonitoringTableData
+    // for more details.
+    //
+    // TODO(BEAM-9617): Support monitored state.
+    //
+    // USER_MONITORING_TABLE = XX [(monitoring_info_spec) = {
+    //   urn: "beam:metric:user:v1",
+    //   type: "beam:metrics:monitoring_table:v1",
+    //   required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+    //   annotations: [{
+    //     key: "description",
+    //     value: "URN utilized to report user monitoring data."
+    //   }]
+    // }];
+
+    ELEMENT_COUNT = 10 [(monitoring_info_spec) = {
 
 Review comment:
  What matters is that related monitoring specs are grouped together; the enum numbers themselves shouldn't matter.

[GitHub] [beam] lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-604195239
 
 
   Run Java Flink PortableValidatesRunner Batch

[GitHub] [beam] lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398851464
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformerTest.java
 ##########
 @@ -54,39 +58,33 @@ public void setUp() throws Exception {
 
   @Test
   public void testTransformReturnsNullIfSpecValidationFails() {
-    Map<String, String> counterNameMapping = new HashMap<>();
-    counterNameMapping.put("beam:counter:supported", "supportedCounter");
-
     Map<String, DataflowStepContext> stepContextMapping = new HashMap<>();
 
     MSecMonitoringInfoToCounterUpdateTransformer testObject =
 
 Review comment:
   Did the rename.

[GitHub] [beam] lostluck commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398239127
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go
 ##########
 @@ -80,36 +84,40 @@ func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
 	var monitoringInfo []*ppb.MonitoringInfo
 	metrics.Extractor{
 		SumInt64: func(l metrics.Labels, v int64) {
-			monitoringInfo = append(monitoringInfo,
-				&ppb.MonitoringInfo{
-					Urn:    "beam:metric:user",
-					Type:   "beam:metrics:sum_int_64",
-					Labels: userLabels(l),
-					Data:   int64Counter(v),
-				})
+			payload, err := int64Counter(v)
+			if err != nil {
 
 Review comment:
  Reverse the error case: handle the error first, then continue with the normal path. Consider:
   ```
   if err != nil {
     panic(err)
   }
   // ...actual contents of the block. 
   ```
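A minimal Go sketch of the suggested shape applied to the `SumInt64` callback. `monitoringInfo` is a hypothetical local stand-in for `ppb.MonitoringInfo`. Only the `([]byte, error)` signature of `int64Counter` comes from the diff; its body here is an assumption. The URNs are the ones from the later revision of `metrics.proto` in this PR.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// monitoringInfo is a hypothetical local stand-in for ppb.MonitoringInfo.
type monitoringInfo struct {
	Urn, Type string
	Labels    map[string]string
	Payload   []byte
}

// int64Counter mirrors the ([]byte, error) signature from the diff; this body
// (a varint encoding that never fails) is an assumption for the sketch.
func int64Counter(v int64) ([]byte, error) {
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, uint64(v))
	return buf[:n], nil
}

// appendSumInt64 handles the error first and keeps the normal path at the
// outer indentation level, as suggested in the review.
func appendSumInt64(infos []*monitoringInfo, labels map[string]string, v int64) []*monitoringInfo {
	payload, err := int64Counter(v)
	if err != nil {
		panic(err)
	}
	return append(infos, &monitoringInfo{
		Urn:     "beam:metric:user:sum_int64:v1",
		Type:    "beam:metrics:sum_int64:v1",
		Labels:  labels,
		Payload: payload,
	})
}

func main() {
	infos := appendSumInt64(nil, map[string]string{"NAME": "elements"}, 5)
	fmt.Println(len(infos), infos[0].Payload) // 1 [5]
}
```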

[GitHub] [beam] ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398750376
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -33,14 +33,17 @@ import "beam_runner_api.proto";
 import "google/protobuf/descriptor.proto";
 import "google/protobuf/timestamp.proto";
 
-// A specification containing required set of fields and labels required
-// to be set on a MonitoringInfo for the specific URN for SDK->RunnerHarness
-// ProcessBundleResponse reporting.
+// A specification for describing a well known MonitoringInfo.
 message MonitoringInfoSpec {
+  // Defines the semantic meaning of the metric or monitored state.
 
 Review comment:
   Could you mention in the comment what the valid values for this are, where you could find a list of some of them, etc.?

[GitHub] [beam] robertwb commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398952488
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -52,38 +61,160 @@ message Annotation {
   string value = 2;
 }
 
-// Populated MonitoringInfoSpecs for specific URNs.
-// Indicating the required fields to be set.
-// SDKs and RunnerHarnesses can load these instances into memory and write a
-// validator or code generator to assist with populating and validating
-// MonitoringInfo protos.
+// A set of well known MonitoringInfo specifications.
 message MonitoringInfoSpecs {
   enum Enum {
-    // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
-    // upgrading the python SDK.
-    USER_COUNTER = 0 [(monitoring_info_spec) = {
-      urn: "beam:metric:user",
-      type_urn: "beam:metrics:sum_int_64",
+    // Represents an integer counter where values are summed across bundles.
+    USER_SUM_INT64 = 0 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:sum_int64:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents a double counter where values are summed across bundles.
+    USER_SUM_DOUBLE = 1 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:sum_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents a distribution of an integer value where:
+    //   - count: represents the number of values seen across all bundles
 
 Review comment:
  Isn't this explicit in the type field, which is a URN denoting exactly how the values are encoded? Do we need more?

[GitHub] [beam] lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r395908901
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -229,101 +215,127 @@ message MonitoringInfo {
     NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
     NAME = 6 [(label_props) = { name: "NAME" }];
   }
+
   // A set of key+value labels which define the scope of the metric.
   // Either a well defined entity id for matching the enum names in
   // the MonitoringInfoLabels enum or any arbitrary label
   // set by a custom metric or user metric.
+  //
   // A monitoring system is expected to be able to aggregate the metrics
   // together for all updates having the same URN and labels. Some systems such
   // as Stackdriver will be able to aggregate the metrics using a subset of the
   // provided labels
-  map<string, string> labels = 5;
-
-  // The walltime of the most recent update.
-  // Useful for aggregation for latest types such as LatestInt64.
-  google.protobuf.Timestamp timestamp = 6;
+  map<string, string> labels = 4;
 }
 
 message MonitoringInfoTypeUrns {
   enum Enum {
+    // Represents an integer counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:varint:v1
     SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                            "beam:metrics:sum_int_64"];
-
-    DISTRIBUTION_INT64_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                                     "beam:metrics:distribution_int_64"];
-
-    LATEST_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                               "beam:metrics:latest_int_64"];
+                            "beam:metrics:sum_int64:v1"];
+
+    // Represents a double counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:double:v1
+    SUM_DOUBLE_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                        "beam:metrics:sum_double:v1"];
+
+    // Represents a distribution of an integer value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    //
+    // Encoding: <count><sum><min><max>
+    //   - count: beam:coder:varint:v1
+    //   - sum:   beam:coder:varint:v1
+    //   - min:   beam:coder:varint:v1
+    //   - max:   beam:coder:varint:v1
+    DISTRIBUTION_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                     "beam:metrics:distribution_int64:v1"];
+
+    // Represents a distribution of a double value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    //
+    // Encoding: <count><sum><min><max>
+    //   - count: beam:coder:varint:v1
+    //   - sum:   beam:coder:double:v1
+    //   - min:   beam:coder:double:v1
+    //   - max:   beam:coder:double:v1
+    DISTRIBUTION_DOUBLE_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                 "beam:metrics:distribution_double:v1"];
+
+    // Represents the latest seen integer value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    //
+    // Encoding: <timestamp><value>
+    //   - timestamp: beam:coder:varint:v1     (milliseconds since epoch)
+    //   - value:     beam:coder:varint:v1
+    LATEST_INT64_TYPE = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                               "beam:metrics:latest_int64:v1"];
+
+    // Represents the latest seen double value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    //
+    // Encoding: <timestamp><value>
+    //   - timestamp: beam:coder:varint:v1     (milliseconds since epoch)
+    //   - value:     beam:coder:double:v1
+    LATEST_DOUBLE_TYPE = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metrics:latest_double:v1"];
+
+    // Represents the largest set of integer values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:varint:v1
+    TOP_N_INT64_TYPE = 6 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metrics:top_n_int64:v1"];
+
+    // Represents the largest set of double values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:double:v1
+    TOP_N_DOUBLE_TYPE = 7 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                            "beam:metrics:top_n_double:v1"];
+
+    // Represents the smallest set of integer values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:varint:v1
+    BOTTOM_N_INT64_TYPE = 8 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                          "beam:metrics:bottom_n_int64:v1"];
+
+    // Represents the smallest set of double values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:double:v1
+    BOTTOM_N_DOUBLE_TYPE = 9 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metrics:bottom_n_double:v1"];
+
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:double:v1
+    PROGRESS_TYPE = 10 [(org.apache.beam.model.pipeline.v1.beam_urn) =
 
 Review comment:
   This is the individual progress for each element and restriction pair for a splittable DoFn. The aggregation is interesting since if you sum across all **latest** values then that tells you the amount of completed work globally and the amount of known but not yet finished work globally. Note that this won't cover the amount of unknown work (e.g. elements and restrictions that have yet to be processed).
   
  @robertwb / @ajamato What do you think about adding `ELEMENT` as a label (storing the encoded element and restriction) and then we could use `LATEST_DOUBLE_TYPE`?
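A minimal Go sketch of the aggregation described above. It assumes the proposed `ELEMENT` label identifies each element and restriction pair, and that each report carries a `latest_double`-style timestamp and value. `progressReport` and `sumOfLatest` are hypothetical names.

```go
package main

import "fmt"

// progressReport is one hypothetical latest_double-style report for a single
// element/restriction pair, identified by the proposed ELEMENT label.
type progressReport struct {
	Element  string  // encoded element and restriction (proposed ELEMENT label)
	TsMillis int64   // ordering timestamp, milliseconds since epoch
	Value    float64 // e.g. completed work for this pair
}

// sumOfLatest keeps only the newest report per element (ties keep the first
// seen) and sums those values. Elements that have never been reported
// contribute nothing, so unknown work is not covered.
func sumOfLatest(reports []progressReport) float64 {
	latest := make(map[string]progressReport)
	for _, r := range reports {
		if cur, ok := latest[r.Element]; !ok || r.TsMillis > cur.TsMillis {
			latest[r.Element] = r
		}
	}
	total := 0.0
	for _, r := range latest {
		total += r.Value
	}
	return total
}

func main() {
	reports := []progressReport{
		{"a", 1, 1.0}, {"a", 3, 4.0}, {"a", 2, 3.0}, {"b", 1, 2.5},
	}
	fmt.Println(sumOfLatest(reports)) // 6.5
}
```

Summing every report instead of only the latest one per element would give 10.5 for the same input, because element `a` would be counted three times.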

[GitHub] [beam] lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398899707
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ElementCountMonitoringInfoToCounterUpdateTransformer.java
 ##########
 @@ -95,7 +99,12 @@ public CounterUpdate transform(MonitoringInfo monitoringInfo) {
       return null;
     }
 
-    long value = monitoringInfo.getMetric().getCounterData().getInt64Value();
+    long value;
+    try {
+      value = VARINT_CODER.decode(monitoringInfo.getPayload().newInput());
 
 Review comment:
   Done here and elsewhere. I introduced a MonitoringInfoEncodings class with convenience methods for the currently used encodings.
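The `MonitoringInfoEncodings` source is not quoted here, so the following is only a rough, hypothetical sketch of the kind of convenience method described, for the `SUM_INT64_TYPE` payload (`<value>` encoded as `beam:coder:varint:v1`). It assumes the varint layout used by Beam's Java `VarInt` utility (7 bits per byte, least-significant group first, high bit set when more bytes follow) and avoids any Beam dependency; the class and method names are illustrative only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical helpers for a SUM_INT64 payload: a single varint-encoded long. */
final class Int64CounterPayloads {
  private Int64CounterPayloads() {}

  /** Encodes the counter value as the entire payload. */
  static byte[] encodeInt64Counter(long value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    long v = value;
    do {
      int bits = (int) (v & 0x7F); // low 7 bits
      v >>>= 7; // unsigned shift so negative values terminate after 10 bytes
      out.write(bits | (v != 0 ? 0x80 : 0)); // continuation bit if more bytes follow
    } while (v != 0);
    return out.toByteArray();
  }

  /** Decodes the counter value, so each call site becomes a one-liner. */
  static long decodeInt64Counter(byte[] payload) {
    ByteArrayInputStream in = new ByteArrayInputStream(payload);
    long result = 0;
    for (int shift = 0; shift < 64; shift += 7) {
      int b = in.read();
      if (b < 0) {
        throw new UncheckedIOException(new IOException("truncated varint"));
      }
      result |= (long) (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
    }
    throw new UncheckedIOException(new IOException("varint longer than 10 bytes"));
  }
}
```

With a helper like this, the `try`/`VARINT_CODER.decode(...)` block in the diff would shrink to something like `long value = Int64CounterPayloads.decodeInt64Counter(monitoringInfo.getPayload().toByteArray());`.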

[GitHub] [beam] robertwb commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398953040
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -52,38 +62,160 @@ message Annotation {
   string value = 2;
 }
 
-// Populated MonitoringInfoSpecs for specific URNs.
-// Indicating the required fields to be set.
-// SDKs and RunnerHarnesses can load these instances into memory and write a
-// validator or code generator to assist with populating and validating
-// MonitoringInfo protos.
+// A set of well known MonitoringInfo specifications.
 message MonitoringInfoSpecs {
   enum Enum {
-    // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
-    // upgrading the python SDK.
-    USER_COUNTER = 0 [(monitoring_info_spec) = {
-      urn: "beam:metric:user",
-      type_urn: "beam:metrics:sum_int_64",
+    // Represents an integer counter where values are summed across bundles.
+    USER_SUM_INT64 = 0 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:sum_int64:v1",
+      type: "beam:metrics:sum_int64:v1",
       required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
       annotations: [{
         key: "description",
-        value: "URN utilized to report user numeric counters."
+        value: "URN utilized to report user metric."
       }]
     }];
 
-    ELEMENT_COUNT = 1 [(monitoring_info_spec) = {
+    // Represents a double counter where values are summed across bundles.
+    USER_SUM_DOUBLE = 1 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:sum_double:v1",
+      type: "beam:metrics:sum_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents a distribution of an integer value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    USER_DISTRIBUTION_INT64 = 2 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:distribution_int64:v1",
+      type: "beam:metrics:distribution_int64:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents a distribution of a double value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    USER_DISTRIBUTION_DOUBLE = 3 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:distribution_double:v1",
+      type: "beam:metrics:distribution_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the latest seen integer value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    USER_LATEST_INT64 = 4 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:latest_int64:v1",
+      type: "beam:metrics:latest_int64:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the latest seen double value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    USER_LATEST_DOUBLE = 5 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:latest_double:v1",
+      type: "beam:metrics:latest_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the largest set of integer values seen across bundles.
+    USER_TOP_N_INT64 = 6 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:top_n_int64:v1",
+      type: "beam:metrics:top_n_int64:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the largest set of double values seen across bundles.
+    USER_TOP_N_DOUBLE = 7 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:top_n_double:v1",
+      type: "beam:metrics:top_n_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the smallest set of integer values seen across bundles.
+    USER_BOTTOM_N_INT64 = 8 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:bottom_n_int64:v1",
+      type: "beam:metrics:bottom_n_int64:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the smallest set of double values seen across bundles.
+    USER_BOTTOM_N_DOUBLE = 9 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:bottom_n_double:v1",
+      type: "beam:metrics:bottom_n_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // General monitored state information which contains structured information
+    // which does not fit into a typical metric format. See MonitoringTableData
+    // for more details.
+    //
+    // TODO(BEAM-9617): Support monitored state.
+    //
+    // USER_MONITORING_TABLE = XX [(monitoring_info_spec) = {
+    //   urn: "beam:metric:user:v1",
+    //   type: "beam:metrics:monitoring_table:v1",
+    //   required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+    //   annotations: [{
+    //     key: "description",
+    //     value: "URN utilized to report user monitoring data."
+    //   }]
+    // }];
+
+    ELEMENT_COUNT = 10 [(monitoring_info_spec) = {
 
 Review comment:
   Nit: Should we jump to 100 (or alternatively have the user types be 1xxx) just to have nice consecutive numbers as more types get added?

[GitHub] [beam] ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398758626
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -52,38 +55,157 @@ message Annotation {
   string value = 2;
 }
 
-// Populated MonitoringInfoSpecs for specific URNs.
-// Indicating the required fields to be set.
-// SDKs and RunnerHarnesses can load these instances into memory and write a
-// validator or code generator to assist with populating and validating
-// MonitoringInfo protos.
+// A set of well known MonitoringInfo specifications.
 message MonitoringInfoSpecs {
   enum Enum {
-    // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
-    // upgrading the python SDK.
-    USER_COUNTER = 0 [(monitoring_info_spec) = {
-      urn: "beam:metric:user",
-      type_urn: "beam:metrics:sum_int_64",
+    // Represents an integer counter where values are summed across bundles.
+    USER_SUM_INT64 = 0 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:sum_int64:v1",
       required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
       annotations: [{
         key: "description",
-        value: "URN utilized to report user numeric counters."
+        value: "URN utilized to report user metric."
       }]
     }];
 
-    ELEMENT_COUNT = 1 [(monitoring_info_spec) = {
+    // Represents a double counter where values are summed across bundles.
+    USER_SUM_DOUBLE = 1 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
 
 Review comment:
   I would also prefer an API where you only need to look at the MonitoringInfo URN to know everything else you can expect to find in it. So I would prefer not having MonitoringInfoSpecs which allow multiple different types; I think that would just be confusing for implementers on the runner side.
   
   Defining the types upfront is nice because implementers can see at a glance all the types they want to support. And the type_urn and type bytes packaging does allow it to be more general, so that custom runners and SDKs could extend it with custom types.
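A minimal sketch of the property argued for here, with hypothetical names: if every metric URN maps to exactly one type URN, as with per-type URNs such as `beam:metric:user:sum_int64:v1` in another revision of this diff, a runner can resolve the payload type from the URN alone, and registering a second type under a shared URN such as `beam:metric:user:v1` is rejected.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical registry enforcing "one metric URN implies exactly one type URN". */
final class MetricUrnRegistry {
  private final Map<String, String> typeByUrn = new HashMap<>();

  /** Registers a spec, rejecting a second, different type for an already-known URN. */
  void register(String urn, String typeUrn) {
    String previous = typeByUrn.putIfAbsent(urn, typeUrn);
    if (previous != null && !previous.equals(typeUrn)) {
      throw new IllegalArgumentException(
          urn + " already maps to " + previous + ", cannot also map to " + typeUrn);
    }
  }

  /** Lets a runner decide how to decode a payload from the metric URN alone. */
  Optional<String> typeFor(String urn) {
    return Optional.ofNullable(typeByUrn.get(urn));
  }
}
```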

[GitHub] [beam] ajamato commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r395888595
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -229,101 +215,127 @@ message MonitoringInfo {
     NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
     NAME = 6 [(label_props) = { name: "NAME" }];
   }
+
   // A set of key+value labels which define the scope of the metric.
   // Either a well defined entity id for matching the enum names in
   // the MonitoringInfoLabels enum or any arbitrary label
   // set by a custom metric or user metric.
+  //
   // A monitoring system is expected to be able to aggregate the metrics
   // together for all updates having the same URN and labels. Some systems such
   // as Stackdriver will be able to aggregate the metrics using a subset of the
   // provided labels
-  map<string, string> labels = 5;
-
-  // The walltime of the most recent update.
-  // Useful for aggregation for latest types such as LatestInt64.
-  google.protobuf.Timestamp timestamp = 6;
+  map<string, string> labels = 4;
 }
 
 message MonitoringInfoTypeUrns {
   enum Enum {
+    // Represents an integer counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:varint:v1
     SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
 
 Review comment:
   I think only a few of these types are used today: SUM_INT64_TYPE and DISTRIBUTION_INT64_TYPE. I hope we can make it very simple to add new ones with minimal changes (adding a MonitoringInfoSpec and reusing the existing framework/libraries in the SDK; runners can mostly pass them through to a service that aggregates across multiple workers).
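To make one of those two types concrete, here is a hedged sketch for `DISTRIBUTION_INT64_TYPE`. It assumes the `<count><sum><min><max>` layout (each field `beam:coder:varint:v1`) documented elsewhere in this PR's diff, and the varint layout of Beam's Java `VarInt` utility. Merging follows the field definitions: counts and sums add, while min and max keep the extremes. That is the kind of type-driven aggregation a service could apply across workers without knowing anything else about the metric. The class is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

/** Hypothetical DISTRIBUTION_INT64 value using the <count><sum><min><max> varint layout. */
final class DistributionInt64 {
  final long count;
  final long sum;
  final long min;
  final long max;

  DistributionInt64(long count, long sum, long min, long max) {
    this.count = count;
    this.sum = sum;
    this.min = min;
    this.max = max;
  }

  /** Counts and sums add; min and max keep the extremes across both inputs. */
  DistributionInt64 merge(DistributionInt64 other) {
    return new DistributionInt64(
        count + other.count, sum + other.sum, Math.min(min, other.min), Math.max(max, other.max));
  }

  /** Writes the four fields in wire order, each as a varint. */
  byte[] encode() {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (long field : new long[] {count, sum, min, max}) {
      long v = field;
      do {
        int bits = (int) (v & 0x7F);
        v >>>= 7;
        out.write(bits | (v != 0 ? 0x80 : 0));
      } while (v != 0);
    }
    return out.toByteArray();
  }

  static DistributionInt64 decode(byte[] payload) {
    ByteArrayInputStream in = new ByteArrayInputStream(payload);
    // Java evaluates arguments left to right, matching the field order on the wire.
    return new DistributionInt64(readVarLong(in), readVarLong(in), readVarLong(in), readVarLong(in));
  }

  private static long readVarLong(ByteArrayInputStream in) {
    long result = 0;
    for (int shift = 0; shift < 64; shift += 7) {
      int b = in.read();
      if (b < 0) {
        throw new IllegalArgumentException("truncated varint");
      }
      result |= (long) (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
    }
    throw new IllegalArgumentException("varint longer than 10 bytes");
  }
}
```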
   
   

[GitHub] [beam] lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-605299101
 
 
   https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Commit/7318/ passed, merging

[GitHub] [beam] ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398812690
 
 

 ##########
 File path: runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java
 ##########
 @@ -115,17 +113,15 @@ private static PortableMetrics convertMonitoringInfosToMetricResults(
         MetricKey.create(
             labelsMap.get(STEP_NAME_LABEL),
             MetricName.named(labelsMap.get(NAMESPACE_LABEL), labelsMap.get(METRIC_NAME_LABEL)));
-    MetricsApi.IntExtremaData extremaData =
-        monitoringInfo.getMetric().getExtremaData().getIntExtremaData();
-    // Get only last value of the extrema table
-    Instant timestamp = Instant.ofEpochSecond(monitoringInfo.getTimestamp().getSeconds());
-    if (extremaData.getIntValuesCount() > 0) {
-      GaugeResult result =
-          GaugeResult.create(
-              extremaData.getIntValues(extremaData.getIntValuesCount() - 1), timestamp);
+    try {
+      InputStream input = monitoringInfo.getPayload().newInput();
+      long timestamp = VARINT_CODER.decode(input);
 
 Review comment:
   Ditto: please use a helper here so that extracting the value is a one-liner.
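A hypothetical sketch of such a helper for the `LATEST_INT64_TYPE` payload. It assumes the `<timestamp><value>` layout documented in this PR's proto changes (timestamp in milliseconds since epoch, both fields `beam:coder:varint:v1`) and the varint layout of Beam's Java `VarInt` utility. It decodes both fields in one call and shows how the timestamp orders two reports; it does not use Beam's coder classes or `GaugeResult`, and the class name is illustrative.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

/** Hypothetical LATEST_INT64 value: <timestamp><value>, both varints. */
final class LatestInt64 {
  final long timestampMillis; // milliseconds since epoch, used only for ordering
  final long value;

  LatestInt64(long timestampMillis, long value) {
    this.timestampMillis = timestampMillis;
    this.value = value;
  }

  /** Decodes the whole payload in one call, so call sites stay a one-liner. */
  static LatestInt64 decode(byte[] payload) {
    ByteArrayInputStream in = new ByteArrayInputStream(payload);
    long timestamp = readVarLong(in); // the timestamp comes first on the wire
    long value = readVarLong(in);
    return new LatestInt64(timestamp, value);
  }

  byte[] encode() {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeVarLong(timestampMillis, out);
    writeVarLong(value, out);
    return out.toByteArray();
  }

  /** Returns whichever report has the later timestamp, preferring {@code other} on ties. */
  LatestInt64 latest(LatestInt64 other) {
    return other.timestampMillis >= timestampMillis ? other : this;
  }

  private static void writeVarLong(long v, ByteArrayOutputStream out) {
    do {
      int bits = (int) (v & 0x7F);
      v >>>= 7;
      out.write(bits | (v != 0 ? 0x80 : 0));
    } while (v != 0);
  }

  private static long readVarLong(ByteArrayInputStream in) {
    long result = 0;
    for (int shift = 0; shift < 64; shift += 7) {
      int b = in.read();
      if (b < 0) {
        throw new IllegalArgumentException("truncated varint");
      }
      result |= (long) (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
    }
    throw new IllegalArgumentException("varint longer than 10 bytes");
  }
}
```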

[GitHub] [beam] robertwb commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r395933544
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -229,121 +330,148 @@ message MonitoringInfo {
     NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
     NAME = 6 [(label_props) = { name: "NAME" }];
   }
-  // A set of key+value labels which define the scope of the metric.
+
+  // A set of key and value labels which define the scope of the metric. For
+  // well known URNs, the set of required labels is provided by the associated
+  // MonitoringInfoSpec.
+  //
   // Either a well defined entity id for matching the enum names in
   // the MonitoringInfoLabels enum or any arbitrary label
   // set by a custom metric or user metric.
+  //
   // A monitoring system is expected to be able to aggregate the metrics
   // together for all updates having the same URN and labels. Some systems such
   // as Stackdriver will be able to aggregate the metrics using a subset of the
   // provided labels
-  map<string, string> labels = 5;
-
-  // The walltime of the most recent update.
-  // Useful for aggregation for latest types such as LatestInt64.
-  google.protobuf.Timestamp timestamp = 6;
+  map<string, string> labels = 4;
 }
 
+// A set of well known URNs that specify the encoding and aggregation method.
 message MonitoringInfoTypeUrns {
   enum Enum {
+    // Represents an integer counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:varint:v1
     SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                            "beam:metrics:sum_int_64"];
-
-    DISTRIBUTION_INT64_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                                     "beam:metrics:distribution_int_64"];
-
-    LATEST_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                               "beam:metrics:latest_int_64"];
-
-    // iterable<double> is encoded with a beam:coder:double:v1 coder for each
-    // element.
-    LATEST_DOUBLES_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                                 "beam:metrics:latest_doubles"];
-  }
-}
-
-message Metric {
-  // (Required) The data for this metric.
-  oneof data {
-    CounterData counter_data = 1;
-    DistributionData distribution_data = 2;
-    ExtremaData extrema_data = 3;
-  }
-}
-
-// Data associated with a Counter or Gauge metric.
-// This is designed to be compatible with metric collection
-// systems such as DropWizard.
-message CounterData {
-  oneof value {
-    int64 int64_value = 1;
-    double double_value = 2;
-    string string_value = 3;
-  }
-}
-
-// Extrema messages are used for calculating
-// Top-N/Bottom-N metrics.
-message ExtremaData {
-  oneof extrema {
-    IntExtremaData int_extrema_data = 1;
-    DoubleExtremaData double_extrema_data = 2;
-  }
-}
-
-message IntExtremaData {
-  repeated int64 int_values = 1;
-}
+                            "beam:metrics:sum_int64:v1"];
+
+    // Represents a double counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:double:v1
+    SUM_DOUBLE_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                        "beam:metrics:sum_double:v1"];
+
+    // Represents a distribution of an integer value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    //
+    // Encoding: <count><sum><min><max>
+    //   - count: beam:coder:varint:v1
+    //   - sum:   beam:coder:varint:v1
+    //   - min:   beam:coder:varint:v1
+    //   - max:   beam:coder:varint:v1
+    DISTRIBUTION_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                     "beam:metrics:distribution_int64:v1"];
+
+    // Represents a distribution of a double value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    //
+    // Encoding: <count><sum><min><max>
+    //   - count: beam:coder:varint:v1
+    //   - sum:   beam:coder:double:v1
+    //   - min:   beam:coder:double:v1
+    //   - max:   beam:coder:double:v1
+    DISTRIBUTION_DOUBLE_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                 "beam:metrics:distribution_double:v1"];
+
+    // Represents the latest seen integer value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    //
+    // Encoding: <timestamp><value>
+    //   - timestamp: beam:coder:varint:v1     (milliseconds since epoch)
+    //   - value:     beam:coder:varint:v1
+    LATEST_INT64_TYPE = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                               "beam:metrics:latest_int64:v1"];
+
+    // Represents the latest seen double value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    //
+    // Encoding: <timestamp><value>
+    //   - timestamp: beam:coder:varint:v1     (milliseconds since epoch)
+    //   - value:     beam:coder:double:v1
+    LATEST_DOUBLE_TYPE = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metrics:latest_double:v1"];
+
+    // Represents the largest set of integer values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
 
 Review comment:
   This looks like the outer ListCoder encoding. Should we use the nested ListCoder encoding instead?
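The distinction raised here can be shown with a small sketch. With `<value1><value2>...<valueN>`, a reader only knows the list has ended when the payload runs out (an outer, whole-buffer encoding), so nothing else can follow it; a count prefix (a nested encoding) lets the reader stop on its own. The varint count prefix below is an illustrative assumption, not the exact prefix format of Beam's `ListCoder`, and the class is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical comparison of two layouts for a TOP_N/BOTTOM_N int64 payload. */
final class Int64ListPayloads {
  private Int64ListPayloads() {}

  /** Outer-style: <value1>...<valueN> with no length; the end of the buffer ends the list. */
  static byte[] encodeOuter(List<Long> values) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (long v : values) {
      writeVarLong(v, out);
    }
    return out.toByteArray();
  }

  /** Reads values until the stream is exhausted, so any trailing bytes are absorbed. */
  static List<Long> decodeOuter(ByteArrayInputStream in) {
    List<Long> values = new ArrayList<>();
    while (in.available() > 0) {
      values.add(readVarLong(in));
    }
    return values;
  }

  /** Nested-style: an illustrative varint count prefix, then the values. */
  static byte[] encodeNested(List<Long> values) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeVarLong(values.size(), out);
    for (long v : values) {
      writeVarLong(v, out);
    }
    return out.toByteArray();
  }

  /** Reads exactly the counted values and leaves any trailing bytes unread. */
  static List<Long> decodeNested(ByteArrayInputStream in) {
    long n = readVarLong(in);
    List<Long> values = new ArrayList<>();
    for (long i = 0; i < n; i++) {
      values.add(readVarLong(in));
    }
    return values;
  }

  private static void writeVarLong(long v, ByteArrayOutputStream out) {
    do {
      int bits = (int) (v & 0x7F);
      v >>>= 7;
      out.write(bits | (v != 0 ? 0x80 : 0));
    } while (v != 0);
  }

  private static long readVarLong(ByteArrayInputStream in) {
    long result = 0;
    for (int shift = 0; shift < 64; shift += 7) {
      int b = in.read();
      if (b < 0) {
        throw new IllegalArgumentException("truncated varint");
      }
      result |= (long) (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
    }
    throw new IllegalArgumentException("varint longer than 10 bytes");
  }
}
```

If a single byte is appended after each encoding, the outer decoder silently treats it as an extra list element, while the nested decoder returns the original list and leaves the byte for the next reader.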

[GitHub] [beam] ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398759581
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -52,38 +55,157 @@ message Annotation {
   string value = 2;
 }
 
-// Populated MonitoringInfoSpecs for specific URNs.
-// Indicating the required fields to be set.
-// SDKs and RunnerHarnesses can load these instances into memory and write a
-// validator or code generator to assist with populating and validating
-// MonitoringInfo protos.
+// A set of well known MonitoringInfo specifications.
 message MonitoringInfoSpecs {
   enum Enum {
-    // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
-    // upgrading the python SDK.
-    USER_COUNTER = 0 [(monitoring_info_spec) = {
-      urn: "beam:metric:user",
-      type_urn: "beam:metrics:sum_int_64",
+    // Represents an integer counter where values are summed across bundles.
+    USER_SUM_INT64 = 0 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:sum_int64:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents a double counter where values are summed across bundles.
+    USER_SUM_DOUBLE = 1 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:sum_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents a distribution of an integer value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    USER_DISTRIBUTION_INT64 = 2 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:distribution_int64:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents a distribution of a double value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    USER_DISTRIBUTION_DOUBLE = 3 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:distribution_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the latest seen integer value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    USER_LATEST_INT64 = 4 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:latest_int64:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the latest seen double value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    USER_LATEST_DOUBLE = 5 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:latest_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the largest set of integer values seen across bundles.
+    USER_TOP_N_INT64 = 6 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:top_n_int64:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the largest set of double values seen across bundles.
+    USER_TOP_N_DOUBLE = 7 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:top_n_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the smallest set of integer values seen across bundles.
+    USER_BOTTOM_N_INT64 = 8 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:bottom_n_int64:v1",
       required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
       annotations: [{
         key: "description",
-        value: "URN utilized to report user numeric counters."
+        value: "URN utilized to report user metric."
       }]
     }];
 
-    ELEMENT_COUNT = 1 [(monitoring_info_spec) = {
+    // Represents the smallest set of double values seen across bundles.
+    USER_BOTTOM_N_DOUBLE = 9 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:bottom_n_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // General monitored state information which contains structured information
+    // which does not fit into a typical metric format. See MonitoringTableData
+    // for more details.
+    USER_MONITORING_TABLE = 10 [(monitoring_info_spec) = {
 
 Review comment:
   Is there a reason you want to populate these today, when there is no usage of them yet in the runners and SDKs we have here?

[GitHub] [beam] lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398850622
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -52,38 +55,157 @@ message Annotation {
   string value = 2;
 }
 
-// Populated MonitoringInfoSpecs for specific URNs.
-// Indicating the required fields to be set.
-// SDKs and RunnerHarnesses can load these instances into memory and write a
-// validator or code generator to assist with populating and validating
-// MonitoringInfo protos.
+// A set of well known MonitoringInfo specifications.
 message MonitoringInfoSpecs {
   enum Enum {
-    // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
-    // upgrading the python SDK.
-    USER_COUNTER = 0 [(monitoring_info_spec) = {
-      urn: "beam:metric:user",
-      type_urn: "beam:metrics:sum_int_64",
+    // Represents an integer counter where values are summed across bundles.
+    USER_SUM_INT64 = 0 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:sum_int64:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents a double counter where values are summed across bundles.
+    USER_SUM_DOUBLE = 1 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:sum_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents a distribution of an integer value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    USER_DISTRIBUTION_INT64 = 2 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:distribution_int64:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents a distribution of a double value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    USER_DISTRIBUTION_DOUBLE = 3 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:distribution_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the latest seen integer value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    USER_LATEST_INT64 = 4 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:latest_int64:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the latest seen double value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    USER_LATEST_DOUBLE = 5 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:latest_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the largest set of integer values seen across bundles.
+    USER_TOP_N_INT64 = 6 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:top_n_int64:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the largest set of double values seen across bundles.
+    USER_TOP_N_DOUBLE = 7 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:top_n_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents the smallest set of integer values seen across bundles.
+    USER_BOTTOM_N_INT64 = 8 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:bottom_n_int64:v1",
       required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
       annotations: [{
         key: "description",
-        value: "URN utilized to report user numeric counters."
+        value: "URN utilized to report user metric."
       }]
     }];
 
-    ELEMENT_COUNT = 1 [(monitoring_info_spec) = {
+    // Represents the smallest set of double values seen across bundles.
+    USER_BOTTOM_N_DOUBLE = 9 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:bottom_n_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // General monitored state information which contains structured information
+    // which does not fit into a typical metric format. See MonitoringTableData
+    // for more details.
+    USER_MONITORING_TABLE = 10 [(monitoring_info_spec) = {
 
 Review comment:
   Commented it out with a TODO and filed BEAM-9617
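The USER_TOP_N_* and USER_BOTTOM_N_* specs in the hunk above describe keeping the largest (or smallest) set of values seen across bundles. Below is a minimal Go sketch of how an aggregator might merge per-bundle top-N lists. It assumes N is fixed and known to the aggregator, which the spec text does not say. `mergeTopN` is a hypothetical helper, not part of any Beam SDK.

```go
package main

import (
	"fmt"
	"sort"
)

// mergeTopN combines per-bundle top-N lists into one list holding the n
// largest values overall, sorted in descending order. Duplicates are kept.
// Hypothetical helper for illustration only.
func mergeTopN(n int, bundles ...[]int64) []int64 {
	var all []int64
	for _, b := range bundles {
		all = append(all, b...)
	}
	sort.Slice(all, func(i, j int) bool { return all[i] > all[j] })
	if len(all) > n {
		all = all[:n]
	}
	return all
}

func main() {
	fmt.Println(mergeTopN(3, []int64{9, 4, 1}, []int64{7, 5}, []int64{8}))
}
```

Bottom-N works the same way with the comparison in `sort.Slice` reversed.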

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r396556158
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -139,7 +137,7 @@ message MonitoringInfoSpecs {
 
     USER_DISTRIBUTION_COUNTER = 6 [(monitoring_info_spec) = {
       urn: "beam:metric:user_distribution",
-      type_urn: "beam:metrics:distribution_int_64",
+      type_urn: "beam:metrics:distribution_int64",
 
 Review comment:
   Done
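Distribution types are aggregated across bundles field by field. Count and sum are added together, and min and max keep the extremes. Below is a sketch of that merge using an in-memory struct. `DistInt64` is a hypothetical name, not an SDK type.

```go
package main

import "fmt"

// DistInt64 mirrors the count/sum/min/max fields of a
// beam:metrics:distribution_int64 value. Hypothetical type for illustration.
type DistInt64 struct {
	Count, Sum, Min, Max int64
}

// Merge combines two per-bundle distributions. A side with zero count is
// treated as empty so that its zero-valued Min/Max do not leak into the result.
func (a DistInt64) Merge(b DistInt64) DistInt64 {
	if a.Count == 0 {
		return b
	}
	if b.Count == 0 {
		return a
	}
	out := DistInt64{Count: a.Count + b.Count, Sum: a.Sum + b.Sum, Min: a.Min, Max: a.Max}
	if b.Min < out.Min {
		out.Min = b.Min
	}
	if b.Max > out.Max {
		out.Max = b.Max
	}
	return out
}

func main() {
	x := DistInt64{Count: 2, Sum: 10, Min: 3, Max: 7}
	y := DistInt64{Count: 3, Sum: 6, Min: 1, Max: 4}
	fmt.Printf("%+v\n", x.Merge(y))
}
```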


[GitHub] [beam] lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398843200
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -33,14 +33,17 @@ import "beam_runner_api.proto";
 import "google/protobuf/descriptor.proto";
 import "google/protobuf/timestamp.proto";
 
-// A specification containing required set of fields and labels required
-// to be set on a MonitoringInfo for the specific URN for SDK->RunnerHarness
-// ProcessBundleResponse reporting.
+// A specification for describing a well known MonitoringInfo.
 message MonitoringInfoSpec {
+  // Defines the semantic meaning of the metric or monitored state.
 
 Review comment:
   Done
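The reworded `MonitoringInfoSpec` pairs a semantic `urn` with a `type` that fixes the payload encoding. In the specs listed earlier it also carries `required_labels`. Below is a sketch of how an SDK or runner might check a reported MonitoringInfo against such a spec. `Spec`, `Info`, and `validate` are hypothetical Go types and functions, not the generated proto code.

```go
package main

import "fmt"

// Spec loosely mirrors the MonitoringInfoSpec fields urn, type and
// required_labels. Hypothetical Go type, not the generated proto.
type Spec struct {
	URN            string
	Type           string
	RequiredLabels []string
}

// Info loosely mirrors a reported MonitoringInfo. Hypothetical Go type.
type Info struct {
	URN    string
	Type   string
	Labels map[string]string
}

// validate returns an error if info does not match the spec's URN and type,
// or if it is missing any required label.
func validate(spec Spec, info Info) error {
	if info.URN != spec.URN || info.Type != spec.Type {
		return fmt.Errorf("spec %s/%s does not apply to %s/%s",
			spec.URN, spec.Type, info.URN, info.Type)
	}
	for _, l := range spec.RequiredLabels {
		if _, ok := info.Labels[l]; !ok {
			return fmt.Errorf("missing required label %q", l)
		}
	}
	return nil
}

func main() {
	spec := Spec{
		URN:            "beam:metric:user:v1",
		Type:           "beam:metrics:sum_int64:v1",
		RequiredLabels: []string{"PTRANSFORM", "NAMESPACE", "NAME"},
	}
	info := Info{URN: spec.URN, Type: spec.Type,
		Labels: map[string]string{"PTRANSFORM": "ParDo", "NAME": "count"}}
	fmt.Println(validate(spec, info)) // NAMESPACE is missing
}
```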


[GitHub] [beam] lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-604517679
 
 
   Run Java PreCommit


[GitHub] [beam] lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398765776
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go
 ##########
 @@ -80,36 +84,43 @@ func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
 	var monitoringInfo []*ppb.MonitoringInfo
 	metrics.Extractor{
 		SumInt64: func(l metrics.Labels, v int64) {
-			monitoringInfo = append(monitoringInfo,
-				&ppb.MonitoringInfo{
-					Urn:    "beam:metric:user",
-					Type:   "beam:metrics:sum_int_64",
-					Labels: userLabels(l),
-					Data:   int64Counter(v),
-				})
+			payload, err := int64Counter(v)
+			if err != nil {
+			  panic(err)
+			}
+      monitoringInfo = append(monitoringInfo,
 
 Review comment:
   Done


[GitHub] [beam] lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r396556062
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -229,101 +215,127 @@ message MonitoringInfo {
     NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
     NAME = 6 [(label_props) = { name: "NAME" }];
   }
+
   // A set of key+value labels which define the scope of the metric.
   // Either a well defined entity id for matching the enum names in
   // the MonitoringInfoLabels enum or any arbitrary label
   // set by a custom metric or user metric.
+  //
   // A monitoring system is expected to be able to aggregate the metrics
   // together for all updates having the same URN and labels. Some systems such
   // as Stackdriver will be able to aggregate the metrics using a subset of the
   // provided labels
-  map<string, string> labels = 5;
-
-  // The walltime of the most recent update.
-  // Useful for aggregation for latest types such as LatestInt64.
-  google.protobuf.Timestamp timestamp = 6;
+  map<string, string> labels = 4;
 }
 
 message MonitoringInfoTypeUrns {
   enum Enum {
+    // Represents an integer counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:varint:v1
     SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
 
 Review comment:
   The problematic type is `string`:
   Is `sum` the same as concatenation? (If so, the value will grow very large very quickly, which also makes distribution pointless.)
   Are min/max/top_n/bottom_n based on lexicographical ordering? (If so, is it case sensitive, case insensitive, or something else?)
   
   `string` only makes sense with latest. If we wanted to define any of the others, I think it would be better to use explicit names like `case_insensitive_max`.
   


[GitHub] [beam] lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398843062
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -33,14 +33,17 @@ import "beam_runner_api.proto";
 import "google/protobuf/descriptor.proto";
 import "google/protobuf/timestamp.proto";
 
-// A specification containing required set of fields and labels required
-// to be set on a MonitoringInfo for the specific URN for SDK->RunnerHarness
-// ProcessBundleResponse reporting.
+// A specification for describing a well known MonitoringInfo.
 message MonitoringInfoSpec {
+  // Defines the semantic meaning of the metric or monitored state.
   string urn = 1;
-  string type_urn = 2;
-  // The list of required
+
+  // Defines the encoding and aggregation method for the payload.
+  string type = 2;
 
 Review comment:
   Done


[GitHub] [beam] robertwb commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r397510852
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -52,38 +55,157 @@ message Annotation {
   string value = 2;
 }
 
-// Populated MonitoringInfoSpecs for specific URNs.
-// Indicating the required fields to be set.
-// SDKs and RunnerHarnesses can load these instances into memory and write a
-// validator or code generator to assist with populating and validating
-// MonitoringInfo protos.
+// A set of well known MonitoringInfo specifications.
 message MonitoringInfoSpecs {
   enum Enum {
-    // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
-    // upgrading the python SDK.
-    USER_COUNTER = 0 [(monitoring_info_spec) = {
-      urn: "beam:metric:user",
-      type_urn: "beam:metrics:sum_int_64",
+    // Represents an integer counter where values are summed across bundles.
+    USER_SUM_INT64 = 0 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:sum_int64:v1",
       required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
       annotations: [{
         key: "description",
-        value: "URN utilized to report user numeric counters."
+        value: "URN utilized to report user metric."
       }]
     }];
 
-    ELEMENT_COUNT = 1 [(monitoring_info_spec) = {
+    // Represents a double counter where values are summed across bundles.
+    USER_SUM_DOUBLE = 1 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
 
 Review comment:
   I suppose I still haven't bought into the idea that the whole set of "user counters" belongs under a single semantic identifier on par with each individual system counter, such as msecs or element counts; all we really need is a way to keep names from colliding between the two. But if we're going to go this route, I don't think there's value in enumerating them here (vs. simply saying that a "user counter" can use any type).


[GitHub] [beam] lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398858428
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
 ##########
 @@ -35,50 +35,82 @@
 public class FnApiMonitoringInfoToCounterUpdateTransformer
     implements MonitoringInfoToCounterUpdateTransformer {
 
-  final Map<String, MonitoringInfoToCounterUpdateTransformer> counterTransformers = new HashMap<>();
+  final Map<String, Map<String, MonitoringInfoToCounterUpdateTransformer>>
 
 Review comment:
   Done.
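The change to `Map<String, Map<String, MonitoringInfoToCounterUpdateTransformer>>` keys transformers first by URN and then by type. That two-level lookup matters once several user metric types share the single `beam:metric:user:v1` URN. Below is the same lookup sketched in Go, with a hypothetical `transformer` function type standing in for the Dataflow worker's Java interface.

```go
package main

import "fmt"

// transformer converts a payload into some runner-specific update.
// Hypothetical signature for illustration.
type transformer func(payload []byte) string

// registry maps URN -> type URN -> transformer.
type registry map[string]map[string]transformer

func (r registry) register(urn, typ string, t transformer) {
	if r[urn] == nil {
		r[urn] = make(map[string]transformer)
	}
	r[urn][typ] = t
}

// lookup returns the transformer for (urn, typ), or false if none is
// registered. Indexing a missing inner map yields a nil map, which is safe.
func (r registry) lookup(urn, typ string) (transformer, bool) {
	t, ok := r[urn][typ]
	return t, ok
}

func main() {
	r := registry{}
	r.register("beam:metric:user:v1", "beam:metrics:sum_int64:v1",
		func([]byte) string { return "user sum int64" })
	r.register("beam:metric:user:v1", "beam:metrics:distribution_int64:v1",
		func([]byte) string { return "user distribution int64" })
	if t, ok := r.lookup("beam:metric:user:v1", "beam:metrics:distribution_int64:v1"); ok {
		fmt.Println(t(nil))
	}
}
```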


[GitHub] [beam] ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398806997
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformer.java
 ##########
 @@ -95,7 +100,12 @@ public CounterUpdate transform(MonitoringInfo monitoringInfo) {
       return null;
     }
 
-    long value = monitoringInfo.getMetric().getCounterData().getInt64Value();
+    long value;
+    try {
+      value = VARINT_CODER.decode(monitoringInfo.getPayload().newInput());
 
 Review comment:
   Ditto, please use a one-liner here.
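The Java line above decodes the whole payload as a single `beam:coder:varint:v1` value. Below is a Go analog. It assumes that coder writes an int64's two's-complement bits as an unsigned base-128 varint, which is the format `encoding/binary`'s `Uvarint` reads. `decodeVarInt64` is a hypothetical name.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// decodeVarInt64 decodes a payload assumed to hold exactly one
// beam:coder:varint:v1 value: the int64's bits as an unsigned varint.
func decodeVarInt64(payload []byte) (int64, error) {
	u, n := binary.Uvarint(payload)
	if n <= 0 {
		return 0, errors.New("malformed varint payload")
	}
	if n != len(payload) {
		return 0, errors.New("trailing bytes after varint")
	}
	return int64(u), nil
}

func main() {
	v, err := decodeVarInt64([]byte{0xac, 0x02})
	fmt.Println(v, err)
}
```

Rejecting trailing bytes is a choice made in this sketch. A stream-based decode like the Java line reads one varint and probably does not check whether anything follows it.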


[GitHub] [beam] lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-604195288
 
 
   Run Java Flink PortableValidatesRunner Streaming


[GitHub] [beam] lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-604198250
 
 
   Run Java Flink PortableValidatesRunner Streaming

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-604194556
 
 
   R: @ajamato for Java/Python
   R: @lostluck for Go


[GitHub] [beam] ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398805227
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MeanByteCountMonitoringInfoToCounterUpdateTransformer.java
 ##########
 @@ -100,8 +101,18 @@ public CounterUpdate transform(MonitoringInfo monitoringInfo) {
       return null;
     }
 
-    IntDistributionData value =
-        monitoringInfo.getMetric().getDistributionData().getIntDistributionData();
+    long count;
+    long sum;
+    try {
+      InputStream payload = monitoringInfo.getPayload().newInput();
 
 Review comment:
   ditto


[GitHub] [beam] lostluck commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398755038
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go
 ##########
 @@ -80,36 +84,43 @@ func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
 	var monitoringInfo []*ppb.MonitoringInfo
 	metrics.Extractor{
 		SumInt64: func(l metrics.Labels, v int64) {
-			monitoringInfo = append(monitoringInfo,
-				&ppb.MonitoringInfo{
-					Urn:    "beam:metric:user",
-					Type:   "beam:metrics:sum_int_64",
-					Labels: userLabels(l),
-					Data:   int64Counter(v),
-				})
+			payload, err := int64Counter(v)
+			if err != nil {
+			  panic(err)
+			}
+      monitoringInfo = append(monitoringInfo,
 
 Review comment:
   Need to run `go fmt` again.
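After this change the Go harness calls `int64Counter(v)`, which returns an encoded payload and an error. The excerpt does not show that helper's body. Below is a plausible sketch. It assumes the helper only has to emit the single `beam:coder:varint:v1` value that the PR describes for SUM_INT64_TYPE: the int64's bits written as an unsigned base-128 varint, so negative values take 10 bytes. It requires Go 1.19 or later for `binary.AppendUvarint`.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// int64Counter encodes v as a sum_int64 payload, i.e. one varint.
// Sketch only; not the actual helper from monitoring.go. The error result
// just matches the call site in the diff: this in-memory encoding cannot fail.
func int64Counter(v int64) ([]byte, error) {
	return binary.AppendUvarint(nil, uint64(v)), nil
}

func main() {
	payload, err := int64Counter(300)
	if err != nil {
		panic(err)
	}
	fmt.Printf("% x\n", payload)
}
```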


[GitHub] [beam] lostluck commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398751386
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -229,121 +330,152 @@ message MonitoringInfo {
     NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
     NAME = 6 [(label_props) = { name: "NAME" }];
   }
-  // A set of key+value labels which define the scope of the metric.
+
+  // A set of key and value labels which define the scope of the metric. For
+  // well known URNs, the set of required labels is provided by the associated
+  // MonitoringInfoSpec.
+  //
   // Either a well defined entity id for matching the enum names in
   // the MonitoringInfoLabels enum or any arbitrary label
   // set by a custom metric or user metric.
+  //
   // A monitoring system is expected to be able to aggregate the metrics
   // together for all updates having the same URN and labels. Some systems such
   // as Stackdriver will be able to aggregate the metrics using a subset of the
   // provided labels
-  map<string, string> labels = 5;
-
-  // The walltime of the most recent update.
-  // Useful for aggregation for latest types such as LatestInt64.
-  google.protobuf.Timestamp timestamp = 6;
+  map<string, string> labels = 4;
 }
 
+// A set of well known URNs that specify the encoding and aggregation method.
 message MonitoringInfoTypeUrns {
   enum Enum {
+    // Represents an integer counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:varint:v1
     SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                            "beam:metrics:sum_int_64"];
-
-    DISTRIBUTION_INT64_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                                     "beam:metrics:distribution_int_64"];
-
-    LATEST_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                               "beam:metrics:latest_int_64"];
-
-    // iterable<double> is encoded with a beam:coder:double:v1 coder for each
-    // element.
-    LATEST_DOUBLES_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                                 "beam:metrics:latest_doubles"];
-  }
-}
-
-message Metric {
-  // (Required) The data for this metric.
-  oneof data {
-    CounterData counter_data = 1;
-    DistributionData distribution_data = 2;
-    ExtremaData extrema_data = 3;
-  }
-}
-
-// Data associated with a Counter or Gauge metric.
-// This is designed to be compatible with metric collection
-// systems such as DropWizard.
-message CounterData {
-  oneof value {
-    int64 int64_value = 1;
-    double double_value = 2;
-    string string_value = 3;
-  }
-}
-
-// Extrema messages are used for calculating
-// Top-N/Bottom-N metrics.
-message ExtremaData {
-  oneof extrema {
-    IntExtremaData int_extrema_data = 1;
-    DoubleExtremaData double_extrema_data = 2;
-  }
-}
-
-message IntExtremaData {
-  repeated int64 int_values = 1;
-}
-
-message DoubleExtremaData {
-  repeated double double_values = 2;
-}
-
-// Data associated with a distribution metric.
-// This is based off of the current DistributionData metric.
-// This is not a stackdriver or dropwizard compatible
-// style of distribution metric.
-message DistributionData {
-  oneof distribution {
-    IntDistributionData int_distribution_data = 1;
-    DoubleDistributionData double_distribution_data = 2;
+                            "beam:metrics:sum_int64:v1"];
+
+    // Represents a double counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   value: beam:coder:double:v1
+    SUM_DOUBLE_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                        "beam:metrics:sum_double:v1"];
+
+    // Represents a distribution of an integer value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    //
+    // Encoding: <count><sum><min><max>
+    //   - count: beam:coder:varint:v1
+    //   - sum:   beam:coder:varint:v1
+    //   - min:   beam:coder:varint:v1
+    //   - max:   beam:coder:varint:v1
+    DISTRIBUTION_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                     "beam:metrics:distribution_int64:v1"];
+
+    // Represents a distribution of a double value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    //
+    // Encoding: <count><sum><min><max>
+    //   - count: beam:coder:varint:v1
+    //   - sum:   beam:coder:double:v1
+    //   - min:   beam:coder:double:v1
+    //   - max:   beam:coder:double:v1
+    DISTRIBUTION_DOUBLE_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                 "beam:metrics:distribution_double:v1"];
+
+    // Represents the latest seen integer value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    //
+    // Encoding: <timestamp><value>
+    //   - timestamp: beam:coder:varint:v1     (milliseconds since epoch)
+    //   - value:     beam:coder:varint:v1
+    LATEST_INT64_TYPE = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) =
 
 Review comment:
   Should we consider making the millisecond unit (ms) explicit in the URN, to keep 'us' and 'ns' granularity definitions open for the future?
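Below is a Go sketch of the LATEST_INT64 encoding as proposed in the diff: `<timestamp><value>`, both `beam:coder:varint:v1`, with the timestamp in milliseconds since epoch. It includes a merge that keeps the later value. Nothing in the encoded bytes identifies the time unit, so every reader has to rely on the proto comment. `latestInt64`, `decodeLatest`, and `mergeLatest` are hypothetical names. The tie-breaking rule is this sketch's own choice, because the spec does not define one. It requires Go 1.19 or later.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"time"
)

// latestInt64 is an in-memory form of a latest_int64 value. Hypothetical type.
type latestInt64 struct {
	Millis int64 // milliseconds since the Unix epoch, per the proto comment
	Value  int64
}

// encode writes <timestamp><value>, each as an unsigned varint of the int64 bits.
func (l latestInt64) encode() []byte {
	b := binary.AppendUvarint(nil, uint64(l.Millis))
	return binary.AppendUvarint(b, uint64(l.Value))
}

func decodeLatest(b []byte) (latestInt64, error) {
	ts, n := binary.Uvarint(b)
	if n <= 0 {
		return latestInt64{}, errors.New("malformed timestamp")
	}
	v, m := binary.Uvarint(b[n:])
	if m <= 0 {
		return latestInt64{}, errors.New("malformed value")
	}
	return latestInt64{Millis: int64(ts), Value: int64(v)}, nil
}

// mergeLatest keeps the value with the later timestamp; on a tie it keeps a.
func mergeLatest(a, b latestInt64) latestInt64 {
	if b.Millis > a.Millis {
		return b
	}
	return a
}

func main() {
	t0 := time.Date(2020, 3, 26, 0, 0, 0, 0, time.UTC)
	older := latestInt64{Millis: t0.UnixMilli(), Value: 1}
	newer := latestInt64{Millis: t0.Add(time.Second).UnixMilli(), Value: 2}
	got, err := decodeLatest(mergeLatest(older, newer).encode())
	fmt.Println(got.Value, err)
}
```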


[GitHub] [beam] robertwb commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r397507973
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -229,101 +215,127 @@ message MonitoringInfo {
     NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
     NAME = 6 [(label_props) = { name: "NAME" }];
   }
+
   // A set of key+value labels which define the scope of the metric.
   // Either a well defined entity id for matching the enum names in
   // the MonitoringInfoLabels enum or any arbitrary label
   // set by a custom metric or user metric.
+  //
   // A monitoring system is expected to be able to aggregate the metrics
   // together for all updates having the same URN and labels. Some systems such
   // as Stackdriver will be able to aggregate the metrics using a subset of the
   // provided labels
-  map<string, string> labels = 5;
-
-  // The walltime of the most recent update.
-  // Useful for aggregation for latest types such as LatestInt64.
-  google.protobuf.Timestamp timestamp = 6;
+  map<string, string> labels = 4;
 }
 
 message MonitoringInfoTypeUrns {
   enum Enum {
+    // Represents an integer counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:varint:v1
     SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
 
 Review comment:
   Agree that string summation (and the metrics requiring it) could be omitted, but min, max, top_n, and bottom_n are meaningful. (And distribution could still be meaningful, giving a count and a range.)
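The point that a string distribution can still carry a count and a range can be sketched as below. The sketch assumes plain byte-wise ordering, which is exactly the choice a spec would have to pin down. `stringDist` is hypothetical; no such metric type appears in the proto excerpt.

```go
package main

import "fmt"

// stringDist tracks the count and byte-wise (lexicographic) range of string
// values, without any sum. Hypothetical type for illustration.
type stringDist struct {
	Count    int64
	Min, Max string
}

// add records one value. The first value initializes both Min and Max.
func (d *stringDist) add(s string) {
	if d.Count == 0 || s < d.Min {
		d.Min = s
	}
	if d.Count == 0 || s > d.Max {
		d.Max = s
	}
	d.Count++
}

func main() {
	var d stringDist
	for _, s := range []string{"pear", "apple", "plum"} {
		d.add(s)
	}
	fmt.Printf("%+v\n", d)
}
```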


[GitHub] [beam] lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r396573892
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -229,121 +330,148 @@ message MonitoringInfo {
     NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
     NAME = 6 [(label_props) = { name: "NAME" }];
   }
-  // A set of key+value labels which define the scope of the metric.
+
+  // A set of key and value labels which define the scope of the metric. For
+  // well known URNs, the set of required labels is provided by the associated
+  // MonitoringInfoSpec.
+  //
   // Either a well defined entity id for matching the enum names in
   // the MonitoringInfoLabels enum or any arbitrary label
   // set by a custom metric or user metric.
+  //
   // A monitoring system is expected to be able to aggregate the metrics
   // together for all updates having the same URN and labels. Some systems such
   // as Stackdriver will be able to aggregate the metrics using a subset of the
   // provided labels
-  map<string, string> labels = 5;
-
-  // The walltime of the most recent update.
-  // Useful for aggregation for latest types such as LatestInt64.
-  google.protobuf.Timestamp timestamp = 6;
+  map<string, string> labels = 4;
 }
 
+// A set of well known URNs that specify the encoding and aggregation method.
 message MonitoringInfoTypeUrns {
   enum Enum {
+    // Represents an integer counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:varint:v1
     SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                            "beam:metrics:sum_int_64"];
-
-    DISTRIBUTION_INT64_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                                     "beam:metrics:distribution_int_64"];
-
-    LATEST_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                               "beam:metrics:latest_int_64"];
-
-    // iterable<double> is encoded with a beam:coder:double:v1 coder for each
-    // element.
-    LATEST_DOUBLES_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                                 "beam:metrics:latest_doubles"];
-  }
-}
-
-message Metric {
-  // (Required) The data for this metric.
-  oneof data {
-    CounterData counter_data = 1;
-    DistributionData distribution_data = 2;
-    ExtremaData extrema_data = 3;
-  }
-}
-
-// Data associated with a Counter or Gauge metric.
-// This is designed to be compatible with metric collection
-// systems such as DropWizard.
-message CounterData {
-  oneof value {
-    int64 int64_value = 1;
-    double double_value = 2;
-    string string_value = 3;
-  }
-}
-
-// Extrema messages are used for calculating
-// Top-N/Bottom-N metrics.
-message ExtremaData {
-  oneof extrema {
-    IntExtremaData int_extrema_data = 1;
-    DoubleExtremaData double_extrema_data = 2;
-  }
-}
-
-message IntExtremaData {
-  repeated int64 int_values = 1;
-}
+                            "beam:metrics:sum_int64:v1"];
+
+    // Represents a double counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:double:v1
+    SUM_DOUBLE_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                        "beam:metrics:sum_double:v1"];
+
+    // Represents a distribution of an integer value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    //
+    // Encoding: <count><sum><min><max>
+    //   - count: beam:coder:varint:v1
+    //   - sum:   beam:coder:varint:v1
+    //   - min:   beam:coder:varint:v1
+    //   - max:   beam:coder:varint:v1
+    DISTRIBUTION_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                     "beam:metrics:distribution_int64:v1"];
+
+    // Represents a distribution of a double value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    //
+    // Encoding: <count><sum><min><max>
+    //   - count: beam:coder:varint:v1
+    //   - sum:   beam:coder:double:v1
+    //   - min:   beam:coder:double:v1
+    //   - max:   beam:coder:double:v1
+    DISTRIBUTION_DOUBLE_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                 "beam:metrics:distribution_double:v1"];
+
+    // Represents the latest seen integer value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    //
+    // Encoding: <timestamp><value>
+    //   - timestamp: beam:coder:varint:v1     (milliseconds since epoch)
+    //   - value:     beam:coder:varint:v1
+    LATEST_INT64_TYPE = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                               "beam:metrics:latest_int64:v1"];
+
+    // Represents the latest seen double value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    //
+    // Encoding: <timestamp><value>
+    //   - timestamp: beam:coder:varint:v1     (milliseconds since epoch)
+    //   - value:     beam:coder:double:v1
+    LATEST_DOUBLE_TYPE = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metrics:latest_double:v1"];
+
+    // Represents the largest set of integer values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
 
 Review comment:
   Done
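
For reference, the DISTRIBUTION_DOUBLE_TYPE encoding described in the hunk above (<count><sum><min><max>: a varint count followed by three doubles) can be round-tripped as in this Go sketch. It assumes beam:coder:varint:v1 is an unsigned LEB128 varint of the int64's two's-complement bits (no zigzag) and that beam:coder:double:v1 is the 8-byte big-endian IEEE 754 bit pattern. The function names are hypothetical and are not Beam SDK APIs.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"io"
	"math"
)

// appendVarint appends v as an unsigned varint of its two's-complement bits
// (assumed layout of beam:coder:varint:v1; negative values take 10 bytes).
func appendVarint(buf []byte, v int64) []byte {
	var tmp [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(tmp[:], uint64(v))
	return append(buf, tmp[:n]...)
}

// appendDouble appends f as 8 big-endian bytes of its IEEE 754 bits
// (assumed layout of beam:coder:double:v1).
func appendDouble(buf []byte, f float64) []byte {
	var tmp [8]byte
	binary.BigEndian.PutUint64(tmp[:], math.Float64bits(f))
	return append(buf, tmp[:]...)
}

// encodeDistributionDouble builds a <count><sum><min><max> payload.
func encodeDistributionDouble(count int64, sum, min, max float64) []byte {
	buf := appendVarint(nil, count)
	for _, f := range []float64{sum, min, max} {
		buf = appendDouble(buf, f)
	}
	return buf
}

// decodeDistributionDouble reverses encodeDistributionDouble and rejects
// truncated payloads or trailing bytes.
func decodeDistributionDouble(payload []byte) (count int64, sum, min, max float64, err error) {
	r := bytes.NewReader(payload)
	u, err := binary.ReadUvarint(r)
	if err != nil {
		return 0, 0, 0, 0, err
	}
	count = int64(u)
	var vals [3]float64
	for i := range vals {
		var tmp [8]byte
		if _, err := io.ReadFull(r, tmp[:]); err != nil {
			return 0, 0, 0, 0, err
		}
		vals[i] = math.Float64frombits(binary.BigEndian.Uint64(tmp[:]))
	}
	if r.Len() != 0 {
		return 0, 0, 0, 0, errors.New("trailing bytes in payload")
	}
	return count, vals[0], vals[1], vals[2], nil
}
```

Because the doubles are fixed-width, only the leading count varint varies in size; a count below 128 gives a 25-byte payload.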


[GitHub] [beam] lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398764642
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go
 ##########
 @@ -80,36 +84,43 @@ func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
 	var monitoringInfo []*ppb.MonitoringInfo
 	metrics.Extractor{
 		SumInt64: func(l metrics.Labels, v int64) {
-			monitoringInfo = append(monitoringInfo,
-				&ppb.MonitoringInfo{
-					Urn:    "beam:metric:user",
-					Type:   "beam:metrics:sum_int_64",
-					Labels: userLabels(l),
-					Data:   int64Counter(v),
-				})
+			payload, err := int64Counter(v)
+			if err != nil {
+				panic(err)
+			}
+			monitoringInfo = append(monitoringInfo,
 
 Review comment:
   done


[GitHub] [beam] lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r395908901
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -229,101 +215,127 @@ message MonitoringInfo {
     NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
     NAME = 6 [(label_props) = { name: "NAME" }];
   }
+
   // A set of key+value labels which define the scope of the metric.
   // Either a well defined entity id for matching the enum names in
   // the MonitoringInfoLabels enum or any arbitrary label
   // set by a custom metric or user metric.
+  //
   // A monitoring system is expected to be able to aggregate the metrics
   // together for all updates having the same URN and labels. Some systems such
   // as Stackdriver will be able to aggregate the metrics using a subset of the
   // provided labels
-  map<string, string> labels = 5;
-
-  // The walltime of the most recent update.
-  // Useful for aggregation for latest types such as LatestInt64.
-  google.protobuf.Timestamp timestamp = 6;
+  map<string, string> labels = 4;
 }
 
 message MonitoringInfoTypeUrns {
   enum Enum {
+    // Represents an integer counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:varint:v1
     SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                            "beam:metrics:sum_int_64"];
-
-    DISTRIBUTION_INT64_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                                     "beam:metrics:distribution_int_64"];
-
-    LATEST_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                               "beam:metrics:latest_int_64"];
+                            "beam:metrics:sum_int64:v1"];
+
+    // Represents a double counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:double:v1
+    SUM_DOUBLE_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                        "beam:metrics:sum_double:v1"];
+
+    // Represents a distribution of an integer value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    //
+    // Encoding: <count><sum><min><max>
+    //   - count: beam:coder:varint:v1
+    //   - sum:   beam:coder:varint:v1
+    //   - min:   beam:coder:varint:v1
+    //   - max:   beam:coder:varint:v1
+    DISTRIBUTION_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                     "beam:metrics:distribution_int64:v1"];
+
+    // Represents a distribution of a double value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    //
+    // Encoding: <count><sum><min><max>
+    //   - count: beam:coder:varint:v1
+    //   - sum:   beam:coder:double:v1
+    //   - min:   beam:coder:double:v1
+    //   - max:   beam:coder:double:v1
+    DISTRIBUTION_DOUBLE_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                 "beam:metrics:distribution_double:v1"];
+
+    // Represents the latest seen integer value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    //
+    // Encoding: <timestamp><value>
+    //   - timestamp: beam:coder:varint:v1     (milliseconds since epoch)
+    //   - value:     beam:coder:varint:v1
+    LATEST_INT64_TYPE = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                               "beam:metrics:latest_int64:v1"];
+
+    // Represents the latest seen double value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    //
+    // Encoding: <timestamp><value>
+    //   - timestamp: beam:coder:varint:v1     (milliseconds since epoch)
+    //   - value:     beam:coder:double:v1
+    LATEST_DOUBLE_TYPE = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metrics:latest_double:v1"];
+
+    // Represents the largest set of integer values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:varint:v1
+    TOP_N_INT64_TYPE = 6 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metrics:top_n_int64:v1"];
+
+    // Represents the largest set of double values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:double:v1
+    TOP_N_DOUBLE_TYPE = 7 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                            "beam:metrics:top_n_double:v1"];
+
+    // Represents the smallest set of integer values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:varint:v1
+    BOTTOM_N_INT64_TYPE = 8 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                          "beam:metrics:bottom_n_int64:v1"];
+
+    // Represents the smallest set of double values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:double:v1
+    BOTTOM_N_DOUBLE_TYPE = 9 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metrics:bottom_n_double:v1"];
+
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:double:v1
+    PROGRESS_TYPE = 10 [(org.apache.beam.model.pipeline.v1.beam_urn) =
 
 Review comment:
   This is the individual progress for each element and restriction pair of a splittable DoFn that is actively being processed. The issue is that we don't report the metric for elements that aren't actively being processed (either in the past or in the future). If we did report all the past values, the aggregation becomes interesting: summing across all **latest** values tells you the amount of completed work globally and the amount of known but not yet finished work globally. Note that this won't cover the amount of unknown work (e.g. elements and restrictions that have yet to be processed) unless we report the metric for all **future** values, which is impossible in many scenarios.
   
   I'm not sure whether we want to report "progress" for every element we have processed just to get an accurate aggregation here.
   
   @robertwb / @ajamato If we did decide to report progress for all past values then it would make sense to define this as `LATEST_DOUBLE_TYPE` and add an `ELEMENT` label that represents the encoded `element+restriction` pair (effectively what was fed into the Splittable DoFn).
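
Here is a sketch of the aggregation described above, assuming progress were reported as LATEST_DOUBLE_TYPE keyed by an ELEMENT label. It keeps only the latest report per element, ordered by timestamp, and then sums. The progressReport struct, with separate completed and remaining fields, is a hypothetical decoded form and is not part of the proto.

```go
package main

// progressReport is a hypothetical decoded update: the ELEMENT label value
// (an encoded element+restriction pair), the report time in milliseconds
// since epoch, and the work completed/remaining at that time.
type progressReport struct {
	element   string
	timestamp int64
	completed float64
	remaining float64
}

// latestPerElement keeps only the most recent report for each element,
// mirroring LATEST_* semantics where the timestamp orders the values.
func latestPerElement(reports []progressReport) map[string]progressReport {
	latest := make(map[string]progressReport)
	for _, r := range reports {
		if prev, ok := latest[r.element]; !ok || r.timestamp > prev.timestamp {
			latest[r.element] = r
		}
	}
	return latest
}

// globalProgress sums the latest values across all elements that reported.
func globalProgress(reports []progressReport) (completed, remaining float64) {
	for _, r := range latestPerElement(reports) {
		completed += r.completed
		remaining += r.remaining
	}
	return completed, remaining
}
```

As the comment notes, elements that never reported (for example, element/restriction pairs not yet started) contribute nothing, so `remaining` only covers known work.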
   
   


[GitHub] [beam] lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398275201
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go
 ##########
 @@ -80,36 +84,40 @@ func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
 	var monitoringInfo []*ppb.MonitoringInfo
 	metrics.Extractor{
 		SumInt64: func(l metrics.Labels, v int64) {
-			monitoringInfo = append(monitoringInfo,
-				&ppb.MonitoringInfo{
-					Urn:    "beam:metric:user",
-					Type:   "beam:metrics:sum_int_64",
-					Labels: userLabels(l),
-					Data:   int64Counter(v),
-				})
+			payload, err := int64Counter(v)
+			if err != nil {
 
 Review comment:
   Done


[GitHub] [beam] ajamato commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r395887788
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -139,7 +137,7 @@ message MonitoringInfoSpecs {
 
     USER_DISTRIBUTION_COUNTER = 6 [(monitoring_info_spec) = {
       urn: "beam:metric:user_distribution",
-      type_urn: "beam:metrics:distribution_int_64",
+      type_urn: "beam:metrics:distribution_int64",
 
 Review comment:
   Add a :v1 here


[GitHub] [beam] lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-604196167
 
 
   Run PythonLint PreCommit


[GitHub] [beam] ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398753853
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -52,38 +55,157 @@ message Annotation {
   string value = 2;
 }
 
-// Populated MonitoringInfoSpecs for specific URNs.
-// Indicating the required fields to be set.
-// SDKs and RunnerHarnesses can load these instances into memory and write a
-// validator or code generator to assist with populating and validating
-// MonitoringInfo protos.
+// A set of well known MonitoringInfo specifications.
 message MonitoringInfoSpecs {
   enum Enum {
-    // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
-    // upgrading the python SDK.
-    USER_COUNTER = 0 [(monitoring_info_spec) = {
-      urn: "beam:metric:user",
-      type_urn: "beam:metrics:sum_int_64",
+    // Represents an integer counter where values are summed across bundles.
+    USER_SUM_INT64 = 0 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:sum_int64:v1",
       required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
       annotations: [{
         key: "description",
-        value: "URN utilized to report user numeric counters."
+        value: "URN utilized to report user metric."
       }]
     }];
 
-    ELEMENT_COUNT = 1 [(monitoring_info_spec) = {
+    // Represents a double counter where values are summed across bundles.
+    USER_SUM_DOUBLE = 1 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
 
 Review comment:
   The reason we want a type is that the type defines how the metric is aggregated. A runner harness can then plumb metrics through without fully unpacking them, as long as it can unpack the type portion.


[GitHub] [beam] lukecwik removed a comment on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik removed a comment on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-604195239
 
 
   Run Java Flink PortableValidatesRunner Batch


[GitHub] [beam] ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398812861
 
 

 ##########
 File path: runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java
 ##########
 @@ -135,21 +131,23 @@ private static PortableMetrics convertMonitoringInfosToMetricResults(
         MetricKey.create(
             labelsMap.get(STEP_NAME_LABEL),
             MetricName.named(labelsMap.get(NAMESPACE_LABEL), labelsMap.get(METRIC_NAME_LABEL)));
-    MetricsApi.IntDistributionData intDistributionData =
-        monitoringInfo.getMetric().getDistributionData().getIntDistributionData();
-    DistributionResult result =
-        DistributionResult.create(
-            intDistributionData.getSum(),
-            intDistributionData.getCount(),
-            intDistributionData.getMin(),
-            intDistributionData.getMax());
-    return MetricResult.create(key, false, result);
+    try {
+      InputStream input = monitoringInfo.getPayload().newInput();
+      long count = VARINT_CODER.decode(input);
 
 Review comment:
   ditto, please use helper here
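
Here is a Go sketch of the kind of single decode helper being asked for, assuming the distribution_int64 wire order <count><sum><min><max>. Centralizing the field order matters because DistributionResult.create in the hunk above takes (sum, count, min, max), which differs from the wire order. Int64Distribution and decodeInt64Distribution are hypothetical names.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// Int64Distribution is a hypothetical decoded form of a
// beam:metrics:distribution_int64:v1 payload.
type Int64Distribution struct {
	Count, Sum, Min, Max int64
}

// decodeInt64Distribution owns the <count><sum><min><max> field order so call
// sites never hand-decode the payload.
func decodeInt64Distribution(payload []byte) (Int64Distribution, error) {
	r := bytes.NewReader(payload)
	var fields [4]int64
	for i := range fields {
		u, err := binary.ReadUvarint(r)
		if err != nil {
			return Int64Distribution{}, fmt.Errorf("field %d: %w", i, err)
		}
		fields[i] = int64(u)
	}
	if r.Len() != 0 {
		return Int64Distribution{}, fmt.Errorf("%d trailing bytes", r.Len())
	}
	return Int64Distribution{Count: fields[0], Sum: fields[1], Min: fields[2], Max: fields[3]}, nil
}
```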


[GitHub] [beam] lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398957888
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -52,38 +61,160 @@ message Annotation {
   string value = 2;
 }
 
-// Populated MonitoringInfoSpecs for specific URNs.
-// Indicating the required fields to be set.
-// SDKs and RunnerHarnesses can load these instances into memory and write a
-// validator or code generator to assist with populating and validating
-// MonitoringInfo protos.
+// A set of well known MonitoringInfo specifications.
 message MonitoringInfoSpecs {
   enum Enum {
-    // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
-    // upgrading the python SDK.
-    USER_COUNTER = 0 [(monitoring_info_spec) = {
-      urn: "beam:metric:user",
-      type_urn: "beam:metrics:sum_int_64",
+    // Represents an integer counter where values are summed across bundles.
+    USER_SUM_INT64 = 0 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:sum_int64:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents a double counter where values are summed across bundles.
+    USER_SUM_DOUBLE = 1 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:sum_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents a distribution of an integer value where:
+    //   - count: represents the number of values seen across all bundles
 
 Review comment:
   I chatted with Alex about this, and we agreed that having the TypeUrns describe the encoding was enough.
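
As an example of the TypeUrns alone describing the encoding, a consumer can decode TOP_N_INT64_TYPE (<value1><value2>...<valueN>, each a varint, as in the TypeUrns hunks earlier in this thread). It reads until the payload is exhausted, then merges two lists by keeping the N largest values. N is not part of the encoding, so the sketch takes it as a parameter. The function names are hypothetical, and the varint layout is the same unsigned-LEB128 assumption as above.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"sort"
)

// decodeInt64List reads varints until the payload is exhausted, matching the
// <value1><value2>...<valueN> layout, which has no explicit length prefix.
func decodeInt64List(payload []byte) ([]int64, error) {
	r := bytes.NewReader(payload)
	var out []int64
	for r.Len() > 0 {
		u, err := binary.ReadUvarint(r)
		if err != nil {
			return nil, err
		}
		out = append(out, int64(u))
	}
	return out, nil
}

// mergeTopN combines two top_n_int64 value lists and keeps the n largest in
// descending order. n is an assumption: the encoding does not carry it.
func mergeTopN(a, b []int64, n int) []int64 {
	all := append(append([]int64{}, a...), b...)
	sort.Slice(all, func(i, j int) bool { return all[i] > all[j] })
	if len(all) > n {
		all = all[:n]
	}
	return all
}
```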


[GitHub] [beam] robertwb commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r395933271
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -229,101 +215,127 @@ message MonitoringInfo {
     NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
     NAME = 6 [(label_props) = { name: "NAME" }];
   }
+
   // A set of key+value labels which define the scope of the metric.
   // Either a well defined entity id for matching the enum names in
   // the MonitoringInfoLabels enum or any arbitrary label
   // set by a custom metric or user metric.
+  //
   // A monitoring system is expected to be able to aggregate the metrics
   // together for all updates having the same URN and labels. Some systems such
   // as Stackdriver will be able to aggregate the metrics using a subset of the
   // provided labels
-  map<string, string> labels = 5;
-
-  // The walltime of the most recent update.
-  // Useful for aggregation for latest types such as LatestInt64.
-  google.protobuf.Timestamp timestamp = 6;
+  map<string, string> labels = 4;
 }
 
 message MonitoringInfoTypeUrns {
   enum Enum {
+    // Represents an integer counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:varint:v1
     SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                            "beam:metrics:sum_int_64"];
-
-    DISTRIBUTION_INT64_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                                     "beam:metrics:distribution_int_64"];
-
-    LATEST_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                               "beam:metrics:latest_int_64"];
+                            "beam:metrics:sum_int64:v1"];
+
+    // Represents a double counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:double:v1
+    SUM_DOUBLE_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                        "beam:metrics:sum_double:v1"];
+
+    // Represents a distribution of an integer value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    //
+    // Encoding: <count><sum><min><max>
+    //   - count: beam:coder:varint:v1
+    //   - sum:   beam:coder:varint:v1
+    //   - min:   beam:coder:varint:v1
+    //   - max:   beam:coder:varint:v1
+    DISTRIBUTION_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                     "beam:metrics:distribution_int64:v1"];
+
+    // Represents a distribution of a double value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    //
+    // Encoding: <count><sum><min><max>
+    //   - count: beam:coder:varint:v1
+    //   - sum:   beam:coder:double:v1
+    //   - min:   beam:coder:double:v1
+    //   - max:   beam:coder:double:v1
+    DISTRIBUTION_DOUBLE_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                 "beam:metrics:distribution_double:v1"];
+
+    // Represents the latest seen integer value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    //
+    // Encoding: <timestamp><value>
+    //   - timestamp: beam:coder:varint:v1     (milliseconds since epoch)
+    //   - value:     beam:coder:varint:v1
+    LATEST_INT64_TYPE = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                               "beam:metrics:latest_int64:v1"];
+
+    // Represents the latest seen double value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    //
+    // Encoding: <timestamp><value>
+    //   - timestamp: beam:coder:varint:v1     (milliseconds since epoch)
+    //   - value:     beam:coder:double:v1
+    LATEST_DOUBLE_TYPE = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metrics:latest_double:v1"];
+
+    // Represents the largest set of integer values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:varint:v1
+    TOP_N_INT64_TYPE = 6 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metrics:top_n_int64:v1"];
+
+    // Represents the largest set of double values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:double:v1
+    TOP_N_DOUBLE_TYPE = 7 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                            "beam:metrics:top_n_double:v1"];
+
+    // Represents the smallest set of integer values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:varint:v1
+    BOTTOM_N_INT64_TYPE = 8 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                          "beam:metrics:bottom_n_int64:v1"];
+
+    // Represents the smallest set of double values seen across bundles.
+    //
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:double:v1
+    BOTTOM_N_DOUBLE_TYPE = 9 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metrics:bottom_n_double:v1"];
+
+    // Encoding: <value1><value2>...<valueN>
+    //   - valueX: beam:coder:double:v1
+    PROGRESS_TYPE = 10 [(org.apache.beam.model.pipeline.v1.beam_urn) =
 
 Review comment:
   We don't want to report progress for each processed element, as this would become an unreasonably large value. We could report aggregated progress of all finished ones, plus partial progress of the in-flight ones, in which case the sum becomes a refinement of done-element-count. There's still no useful way to define cross-bundle aggregation (without resulting in an absolutely huge value), so I would lean towards this being its own type. 
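
The refinement of done-element-count described here is simple arithmetic: each finished element counts as 1, and each in-flight element adds its fractional progress. Here is a minimal Go sketch, assuming in-flight fractions are clamped to [0, 1]. refinedDoneCount is a hypothetical name.

```go
package main

// refinedDoneCount sketches the refinement of done-element-count: every
// finished element contributes 1, and every in-flight element contributes its
// fraction of work done, clamped to [0, 1].
func refinedDoneCount(finished int64, inFlightFractions []float64) float64 {
	total := float64(finished)
	for _, f := range inFlightFractions {
		switch {
		case f < 0:
			f = 0
		case f > 1:
			f = 1
		}
		total += f
	}
	return total
}
```

For example, 10 finished elements plus two in flight at 25% and 50% give 10.75. Summing this across bundles still says nothing about elements not yet started, which is why cross-bundle aggregation stays ill-defined.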


[GitHub] [beam] lostluck commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398239079
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go
 ##########
 @@ -80,36 +84,40 @@ func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
 	var monitoringInfo []*ppb.MonitoringInfo
 	metrics.Extractor{
 		SumInt64: func(l metrics.Labels, v int64) {
-			monitoringInfo = append(monitoringInfo,
-				&ppb.MonitoringInfo{
-					Urn:    "beam:metric:user",
-					Type:   "beam:metrics:sum_int_64",
-					Labels: userLabels(l),
-					Data:   int64Counter(v),
-				})
+			payload, err := int64Counter(v)
+			if err != nil {
+				monitoringInfo = append(monitoringInfo,
+					&ppb.MonitoringInfo{
+						Urn:     "beam:metric:user:v1",
+						Type:    "beam:metrics:sum_int64:v1",
+						Labels:  userLabels(l),
+						Payload: payload,
+					})
+			}
 		},
 		DistributionInt64: func(l metrics.Labels, count, sum, min, max int64) {
-			monitoringInfo = append(monitoringInfo,
-				&ppb.MonitoringInfo{
-					Urn:    "beam:metric:user_distribution",
-					Type:   "beam:metrics:distribution_int_64",
-					Labels: userLabels(l),
-					Data:   int64Distribution(count, sum, min, max),
-				})
+			payload, err := int64Distribution(count, sum, min, max)
+			if err != nil {
 
 Review comment:
  Error case reversal: as written, the MonitoringInfo is only appended when encoding fails (`err != nil`).
  Consider
   ```
   if err != nil {
     panic(err)
   }
   // ...actual contents of the block. 
   ```
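  A self-contained Go sketch of the suggested error-first shape follows. `MonitoringInfo` here is a hypothetical stand-in for `ppb.MonitoringInfo`, and this `int64Counter` is an assumed reimplementation (a varint over the value's two's complement bits), not the SDK's actual helper.

  ```go
  package main

  import (
  	"encoding/binary"
  	"fmt"
  )

  // MonitoringInfo is a hypothetical stand-in for ppb.MonitoringInfo.
  type MonitoringInfo struct {
  	Urn     string
  	Type    string
  	Labels  map[string]string
  	Payload []byte
  }

  // int64Counter is an assumed encoder for a beam:metrics:sum_int64:v1
  // payload: the value's two's complement bits as an unsigned varint.
  func int64Counter(v int64) ([]byte, error) {
  	buf := make([]byte, binary.MaxVarintLen64)
  	n := binary.PutUvarint(buf, uint64(v))
  	return buf[:n], nil
  }

  // appendSumInt64 handles the error first, so the happy path stays unindented
  // and the MonitoringInfo is appended only when encoding succeeded.
  func appendSumInt64(infos []*MonitoringInfo, labels map[string]string, v int64) []*MonitoringInfo {
  	payload, err := int64Counter(v)
  	if err != nil {
  		panic(err)
  	}
  	return append(infos, &MonitoringInfo{
  		Urn:     "beam:metric:user:v1",
  		Type:    "beam:metrics:sum_int64:v1",
  		Labels:  labels,
  		Payload: payload,
  	})
  }

  func main() {
  	infos := appendSumInt64(nil, map[string]string{"NAME": "elements"}, 300)
  	fmt.Printf("%d %x\n", len(infos), infos[0].Payload) // 1 ac02
  }
  ```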

[GitHub] [beam] ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398916506
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -52,38 +61,160 @@ message Annotation {
   string value = 2;
 }
 
-// Populated MonitoringInfoSpecs for specific URNs.
-// Indicating the required fields to be set.
-// SDKs and RunnerHarnesses can load these instances into memory and write a
-// validator or code generator to assist with populating and validating
-// MonitoringInfo protos.
+// A set of well known MonitoringInfo specifications.
 message MonitoringInfoSpecs {
   enum Enum {
-    // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
-    // upgrading the python SDK.
-    USER_COUNTER = 0 [(monitoring_info_spec) = {
-      urn: "beam:metric:user",
-      type_urn: "beam:metrics:sum_int_64",
+    // Represents an integer counter where values are summed across bundles.
+    USER_SUM_INT64 = 0 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:sum_int64:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents a double counter where values are summed across bundles.
+    USER_SUM_DOUBLE = 1 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:v1",
+      type: "beam:metrics:sum_double:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
+    // Represents a distribution of an integer value where:
+    //   - count: represents the number of values seen across all bundles
 
 Review comment:
   Now it seems like there technically aren't any fields named "count", "sum", "min", "max". Just 4 encoded varints in that specific order. There is no longer a proto or anything which defines this format.
   
  If we are going to keep type URNs, I think there should be somewhere in this file where you could define a "TypeSpec" describing how to encode each opaque bytes payload, i.e. the coders used for each value and the order in which they must be encoded, or a proto that should be serialized into that bytes field. It should be a description that works for all languages. Right now you can only learn this by reading your encoding code.
   
  I think it would be best if SDK implementers could look at a reference file like this and know how to populate the MonitoringInfo. That was the original intention behind MonitoringInfoSpec, and I believe that is a bit lost now with this change.
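  To make the point concrete, here is a Go sketch of encoding and decoding a beam:metrics:distribution_int64:v1 payload as four varints in the order <count><sum><min><max>, as the DISTRIBUTION_INT64_TYPE comment in this PR describes. It assumes beam:coder:varint:v1 is an unsigned varint over the value's two's complement bits (so negative values take 10 bytes); `Distribution` and both functions are hypothetical names. Nothing in the payload itself names the fields; only the agreed order gives them meaning.

  ```go
  package main

  import (
  	"bytes"
  	"encoding/binary"
  	"fmt"
  )

  // Distribution holds the four values documented for DISTRIBUTION_INT64_TYPE.
  type Distribution struct{ Count, Sum, Min, Max int64 }

  // encodeDistributionInt64 concatenates <count><sum><min><max> as varints.
  func encodeDistributionInt64(d Distribution) []byte {
  	var out []byte
  	buf := make([]byte, binary.MaxVarintLen64)
  	for _, v := range []int64{d.Count, d.Sum, d.Min, d.Max} {
  		n := binary.PutUvarint(buf, uint64(v)) // two's complement bits as unsigned
  		out = append(out, buf[:n]...)
  	}
  	return out
  }

  // decodeDistributionInt64 reads the four varints back in the same fixed
  // order and rejects truncated payloads or trailing bytes.
  func decodeDistributionInt64(payload []byte) (Distribution, error) {
  	r := bytes.NewReader(payload)
  	var vals [4]int64
  	for i := range vals {
  		u, err := binary.ReadUvarint(r)
  		if err != nil {
  			return Distribution{}, fmt.Errorf("field %d: %w", i, err)
  		}
  		vals[i] = int64(u)
  	}
  	if r.Len() != 0 {
  		return Distribution{}, fmt.Errorf("%d trailing bytes", r.Len())
  	}
  	return Distribution{Count: vals[0], Sum: vals[1], Min: vals[2], Max: vals[3]}, nil
  }

  func main() {
  	d := Distribution{Count: 3, Sum: 12, Min: 1, Max: 8}
  	p := encodeDistributionInt64(d)
  	got, err := decodeDistributionInt64(p)
  	fmt.Printf("%x %v %v\n", p, got, err) // 030c0108 {3 12 1 8} <nil>
  }
  ```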

[GitHub] [beam] lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-604490532
 
 
   Run Java PreCommit

[GitHub] [beam] ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398750316
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -33,14 +33,17 @@ import "beam_runner_api.proto";
 import "google/protobuf/descriptor.proto";
 import "google/protobuf/timestamp.proto";
 
-// A specification containing required set of fields and labels required
-// to be set on a MonitoringInfo for the specific URN for SDK->RunnerHarness
-// ProcessBundleResponse reporting.
+// A specification for describing a well known MonitoringInfo.
 message MonitoringInfoSpec {
+  // Defines the semantic meaning of the metric or monitored state.
   string urn = 1;
-  string type_urn = 2;
-  // The list of required
+
+  // Defines the encoding and aggregation method for the payload.
+  string type = 2;
 
 Review comment:
   Is this still a URN? Could you mention in the comment what the valid values for this are, where you could find a list of some of them, etc.?

[GitHub] [beam] lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398855313
 
 

 ##########
 File path: runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoMatchers.java
 ##########
 @@ -28,30 +31,17 @@
    * Matches a {@link MonitoringInfo} that has the set fields in the provided MonitoringInfo.
    *
    * <p>This is useful for tests which do not want to match the specific value (execution times).
-   * Currently this will only check for URNs, labels, type URNs and int64Values.
+   * Currently this will only check for URNs, labels, type URNs and payloads.
    */
   public static TypeSafeMatcher<MonitoringInfo> matchSetFields(final MonitoringInfo mi) {
     return new TypeSafeMatcher<MonitoringInfo>() {
 
       @Override
       protected boolean matchesSafely(MonitoringInfo item) {
-        if (!item.getUrn().equals(mi.getUrn())) {
-          return false;
-        }
-        if (!item.getLabels().equals(mi.getLabels())) {
-          return false;
-        }
-        if (!item.getType().equals(mi.getType())) {
-          return false;
-        }
-
-        if (mi.getMetric().hasCounterData()) {
-          long valueToMatch = mi.getMetric().getCounterData().getInt64Value();
-          if (valueToMatch != item.getMetric().getCounterData().getInt64Value()) {
-            return false;
-          }
-        }
-        return true;
+        return (mi.getUrn().isEmpty() || mi.getUrn().equals(item.getUrn()))
 
 Review comment:
   The contract of the matcher states that it only compares **set** fields and it seemed it was incorrectly implemented. I can change it back to the existing implementation where `urn`, `labels`, and `type` must always be specified and fix the comment.
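  A Go sketch of the "compare only set fields" contract, where every field left empty in `want` acts as a wildcard. Only the URN clause is visible in the Java diff; the clauses for type, labels, and payload, and the `MonitoringInfo` struct itself, are assumptions for illustration.

  ```go
  package main

  import (
  	"bytes"
  	"fmt"
  )

  // MonitoringInfo is a hypothetical stand-in for the proto message.
  type MonitoringInfo struct {
  	Urn, Type string
  	Labels    map[string]string
  	Payload   []byte
  }

  // matchSetFields reports whether got agrees with want on every field that
  // is set (non-empty) in want; unset fields in want match anything.
  func matchSetFields(want, got MonitoringInfo) bool {
  	return (want.Urn == "" || want.Urn == got.Urn) &&
  		(want.Type == "" || want.Type == got.Type) &&
  		(len(want.Labels) == 0 || labelsEqual(want.Labels, got.Labels)) &&
  		(len(want.Payload) == 0 || bytes.Equal(want.Payload, got.Payload))
  }

  // labelsEqual compares two label maps key by key.
  func labelsEqual(a, b map[string]string) bool {
  	if len(a) != len(b) {
  		return false
  	}
  	for k, v := range a {
  		if bv, ok := b[k]; !ok || bv != v {
  			return false
  		}
  	}
  	return true
  }

  func main() {
  	got := MonitoringInfo{Urn: "beam:metric:user:v1", Type: "beam:metrics:sum_int64:v1", Payload: []byte{0x05}}
  	fmt.Println(matchSetFields(MonitoringInfo{Urn: got.Urn}, got))          // true
  	fmt.Println(matchSetFields(MonitoringInfo{Payload: []byte{0x06}}, got)) // false
  }
  ```

  The alternative mentioned above (always requiring `urn`, `labels`, and `type`) would simply drop the `== ""` and `len(...) == 0` escape hatches for those three fields.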

[GitHub] [beam] lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r395905193
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -229,101 +215,127 @@ message MonitoringInfo {
     NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
     NAME = 6 [(label_props) = { name: "NAME" }];
   }
+
   // A set of key+value labels which define the scope of the metric.
   // Either a well defined entity id for matching the enum names in
   // the MonitoringInfoLabels enum or any arbitrary label
   // set by a custom metric or user metric.
+  //
   // A monitoring system is expected to be able to aggregate the metrics
   // together for all updates having the same URN and labels. Some systems such
   // as Stackdriver will be able to aggregate the metrics using a subset of the
   // provided labels
-  map<string, string> labels = 5;
-
-  // The walltime of the most recent update.
-  // Useful for aggregation for latest types such as LatestInt64.
-  google.protobuf.Timestamp timestamp = 6;
+  map<string, string> labels = 4;
 }
 
 message MonitoringInfoTypeUrns {
 
 Review comment:
   Done

[GitHub] [beam] lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398275209
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go
 ##########
 @@ -80,36 +84,40 @@ func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
 	var monitoringInfo []*ppb.MonitoringInfo
 	metrics.Extractor{
 		SumInt64: func(l metrics.Labels, v int64) {
-			monitoringInfo = append(monitoringInfo,
-				&ppb.MonitoringInfo{
-					Urn:    "beam:metric:user",
-					Type:   "beam:metrics:sum_int_64",
-					Labels: userLabels(l),
-					Data:   int64Counter(v),
-				})
+			payload, err := int64Counter(v)
+			if err != nil {
+				monitoringInfo = append(monitoringInfo,
+					&ppb.MonitoringInfo{
+						Urn:     "beam:metric:user:v1",
+						Type:    "beam:metrics:sum_int64:v1",
+						Labels:  userLabels(l),
+						Payload: payload,
+					})
+			}
 		},
 		DistributionInt64: func(l metrics.Labels, count, sum, min, max int64) {
-			monitoringInfo = append(monitoringInfo,
-				&ppb.MonitoringInfo{
-					Urn:    "beam:metric:user_distribution",
-					Type:   "beam:metrics:distribution_int_64",
-					Labels: userLabels(l),
-					Data:   int64Distribution(count, sum, min, max),
-				})
+			payload, err := int64Distribution(count, sum, min, max)
+			if err != nil {
 
 Review comment:
   Done

[GitHub] [beam] lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-604823688
 
 
   Run Java PreCommit

[GitHub] [beam] lukecwik commented on issue #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-601897399
 
 
   CC: @ajamato 

[GitHub] [beam] ajamato commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r395892463
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -229,101 +215,127 @@ message MonitoringInfo {
     NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
     NAME = 6 [(label_props) = { name: "NAME" }];
   }
+
   // A set of key+value labels which define the scope of the metric.
   // Either a well defined entity id for matching the enum names in
   // the MonitoringInfoLabels enum or any arbitrary label
   // set by a custom metric or user metric.
+  //
   // A monitoring system is expected to be able to aggregate the metrics
   // together for all updates having the same URN and labels. Some systems such
   // as Stackdriver will be able to aggregate the metrics using a subset of the
   // provided labels
-  map<string, string> labels = 5;
-
-  // The walltime of the most recent update.
-  // Useful for aggregation for latest types such as LatestInt64.
-  google.protobuf.Timestamp timestamp = 6;
+  map<string, string> labels = 4;
 }
 
 message MonitoringInfoTypeUrns {
 
 Review comment:
  Let's add some comments to make it clear that the type refers to what is collected in each MonitoringInfo update and how those updates should be aggregated together.
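  As an illustration of the aggregation semantics such comments might spell out, here is a Go sketch that merges already-decoded updates: sums add, while distributions add count and sum and keep the extreme min and max. The names are hypothetical, and treating a zero-count distribution as empty is an assumption.

  ```go
  package main

  import "fmt"

  // Distribution holds the decoded fields of a distribution_int64 update.
  type Distribution struct{ Count, Sum, Min, Max int64 }

  // mergeSum combines two sum_int64 updates: values are added.
  func mergeSum(a, b int64) int64 { return a + b }

  // mergeDistribution combines two distribution_int64 updates: counts and
  // sums add, min and max keep the extremes. A zero-count side is treated as
  // empty so its zero-valued min/max cannot distort the result.
  func mergeDistribution(a, b Distribution) Distribution {
  	if a.Count == 0 {
  		return b
  	}
  	if b.Count == 0 {
  		return a
  	}
  	out := Distribution{Count: a.Count + b.Count, Sum: a.Sum + b.Sum, Min: a.Min, Max: a.Max}
  	if b.Min < out.Min {
  		out.Min = b.Min
  	}
  	if b.Max > out.Max {
  		out.Max = b.Max
  	}
  	return out
  }

  func main() {
  	bundle1 := Distribution{Count: 2, Sum: 10, Min: 3, Max: 7}
  	bundle2 := Distribution{Count: 1, Sum: 1, Min: 1, Max: 1}
  	fmt.Println(mergeSum(4, 5), mergeDistribution(bundle1, bundle2)) // 9 {3 11 1 7}
  }
  ```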

[GitHub] [beam] lukecwik removed a comment on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik removed a comment on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-604195288
 
 
   Run Java Flink PortableValidatesRunner Streaming

[GitHub] [beam] lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-604198209
 
 
   Run Java Flink PortableValidatesRunner Batch

[GitHub] [beam] ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398804945
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformer.java
 ##########
 @@ -120,14 +110,19 @@ public CounterUpdate transform(MonitoringInfo monitoringInfo) {
       return null;
     }
 
-    long value = monitoringInfo.getMetric().getCounterData().getInt64Value();
+    long value;
+    try {
+      value = VARINT_CODER.decode(monitoringInfo.getPayload().newInput());
 
 Review comment:
  Ditto here: please use a one-line helper method to extract the value, here and in all the transformer classes.
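  A Go analogue of such a helper might look like the following. `decodeInt64Counter` is a hypothetical name, and it assumes the payload is exactly one beam:coder:varint:v1 value (an unsigned varint over two's complement bits), rejecting truncated or trailing bytes.

  ```go
  package main

  import (
  	"encoding/binary"
  	"fmt"
  )

  // decodeInt64Counter decodes a single varint payload so that each call site
  // becomes one line; malformed or oversized payloads are reported as errors.
  func decodeInt64Counter(payload []byte) (int64, error) {
  	u, n := binary.Uvarint(payload)
  	if n <= 0 {
  		return 0, fmt.Errorf("malformed varint payload %x", payload)
  	}
  	if n != len(payload) {
  		return 0, fmt.Errorf("%d trailing bytes after varint", len(payload)-n)
  	}
  	return int64(u), nil
  }

  func main() {
  	// Each transformer's decoding collapses to a single call.
  	value, err := decodeInt64Counter([]byte{0xac, 0x02})
  	fmt.Println(value, err) // 300 <nil>
  }
  ```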

[GitHub] [beam] lukecwik merged pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184
 
 
   

[GitHub] [beam] robertwb commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r395935001
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -229,101 +215,127 @@ message MonitoringInfo {
     NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
     NAME = 6 [(label_props) = { name: "NAME" }];
   }
+
   // A set of key+value labels which define the scope of the metric.
   // Either a well defined entity id for matching the enum names in
   // the MonitoringInfoLabels enum or any arbitrary label
   // set by a custom metric or user metric.
+  //
   // A monitoring system is expected to be able to aggregate the metrics
   // together for all updates having the same URN and labels. Some systems such
   // as Stackdriver will be able to aggregate the metrics using a subset of the
   // provided labels
-  map<string, string> labels = 5;
-
-  // The walltime of the most recent update.
-  // Useful for aggregation for latest types such as LatestInt64.
-  google.protobuf.Timestamp timestamp = 6;
+  map<string, string> labels = 4;
 }
 
 message MonitoringInfoTypeUrns {
   enum Enum {
+    // Represents an integer counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:varint:v1
     SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
 
 Review comment:
  Rather than manually writing out the cross product, how about we define `{sum,min,max,top_n,bottom_n,distribution,latest}_{int64,double,string}` types as having known semantics and encodings?
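  A Go sketch of what generating that cross product could look like, assuming the `beam:metrics:<aggregation>_<type>:v1` pattern used by the URNs in this PR (e.g. beam:metrics:sum_int64:v1). `typeUrns` is hypothetical, and whether every combination is meaningful is not settled here; this only enumerates them.

  ```go
  package main

  import "fmt"

  // typeUrns builds "beam:metrics:<aggregation>_<value type>:v1" for every
  // combination of aggregation and value type, in input order.
  func typeUrns(aggregations, valueTypes []string) []string {
  	urns := make([]string, 0, len(aggregations)*len(valueTypes))
  	for _, agg := range aggregations {
  		for _, vt := range valueTypes {
  			urns = append(urns, fmt.Sprintf("beam:metrics:%s_%s:v1", agg, vt))
  		}
  	}
  	return urns
  }

  func main() {
  	aggs := []string{"sum", "min", "max", "top_n", "bottom_n", "distribution", "latest"}
  	types := []string{"int64", "double", "string"}
  	urns := typeUrns(aggs, types)
  	fmt.Println(len(urns), urns[0], urns[len(urns)-1])
  	// 21 beam:metrics:sum_int64:v1 beam:metrics:latest_string:v1
  }
  ```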

[GitHub] [beam] lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398767189
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -229,121 +330,152 @@ message MonitoringInfo {
     NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
     NAME = 6 [(label_props) = { name: "NAME" }];
   }
-  // A set of key+value labels which define the scope of the metric.
+
+  // A set of key and value labels which define the scope of the metric. For
+  // well known URNs, the set of required labels is provided by the associated
+  // MonitoringInfoSpec.
+  //
   // Either a well defined entity id for matching the enum names in
   // the MonitoringInfoLabels enum or any arbitrary label
   // set by a custom metric or user metric.
+  //
   // A monitoring system is expected to be able to aggregate the metrics
   // together for all updates having the same URN and labels. Some systems such
   // as Stackdriver will be able to aggregate the metrics using a subset of the
   // provided labels
-  map<string, string> labels = 5;
-
-  // The walltime of the most recent update.
-  // Useful for aggregation for latest types such as LatestInt64.
-  google.protobuf.Timestamp timestamp = 6;
+  map<string, string> labels = 4;
 }
 
+// A set of well known URNs that specify the encoding and aggregation method.
 message MonitoringInfoTypeUrns {
   enum Enum {
+    // Represents an integer counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:varint:v1
     SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                            "beam:metrics:sum_int_64"];
-
-    DISTRIBUTION_INT64_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                                     "beam:metrics:distribution_int_64"];
-
-    LATEST_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                               "beam:metrics:latest_int_64"];
-
-    // iterable<double> is encoded with a beam:coder:double:v1 coder for each
-    // element.
-    LATEST_DOUBLES_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                                 "beam:metrics:latest_doubles"];
-  }
-}
-
-message Metric {
-  // (Required) The data for this metric.
-  oneof data {
-    CounterData counter_data = 1;
-    DistributionData distribution_data = 2;
-    ExtremaData extrema_data = 3;
-  }
-}
-
-// Data associated with a Counter or Gauge metric.
-// This is designed to be compatible with metric collection
-// systems such as DropWizard.
-message CounterData {
-  oneof value {
-    int64 int64_value = 1;
-    double double_value = 2;
-    string string_value = 3;
-  }
-}
-
-// Extrema messages are used for calculating
-// Top-N/Bottom-N metrics.
-message ExtremaData {
-  oneof extrema {
-    IntExtremaData int_extrema_data = 1;
-    DoubleExtremaData double_extrema_data = 2;
-  }
-}
-
-message IntExtremaData {
-  repeated int64 int_values = 1;
-}
-
-message DoubleExtremaData {
-  repeated double double_values = 2;
-}
-
-// Data associated with a distribution metric.
-// This is based off of the current DistributionData metric.
-// This is not a stackdriver or dropwizard compatible
-// style of distribution metric.
-message DistributionData {
-  oneof distribution {
-    IntDistributionData int_distribution_data = 1;
-    DoubleDistributionData double_distribution_data = 2;
+                            "beam:metrics:sum_int64:v1"];
+
+    // Represents a double counter where values are summed across bundles.
+    //
+    // Encoding: <value>
+    //   - value: beam:coder:double:v1
+    SUM_DOUBLE_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                        "beam:metrics:sum_double:v1"];
+
+    // Represents a distribution of an integer value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    //
+    // Encoding: <count><sum><min><max>
+    //   - count: beam:coder:varint:v1
+    //   - sum:   beam:coder:varint:v1
+    //   - min:   beam:coder:varint:v1
+    //   - max:   beam:coder:varint:v1
+    DISTRIBUTION_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                     "beam:metrics:distribution_int64:v1"];
+
+    // Represents a distribution of a double value where:
+    //   - count: represents the number of values seen across all bundles
+    //   - sum: represents the total of the value across all bundles
+    //   - min: represents the smallest value seen across all bundles
+    //   - max: represents the largest value seen across all bundles
+    //
+    // Encoding: <count><sum><min><max>
+    //   - count: beam:coder:varint:v1
+    //   - sum:   beam:coder:double:v1
+    //   - min:   beam:coder:double:v1
+    //   - max:   beam:coder:double:v1
+    DISTRIBUTION_DOUBLE_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                 "beam:metrics:distribution_double:v1"];
+
+    // Represents the latest seen integer value. The timestamp is used to
+    // provide an "ordering" over multiple values to determine which is the
+    // latest.
+    //
+    // Encoding: <timestamp><value>
+    //   - timestamp: beam:coder:varint:v1     (milliseconds since epoch)
+    //   - value:     beam:coder:varint:v1
+    LATEST_INT64_TYPE = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) =
 
 Review comment:
  We would need to do this in other places, such as the timer coder and the windowed value coder, if we wanted to keep the URN naming consistent. So I'll leave it as is, and I have added this to the protocol changes tracking sheet.
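  For reference, the LATEST_INT64_TYPE encoding quoted in the diff (<timestamp><value>, both beam:coder:varint:v1, timestamp in milliseconds since epoch) and its "later timestamp wins" ordering could be sketched in Go as follows. The names are hypothetical, and the varint layout (an unsigned varint over two's complement bits) is an assumption.

  ```go
  package main

  import (
  	"bytes"
  	"encoding/binary"
  	"fmt"
  	"time"
  )

  // Latest is a decoded latest_int64 update.
  type Latest struct {
  	Timestamp time.Time
  	Value     int64
  }

  // putVarint appends v's two's complement bits as an unsigned varint.
  func putVarint(out []byte, v int64) []byte {
  	buf := make([]byte, binary.MaxVarintLen64)
  	n := binary.PutUvarint(buf, uint64(v))
  	return append(out, buf[:n]...)
  }

  // encodeLatestInt64 writes <timestamp><value>, timestamp in epoch millis.
  func encodeLatestInt64(l Latest) []byte {
  	return putVarint(putVarint(nil, l.Timestamp.UnixMilli()), l.Value)
  }

  // decodeLatestInt64 reverses encodeLatestInt64, rejecting trailing bytes.
  func decodeLatestInt64(p []byte) (Latest, error) {
  	r := bytes.NewReader(p)
  	ts, err := binary.ReadUvarint(r)
  	if err != nil {
  		return Latest{}, fmt.Errorf("timestamp: %w", err)
  	}
  	v, err := binary.ReadUvarint(r)
  	if err != nil {
  		return Latest{}, fmt.Errorf("value: %w", err)
  	}
  	if r.Len() != 0 {
  		return Latest{}, fmt.Errorf("%d trailing bytes", r.Len())
  	}
  	return Latest{Timestamp: time.UnixMilli(int64(ts)), Value: int64(v)}, nil
  }

  // mergeLatest keeps whichever update carries the later timestamp.
  func mergeLatest(a, b Latest) Latest {
  	if b.Timestamp.After(a.Timestamp) {
  		return b
  	}
  	return a
  }

  func main() {
  	older := Latest{Timestamp: time.UnixMilli(1000), Value: 1}
  	newer := Latest{Timestamp: time.UnixMilli(2000), Value: 2}
  	got, err := decodeLatestInt64(encodeLatestInt64(newer))
  	fmt.Println(got.Value, err, mergeLatest(older, newer).Value) // 2 <nil> 2
  }
  ```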

[GitHub] [beam] lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398275219
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go
 ##########
 @@ -80,36 +84,40 @@ func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
 	var monitoringInfo []*ppb.MonitoringInfo
 	metrics.Extractor{
 		SumInt64: func(l metrics.Labels, v int64) {
-			monitoringInfo = append(monitoringInfo,
-				&ppb.MonitoringInfo{
-					Urn:    "beam:metric:user",
-					Type:   "beam:metrics:sum_int_64",
-					Labels: userLabels(l),
-					Data:   int64Counter(v),
-				})
+			payload, err := int64Counter(v)
+			if err != nil {
+				monitoringInfo = append(monitoringInfo,
+					&ppb.MonitoringInfo{
+						Urn:     "beam:metric:user:v1",
+						Type:    "beam:metrics:sum_int64:v1",
+						Labels:  userLabels(l),
+						Payload: payload,
+					})
+			}
 		},
 		DistributionInt64: func(l metrics.Labels, count, sum, min, max int64) {
-			monitoringInfo = append(monitoringInfo,
-				&ppb.MonitoringInfo{
-					Urn:    "beam:metric:user_distribution",
-					Type:   "beam:metrics:distribution_int_64",
-					Labels: userLabels(l),
-					Data:   int64Distribution(count, sum, min, max),
-				})
+			payload, err := int64Distribution(count, sum, min, max)
+			if err != nil {
+				monitoringInfo = append(monitoringInfo,
+					&ppb.MonitoringInfo{
+						Urn:     "beam:metric:user:v1",
+						Type:    "beam:metrics:distribution_int64:v1",
+						Labels:  userLabels(l),
+						Payload: payload,
+					})
+			}
 		},
 		GaugeInt64: func(l metrics.Labels, v int64, t time.Time) {
-			ts, err := ptypes.TimestampProto(t)
+			payload, err := int64Latest(t, v)
 			if err != nil {
 
 Review comment:
   Done

[GitHub] [beam] lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398899707
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ElementCountMonitoringInfoToCounterUpdateTransformer.java
 ##########
 @@ -95,7 +99,12 @@ public CounterUpdate transform(MonitoringInfo monitoringInfo) {
       return null;
     }
 
-    long value = monitoringInfo.getMetric().getCounterData().getInt64Value();
+    long value;
+    try {
+      value = VARINT_CODER.decode(monitoringInfo.getPayload().newInput());
 
 Review comment:
   Done here and elsewhere. I introduced a MonitoringInfoEncodings class with the convenience methods for the currently used encodings.
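  A Go sketch in the spirit of such convenience methods, here for the beam:metrics:sum_double:v1 payload. The names are hypothetical (the Java MonitoringInfoEncodings class itself is not shown in this thread), and the 8-byte big-endian IEEE 754 layout is assumed to match beam:coder:double:v1.

  ```go
  package main

  import (
  	"encoding/binary"
  	"fmt"
  	"math"
  )

  // encodeDoubleCounter writes v as an 8-byte big-endian IEEE 754 double.
  func encodeDoubleCounter(v float64) []byte {
  	out := make([]byte, 8)
  	binary.BigEndian.PutUint64(out, math.Float64bits(v))
  	return out
  }

  // decodeDoubleCounter is the inverse; it rejects payloads of the wrong size.
  func decodeDoubleCounter(p []byte) (float64, error) {
  	if len(p) != 8 {
  		return 0, fmt.Errorf("double payload must be 8 bytes, got %d", len(p))
  	}
  	return math.Float64frombits(binary.BigEndian.Uint64(p)), nil
  }

  func main() {
  	p := encodeDoubleCounter(2.5)
  	v, err := decodeDoubleCounter(p)
  	fmt.Printf("%x %v %v\n", p, v, err) // 4004000000000000 2.5 <nil>
  }
  ```

  Keeping each encode/decode pair in one place means every transformer agrees on the byte layout for a given type URN.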

[GitHub] [beam] lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-604754371
 
 
   Run Python PreCommit

[GitHub] [beam] lukecwik removed a comment on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik removed a comment on issue #11184: [BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-604490532
 
 
   Run Java PreCommit

[GitHub] [beam] lukecwik edited a comment on issue #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik edited a comment on issue #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#issuecomment-601897399
 
 
   CC: @ajamato @robertwb 

[GitHub] [beam] lostluck commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398238942
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go
 ##########
 @@ -126,15 +134,18 @@ func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
 			},
 		}
 		// Monitoring info version.
-		monitoringInfo = append(monitoringInfo,
-			&ppb.MonitoringInfo{
-				Urn:  "beam:metric:element_count:v1",
-				Type: "beam:metrics:sum_int_64",
-				Labels: map[string]string{
-					"PCOLLECTION": snapshot.PID,
-				},
-				Data: int64Counter(snapshot.Count),
-			})
+		payload, err := int64Counter(snapshot.Count)
+		if err != nil {
 
 Review comment:
   Consider reversing the error check so the error case is handled first:
   ```
   if err != nil {
     panic(err)
   }
   // ...actual contents of the block. 
   ```
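Applied to the loop in the diff, the suggestion keeps the happy path at the outer indentation level. The following is a sketch with hypothetical stand-in types (`monitoringInfo`, `pcollectionSnapshot`) in place of `ppb.MonitoringInfo` and the harness snapshot. The local `int64Counter` only mimics the helper's (payload, error) shape, and the type URN string is an assumption.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// monitoringInfo is a hypothetical, trimmed-down stand-in for
// ppb.MonitoringInfo after this change: URN, type URN, labels, opaque payload.
type monitoringInfo struct {
	Urn     string
	Type    string
	Labels  map[string]string
	Payload []byte
}

// pcollectionSnapshot is a hypothetical stand-in for the harness snapshot.
type pcollectionSnapshot struct {
	PID   string
	Count int64
}

// int64Counter mimics the SDK helper's (payload, error) shape; this sketch
// ASSUMES a single-varint payload and never returns a non-nil error.
func int64Counter(v int64) ([]byte, error) {
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, uint64(v))
	return buf[:n], nil
}

// elementCounts checks the error first and bails out, then builds the
// MonitoringInfo without an extra level of nesting.
func elementCounts(snapshots []pcollectionSnapshot) []*monitoringInfo {
	var infos []*monitoringInfo
	for _, snapshot := range snapshots {
		payload, err := int64Counter(snapshot.Count)
		if err != nil {
			panic(err) // As in the review suggestion; returning the error also works.
		}
		infos = append(infos, &monitoringInfo{
			Urn:     "beam:metric:element_count:v1",
			Type:    "beam:metrics:sum_int64:v1", // Assumed type URN.
			Labels:  map[string]string{"PCOLLECTION": snapshot.PID},
			Payload: payload,
		})
	}
	return infos
}

func main() {
	infos := elementCounts([]pcollectionSnapshot{{PID: "pc1", Count: 5}})
	fmt.Println(infos[0].Urn, infos[0].Labels["PCOLLECTION"], infos[0].Payload)
	// prints "beam:metric:element_count:v1 pc1 [5]"
}
```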

[GitHub] [beam] ajamato commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r395889104
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -194,33 +188,25 @@ extend google.protobuf.EnumValueOptions {
 }
 
 message MonitoringInfo {
-  // The name defining the metric or monitored state.
+  // The name defining the semantic meaning of the metric or monitored state.
+  //
+  // See MonitoringInfoSpecs.Enum for the set of well known metrics/monitored
+  // state.
   string urn = 1;
 
-  // This is specified as a URN that implies:
-  // A message class: (Distribution, Counter, Extrema, MonitoringDataTable).
-  // Sub types like field formats - int64, double, string.
-  // Aggregation methods - SUM, LATEST, TOP-N, BOTTOM-N, DISTRIBUTION
-  // valid values are:
-  // beam:metrics:[sum_int_64|latest_int_64|top_n_int_64|bottom_n_int_64|
-  //     sum_double|latest_double|top_n_double|bottom_n_double|
-  //     distribution_int_64|distribution_double|monitoring_data_table|
-  //     latest_doubles
+  // This is specified as a URN that implies the encoding and aggregation
+  // method. See MonitoringInfoTypeUrns.Enum for the set of well known types.
   string type = 2;
 
-  // The Metric or monitored state.
-  oneof data {
-    MonitoringTableData monitoring_table_data = 3;
-    Metric metric = 4;
-    bytes payload = 7;
-  }
+  // The monitored state encoded as per the specification defined by the type.
+  bytes payload = 3;
 
 Review comment:
  My biggest concern is losing the ability to print debug strings, which help people learn how these fields are populated. But maybe we can just add a few obvious places that dump debug logs, debug files, etc., with the MonitoringInfos parsed properly.
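One way to address the debug-string concern is a small formatter that decodes the payload according to its type URN and falls back to hex for anything it does not recognize. Here is a Go sketch. The type URN strings and the distribution layout (count, sum, min, max, each a varint) are assumptions to check against MonitoringInfoTypeUrns.Enum in metrics.proto, and `debugString` and `readVarInts` are hypothetical helpers.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// Type URNs ASSUMED for this sketch.
const (
	sumInt64Type          = "beam:metrics:sum_int64:v1"
	distributionInt64Type = "beam:metrics:distribution_int64:v1"
)

// readVarInts decodes exactly n varints from payload, failing on short or
// over-long input.
func readVarInts(payload []byte, n int) ([]int64, error) {
	r := bytes.NewReader(payload)
	out := make([]int64, n)
	for i := range out {
		u, err := binary.ReadUvarint(r)
		if err != nil {
			return nil, err
		}
		out[i] = int64(u)
	}
	if r.Len() != 0 {
		return nil, fmt.Errorf("%d trailing bytes", r.Len())
	}
	return out, nil
}

// debugString renders a MonitoringInfo payload for logs. Unknown types, or
// payloads that fail to decode, fall back to hex.
func debugString(urn, typ string, payload []byte) string {
	switch typ {
	case sumInt64Type:
		if v, err := readVarInts(payload, 1); err == nil {
			return fmt.Sprintf("%s sum=%d", urn, v[0])
		}
	case distributionInt64Type:
		if v, err := readVarInts(payload, 4); err == nil {
			return fmt.Sprintf("%s count=%d sum=%d min=%d max=%d", urn, v[0], v[1], v[2], v[3])
		}
	}
	return fmt.Sprintf("%s type=%s payload=%x", urn, typ, payload)
}

func main() {
	fmt.Println(debugString("beam:metric:element_count:v1", sumInt64Type, []byte{0x05}))
	// prints "beam:metric:element_count:v1 sum=5"
	fmt.Println(debugString("custom", "beam:metrics:unknown:v1", []byte{0xde, 0xad}))
	// prints "custom type=beam:metrics:unknown:v1 payload=dead"
}
```

Falling back to hex rather than failing keeps the logger usable when a runner emits a type the formatter does not know yet.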

[GitHub] [beam] lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11184: [WIP][BEAM-4374] Update protos related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r395900617
 
 

 ##########
 File path: model/pipeline/src/main/proto/metrics.proto
 ##########
 @@ -194,33 +188,25 @@ extend google.protobuf.EnumValueOptions {
 }
 
 message MonitoringInfo {
-  // The name defining the metric or monitored state.
+  // The name defining the semantic meaning of the metric or monitored state.
+  //
+  // See MonitoringInfoSpecs.Enum for the set of well known metrics/monitored
+  // state.
   string urn = 1;
 
-  // This is specified as a URN that implies:
-  // A message class: (Distribution, Counter, Extrema, MonitoringDataTable).
-  // Sub types like field formats - int64, double, string.
-  // Aggregation methods - SUM, LATEST, TOP-N, BOTTOM-N, DISTRIBUTION
-  // valid values are:
-  // beam:metrics:[sum_int_64|latest_int_64|top_n_int_64|bottom_n_int_64|
-  //     sum_double|latest_double|top_n_double|bottom_n_double|
-  //     distribution_int_64|distribution_double|monitoring_data_table|
-  //     latest_doubles
+  // This is specified as a URN that implies the encoding and aggregation
+  // method. See MonitoringInfoTypeUrns.Enum for the set of well known types.
   string type = 2;
 
-  // The Metric or monitored state.
-  oneof data {
-    MonitoringTableData monitoring_table_data = 3;
-    Metric metric = 4;
-    bytes payload = 7;
-  }
+  // The monitored state encoded as per the specification defined by the type.
+  bytes payload = 3;
 
 Review comment:
   Yup
