Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2023/01/18 23:21:54 UTC

[GitHub] [beam] ryucc opened a new pull request, #25068: Samza udf metrics

ryucc opened a new pull request, #25068:
URL: https://github.com/apache/beam/pull/25068

   - Implement SamzaMetricsBundleProgressHandler
   - Modify SamzaDoFnRunners to use SamzaMetricsBundleProgressHandler, instead of BundleProgressHandler.ignored()
   - Add unit tests
   - Refactor stepName member field in SamzaDoFnRunners
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mynameborat commented on a diff in pull request #25068: Samza udf metrics

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083083766


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(

Review Comment:
   You can't write something like that, which is what I was pointing out above. The signature for getting a remote bundle is as follows:
   
   ```
     /** Get a new {@link RemoteBundle bundle} for processing the data in an executable stage. */
     default RemoteBundle getBundle(
         OutputReceiverFactory outputReceiverFactory,
         TimerReceiverFactory timerReceiverFactory,
         StateRequestHandler stateRequestHandler,
         BundleProgressHandler progressHandler)
         throws Exception {
       return getBundle(
           outputReceiverFactory, timerReceiverFactory, stateRequestHandler, progressHandler, null);
     }
   ```
   
   Since you can't have multiple handlers, the name is misleading: it suggests that there can be other handlers and that this is the pattern to follow. `StageBundleFactory` doesn't seem to support that, hence the suggestion to remove "Metrics" from the name.





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083150540


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(

Review Comment:
   @xinyuiscool @alnzng Please help make a final call.





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1082953576


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(

Review Comment:
   Including `Metrics` in the class name is intentional. The name keeps the class's responsibility narrow. If we want a GeneralBundleProgressHandler, we can chain these smaller ones.
   
   I would prefer not to expand this class beyond metrics handling in the future.
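
The chaining idea can be sketched as follows. This is a minimal illustration, not code from the PR: `ProgressHandler` is a hypothetical stand-in for Beam's `BundleProgressHandler` (which takes `BeamFnApi` response messages rather than strings), and `CompositeProgressHandler` and `RecordingHandler` are made-up names. Because `getBundle` accepts a single handler, a composite of this kind would be the one object passed in.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for BundleProgressHandler; strings replace the proto messages.
interface ProgressHandler {
  void onProgress(String progress);

  void onCompleted(String response);
}

// Forwards every callback to each delegate, in registration order.
final class CompositeProgressHandler implements ProgressHandler {
  private final List<ProgressHandler> delegates;

  CompositeProgressHandler(ProgressHandler... delegates) {
    this.delegates = new ArrayList<>(Arrays.asList(delegates));
  }

  @Override
  public void onProgress(String progress) {
    for (ProgressHandler delegate : delegates) {
      delegate.onProgress(progress);
    }
  }

  @Override
  public void onCompleted(String response) {
    for (ProgressHandler delegate : delegates) {
      delegate.onCompleted(response);
    }
  }
}

// Hypothetical single-purpose handler that only records completion reports.
final class RecordingHandler implements ProgressHandler {
  final List<String> completed = new ArrayList<>();

  @Override
  public void onProgress(String progress) {}

  @Override
  public void onCompleted(String response) {
    completed.add(response);
  }
}
```

Each small handler keeps one responsibility, and the composite is the only place that knows there is more than one.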





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083053797


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(
+      String stepName,
+      SamzaMetricsContainer samzaMetricsContainer,
+      Map<String, String> transformIdToUniqueName) {
+    this.stepName = stepName;
+    this.samzaMetricsContainer = samzaMetricsContainer;
+    this.transformIdToUniqueName = transformIdToUniqueName;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc} Handles a progress report from the bundle while it is executing. We choose to
+   * ignore the progress report. The metrics do not have to be updated on every progress report, so
+   * we save computation resources by ignoring it.
+   */
+  public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {}
+
+  @Override
+  /**
+   * {@inheritDoc} Handles the bundle's completion report. Parses the monitoringInfos in the
+   * response, then updates the MetricsRegistry.
+   *
+   * <p>We attempt to construct a classic mode metricName
+   * ({transformUniqueName}:{className}:{metricName}). All the info should be in the labels, but we
+   * have fallbacks in case the labels don't exist.
+   *
+   * <p>Priorities for the transformUniqueName 1. Obtained transformUniqueName using the
+   * transformIdToUniqueName 2. The transformId provided by the monitoringInfo 3. The stepName
+   * provided by the runner, which maybe a result of fusing.
+   *
+   * <p>Priorities for the className 1. The namespace label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * <p>Priorities for the metricName 1. The name label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * @see
+   *     org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo)
+   */
+  public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
+    for (MetricsApi.MonitoringInfo monitoringInfo : response.getMonitoringInfosList()) {
+      if (monitoringInfo.getPayload().isEmpty()) {
+        return;

Review Comment:
   Inverted condition, fixed the return bug, and added unit test.
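
For readers following the thread, the difference can be sketched in isolation. This is an illustrative reduction, not the PR code: strings stand in for monitoring-info payloads, the class and method names are hypothetical, and the methods return a list (the real `onCompleted` is `void`) only so the effect is observable.

```java
import java.util.ArrayList;
import java.util.List;

final class EmptyPayloadSkip {
  // Shape of the reviewed code: `return` exits the whole loop at the first
  // empty payload, so every later entry is silently dropped.
  static List<String> processWithReturn(List<String> payloads) {
    List<String> handled = new ArrayList<>();
    for (String payload : payloads) {
      if (payload.isEmpty()) {
        return handled;
      }
      handled.add(payload);
    }
    return handled;
  }

  // Inverted condition: only non-empty payloads are handled, and the loop
  // always continues to the next entry.
  static List<String> processWithInvertedCheck(List<String> payloads) {
    List<String> handled = new ArrayList<>();
    for (String payload : payloads) {
      if (!payload.isEmpty()) {
        handled.add(payload);
      }
    }
    return handled;
  }
}
```

With the input `["a", "", "b"]`, the first method handles only `"a"`, while the second handles `"a"` and `"b"`.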





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1082962347


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -388,6 +407,7 @@ private void emitMetrics() {
       final long finishBundleTime = System.nanoTime();
       final long averageProcessTime = (finishBundleTime - startBundleTime) / count;
 
+      String metricName = "ExecutableStage-" + stepName + "-process-ns";

Review Comment:
   I prefer not to use instance variables. We already have a large pool of them, and I would like to keep only primitive data there.
   
   On the same note, I want to keep data as local as possible. This `metricName` seems useful only for the emitMetrics method at the moment (it's specific to the executable stage time), and may conflict with other metric names in the future.
   
   I think putting the computation here makes the code cleaner, and that outweighs the cost of computing it every time.
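
A minimal sketch of that trade-off, under stated assumptions: `StageTimingSketch` and its `gauges` map are hypothetical stand-ins for the runner class and the metrics registry, and `count` is assumed to be positive. Only the name format and the averaging expression come from the diff.

```java
import java.util.HashMap;
import java.util.Map;

final class StageTimingSketch {
  private final String stepName;
  // Hypothetical stand-in for the metrics registry.
  private final Map<String, Long> gauges = new HashMap<>();

  StageTimingSketch(String stepName) {
    this.stepName = stepName;
  }

  void emitMetrics(long startBundleTime, long finishBundleTime, long count) {
    final long averageProcessTime = (finishBundleTime - startBundleTime) / count;
    // Built on every call and kept local: no extra field, and the name cannot
    // leak into other methods.
    String metricName = "ExecutableStage-" + stepName + "-process-ns";
    gauges.put(metricName, averageProcessTime);
  }

  Map<String, Long> gauges() {
    return gauges;
  }
}
```

The cost is one short string concatenation per call to `emitMetrics`.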





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083157931


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(
+      String stepName,
+      SamzaMetricsContainer samzaMetricsContainer,
+      Map<String, String> transformIdToUniqueName) {
+    this.stepName = stepName;
+    this.samzaMetricsContainer = samzaMetricsContainer;
+    this.transformIdToUniqueName = transformIdToUniqueName;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc} Handles a progress report from the bundle while it is executing. We choose to
+   * ignore the progress report. The metrics do not have to be updated on every progress report, so
+   * we save computation resources by ignoring it.
+   */
+  public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {}
+
+  @Override
+  /**
+   * {@inheritDoc} Handles the bundle's completion report. Parses the monitoringInfos in the
+   * response, then updates the MetricsRegistry.
+   *
+   * <p>We attempt to construct a classic mode metricName
+   * ({transformUniqueName}:{className}:{metricName}). All the info should be in the labels, but we
+   * have fallbacks in case the labels don't exist.
+   *
+   * <p>Priorities for the transformUniqueName 1. Obtained transformUniqueName using the
+   * transformIdToUniqueName 2. The transformId provided by the monitoringInfo 3. The stepName
+   * provided by the runner, which maybe a result of fusing.
+   *
+   * <p>Priorities for the className 1. The namespace label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * <p>Priorities for the metricName 1. The name label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * @see
+   *     org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo)
+   */
+  public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
+    for (MetricsApi.MonitoringInfo monitoringInfo : response.getMonitoringInfosList()) {
+      if (monitoringInfo.getPayload().isEmpty()) {
+        return;
+      }
+
+      String pTransformId =
+          monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM, stepName);
+      String transformUniqueName = transformIdToUniqueName.getOrDefault(pTransformId, pTransformId);
+      String className =
+          monitoringInfo.getLabelsOrDefault(
+              MonitoringInfoConstants.Labels.NAMESPACE, monitoringInfo.getUrn());
+      String userMetricName =
+          monitoringInfo.getLabelsOrDefault(
+              MonitoringInfoConstants.Labels.NAME, monitoringInfo.getLabelsMap().toString());
+
+      MetricsContainer metricsContainer = samzaMetricsContainer.getContainer(transformUniqueName);
+      MetricName metricName = MetricName.named(className, userMetricName);
+
+      switch (monitoringInfo.getType()) {
+        case SUM_INT64_TYPE:
+          Counter counter = metricsContainer.getCounter(metricName);
+          counter.inc(decodeInt64Counter(monitoringInfo.getPayload()));
+          break;
+
+        case DISTRIBUTION_INT64_TYPE:
+          Distribution distribution = metricsContainer.getDistribution(metricName);
+          DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload());
+          distribution.update(data.sum(), data.count(), data.min(), data.max());
+          break;
+
+        case LATEST_INT64_TYPE:
+          Gauge gauge = metricsContainer.getGauge(metricName);
+          // Gauge doesn't expose update as public. This will reset the timestamp.
+
+          gauge.set(decodeInt64Gauge(monitoringInfo.getPayload()).value());
+          break;
+
+        default:
+          LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
+      }

Review Comment:
   "interactions with the component internals" is true, but it was a pattern already existing in `SamzaMetricsContainer`.
   
   I can change it from 
   
   ```
             Distribution distribution = metricsContainer.getDistribution(metricName);
             DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload());
             distribution.update(data.sum(), data.count(), data.min(), data.max());
   ```
   
   to
   
   ```
             DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload());
             Distribution distribution = metricsContainer.setDistribution(metricName, data);
   ```
   
   but I think the parsing stays here.
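
   A minimal sketch of the second form, using stand-in types rather than the Beam classes. `setDistribution` is the hypothetical helper named above, and the merge semantics (sums and counts add, min and max widen) are an assumption of this sketch, not confirmed behavior of `SamzaMetricsContainer`.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   final class SetDistributionSketch {
     /** Stand-in for decoded distribution data; not the Beam DistributionData class. */
     static final class Snapshot {
       final long sum;
       final long count;
       final long min;
       final long max;

       Snapshot(long sum, long count, long min, long max) {
         this.sum = sum;
         this.count = count;
         this.min = min;
         this.max = max;
       }
     }

     /** Stand-in container that owns the update logic, so callers only decode and delegate. */
     static final class Container {
       private final Map<String, Snapshot> distributions = new HashMap<>();

       /** Hypothetical helper: merges an already-decoded snapshot into the named distribution. */
       Snapshot setDistribution(String metricName, Snapshot update) {
         return distributions.merge(
             metricName,
             update,
             (a, b) ->
                 new Snapshot(
                     a.sum + b.sum,
                     a.count + b.count,
                     Math.min(a.min, b.min),
                     Math.max(a.max, b.max)));
       }
     }

     public static void main(String[] args) {
       Container container = new Container();
       container.setDistribution("ns:latency", new Snapshot(10, 2, 3, 7));
       Snapshot merged = container.setDistribution("ns:latency", new Snapshot(5, 1, 5, 5));
       System.out.println(merged.sum + " " + merged.count + " " + merged.min + " " + merged.max);
     }
   }
   ```

   With this shape the handler would still decode the payload, but it would no longer touch `Distribution` directly.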
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083042856


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -316,12 +322,25 @@ public FnDataReceiver<FnOutT> create(String pCollectionId) {
         final TimerReceiverFactory timerReceiverFactory =
             new TimerReceiverFactory(stageBundleFactory, this::timerDataConsumer, windowCoder);
 
+        Map<String, String> transformFullNameToUniqueName =
+            pTransformNodes.stream()
+                .collect(
+                    Collectors.toMap(
+                        pTransformNode -> pTransformNode.getId(),
+                        pTransformNode -> pTransformNode.getTransform().getUniqueName()));

Review Comment:
   By computation I mean lines that are not pure assignment.
   
   The constructor is usually not unit tested. If we add computation logic to it, we would want to unit test it as well, ending up with something like
   
   ```
   class SdkHarnessDoFnRunner {
     @VisibleForTesting
     public Map<String, String> getTransformFullNameToUniqueName(){...}
   }
   
   class SdkHarnessDoFnRunnerTest {
     @Test
     public void testConstructor() {
        SdkHarnessDoFnRunner runner = new SdkHarnessDoFnRunner(...);
        assertEquals(expected, runner.getTransformFullNameToUniqueName());
     }
   }
   ```
   
   I'd rather not have extra stuff inside the constructor.
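
   A minimal sketch of one way the mapping could be tested without going through a constructor at all: a pure static helper. `Node` is a stand-in for the pTransform node type and `idToUniqueName` is a hypothetical name; neither comes from the PR.

   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   final class TransformNameMapSketch {
     /** Stand-in for a pTransform node; carries only the two fields the mapping needs. */
     static final class Node {
       final String id;
       final String uniqueName;

       Node(String id, String uniqueName) {
         this.id = id;
         this.uniqueName = uniqueName;
       }
     }

     /**
      * Builds the id -> uniqueName map. Collectors.toMap throws IllegalStateException
      * if two nodes share the same id.
      */
     static Map<String, String> idToUniqueName(List<Node> nodes) {
       return nodes.stream().collect(Collectors.toMap(n -> n.id, n -> n.uniqueName));
     }

     public static void main(String[] args) {
       Map<String, String> names =
           idToUniqueName(List.of(new Node("t1", "Read/ParDo"), new Node("t2", "Count/Combine")));
       System.out.println(names.get("t1"));
     }
   }
   ```

   Because the helper has no dependency on the runner's state, a test can call it directly with a small list of nodes.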





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083045021


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -316,12 +322,25 @@ public FnDataReceiver<FnOutT> create(String pCollectionId) {
         final TimerReceiverFactory timerReceiverFactory =
             new TimerReceiverFactory(stageBundleFactory, this::timerDataConsumer, windowCoder);
 
+        Map<String, String> transformFullNameToUniqueName =
+            pTransformNodes.stream()
+                .collect(
+                    Collectors.toMap(
+                        pTransformNode -> pTransformNode.getId(),
+                        pTransformNode -> pTransformNode.getTransform().getUniqueName()));

Review Comment:
   What do you mean by "pTransformNode can be meddled with"? In what cases will that happen?





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083149759


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -316,12 +322,25 @@ public FnDataReceiver<FnOutT> create(String pCollectionId) {
         final TimerReceiverFactory timerReceiverFactory =
             new TimerReceiverFactory(stageBundleFactory, this::timerDataConsumer, windowCoder);
 
+        Map<String, String> transformFullNameToUniqueName =
+            pTransformNodes.stream()
+                .collect(
+                    Collectors.toMap(
+                        pTransformNode -> pTransformNode.getId(),
+                        pTransformNode -> pTransformNode.getTransform().getUniqueName()));

Review Comment:
   The `Collection<pTransformNode>` is extracted from the `executableStage`, which is parsed from `RunnerApi.ExecutableStagePayload`.
   
   It should be a specification of the pipeline, and not change while we run it.
   
   If this assumption is wrong, I'll change this mapping assignment. 





[GitHub] [beam] xinyuiscool commented on a diff in pull request #25068: Samza udf metrics

Posted by "xinyuiscool (via GitHub)" <gi...@apache.org>.
xinyuiscool commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083212204


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(

Review Comment:
   Let's keep `Metric` in the name for now. I agree more with Bharath on this one: having a single general handler here might be easier to understand, and chaining might be overkill. We can discuss this further and refactor in the future.





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083072743


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(
+      String stepName,
+      SamzaMetricsContainer samzaMetricsContainer,
+      Map<String, String> transformIdToUniqueName) {
+    this.stepName = stepName;
+    this.samzaMetricsContainer = samzaMetricsContainer;
+    this.transformIdToUniqueName = transformIdToUniqueName;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc} Handles a progress report from the bundle while it is executing. We choose to
+   * ignore the progress report. The metrics do not have to be updated on every progress report, so
+   * we save computation resources by ignoring it.
+   */
+  public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {}
+
+  @Override
+  /**
+   * {@inheritDoc} Handles the bundle's completion report. Parses the monitoringInfos in the
+   * response, then updates the MetricsRegistry.
+   *
+   * <p>We attempt to construct a classic mode metricName
+   * ({transformUniqueName}:{className}:{metricName}). All the info should be in the labels, but we
+   * have fallbacks in case the labels don't exist.
+   *
+   * <p>Priorities for the transformUniqueName 1. Obtained transformUniqueName using the
+   * transformIdToUniqueName 2. The transformId provided by the monitoringInfo 3. The stepName
+   * provided by the runner, which maybe a result of fusing.
+   *
+   * <p>Priorities for the className 1. The namespace label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * <p>Priorities for the metricName 1. The name label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * @see
+   *     org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo)
+   */
+  public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
+    for (MetricsApi.MonitoringInfo monitoringInfo : response.getMonitoringInfosList()) {
+      if (monitoringInfo.getPayload().isEmpty()) {
+        return;
+      }
+
+      String pTransformId =
+          monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM, stepName);
+      String transformUniqueName = transformIdToUniqueName.getOrDefault(pTransformId, pTransformId);
+      String className =
+          monitoringInfo.getLabelsOrDefault(
+              MonitoringInfoConstants.Labels.NAMESPACE, monitoringInfo.getUrn());
+      String userMetricName =
+          monitoringInfo.getLabelsOrDefault(
+              MonitoringInfoConstants.Labels.NAME, monitoringInfo.getLabelsMap().toString());
+
+      MetricsContainer metricsContainer = samzaMetricsContainer.getContainer(transformUniqueName);
+      MetricName metricName = MetricName.named(className, userMetricName);
+
+      switch (monitoringInfo.getType()) {
+        case SUM_INT64_TYPE:
+          Counter counter = metricsContainer.getCounter(metricName);
+          counter.inc(decodeInt64Counter(monitoringInfo.getPayload()));
+          break;
+
+        case DISTRIBUTION_INT64_TYPE:
+          Distribution distribution = metricsContainer.getDistribution(metricName);
+          DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload());
+          distribution.update(data.sum(), data.count(), data.min(), data.max());
+          break;
+
+        case LATEST_INT64_TYPE:
+          Gauge gauge = metricsContainer.getGauge(metricName);
+          // Gauge doesn't expose update as public. This will reset the timestamp.
+
+          gauge.set(decodeInt64Gauge(monitoringInfo.getPayload()).value());
+          break;
+
+        default:
+          LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
+      }

Review Comment:
   Dropping the assumption that `MonitoringInfo` is the contract between worker and runner.
   
   I'm okay with putting the code in `SamzaMetricsContainer`, but not `MetricsContainerImpl`.
   
   Flink put some `updateMetrics(monitoringInfo)` code in `MetricsContainerImpl`, but as part of the translation, a MetricName has to be defined. The Flink formatting is different from Samza, making it not directly usable for us.
   
   This violates the dependency inversion principle: the class is no longer an abstraction, and it doesn't work regardless of who the client is. It would be better for `MetricsContainerImpl` to provide methods usable by everyone, or to simply be renamed `FlinkMetricsContainerImpl`.
   
   After encountering this problem, I wanted to pull the translation to as close to the `BundleProgressHandler` as possible. Maybe I moved it too far up.
   
   One concern for putting it in `SamzaMetricsContainer` though, is `SamzaMetricsContainer` serves both portability mode and classic mode. Why should we put a method only used by portable mode in the common space?
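
   For reference, a minimal sketch of the naming translation under discussion, mirroring the fallbacks in the quoted `onCompleted` with plain maps instead of `MonitoringInfo`. The label key strings and the `fullName` helper are stand-ins for this sketch; the quoted diff reads the keys from `MonitoringInfoConstants.Labels`.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   final class MetricNameFallbackSketch {
     // Stand-in label keys for this sketch only.
     static final String PTRANSFORM = "PTRANSFORM";
     static final String NAMESPACE = "NAMESPACE";
     static final String NAME = "NAME";

     /** Builds {transformUniqueName}:{className}:{metricName}, falling back when labels are missing. */
     static String fullName(
         Map<String, String> labels,
         String urn,
         String stepName,
         Map<String, String> transformIdToUniqueName) {
       // Transform: mapped unique name, else the transform id, else the runner's step name.
       String transformId = labels.getOrDefault(PTRANSFORM, stepName);
       String transform = transformIdToUniqueName.getOrDefault(transformId, transformId);
       // Class name: namespace label, else the urn.
       String className = labels.getOrDefault(NAMESPACE, urn);
       // Metric name: name label, else the string form of the labels map (as in the quoted diff).
       String metricName = labels.getOrDefault(NAME, labels.toString());
       return transform + ":" + className + ":" + metricName;
     }

     public static void main(String[] args) {
       Map<String, String> labels = new HashMap<>();
       labels.put(PTRANSFORM, "t1");
       labels.put(NAMESPACE, "com.example.MyFn");
       labels.put(NAME, "processed");
       Map<String, String> idToName = new HashMap<>();
       idToName.put("t1", "Read/ParDo");
       System.out.println(fullName(labels, "example:urn", "fusedStep", idToName));
     }
   }
   ```

   Wherever this translation ends up living, the only Samza-specific part is the name format; the fallbacks themselves do not depend on the container.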
   





[GitHub] [beam] mynameborat commented on a diff in pull request #25068: Samza udf metrics

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083138052


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -316,12 +322,25 @@ public FnDataReceiver<FnOutT> create(String pCollectionId) {
         final TimerReceiverFactory timerReceiverFactory =
             new TimerReceiverFactory(stageBundleFactory, this::timerDataConsumer, windowCoder);
 
+        Map<String, String> transformFullNameToUniqueName =
+            pTransformNodes.stream()
+                .collect(
+                    Collectors.toMap(
+                        pTransformNode -> pTransformNode.getId(),
+                        pTransformNode -> pTransformNode.getTransform().getUniqueName()));

Review Comment:
   The class holds a reference to `Collection<pTransformNode>`, and this reference can change over the lifecycle of the class. The transform name map you produce changes every time the reference changes, so the call site can end up being passed different metric names at different times.
   
   Enforcing that the computation occurs only once at least guarantees that we always pass the same transform name map, which makes it easier to reason about what the code does and how it behaves.
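
   A minimal sketch of the compute-once idea, assuming a hypothetical `Holder` class standing in for the handler: the map is copied once at construction, so later changes to the source no longer affect lookups.

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;

   final class ComputeOnceSketch {
     /** Hypothetical stand-in for a handler that resolves transform ids to unique names. */
     static final class Holder {
       private final Map<String, String> idToUniqueName;

       Holder(Map<String, String> source) {
         // Defensive, read-only copy: the holder sees the mapping as it was at construction.
         this.idToUniqueName = Collections.unmodifiableMap(new HashMap<>(source));
       }

       /** Returns the unique name for the id, or the id itself when no mapping exists. */
       String resolve(String transformId) {
         return idToUniqueName.getOrDefault(transformId, transformId);
       }
     }

     public static void main(String[] args) {
       Map<String, String> source = new HashMap<>();
       source.put("t1", "Read/ParDo");
       Holder holder = new Holder(source);
       source.put("t1", "Renamed"); // A later change to the source is not observed.
       System.out.println(holder.resolve("t1"));
     }
   }
   ```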





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083052029


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(

Review Comment:
   I like delegating to multiple components. It's also a possible solution in the future, but it's overkill for now since we only have one component.
   
   I think keeping `Metric` in the name will discourage us from writing all the code in one handler in the future, and will make the developer think about either chaining or using components.





[GitHub] [beam] mynameborat commented on a diff in pull request #25068: Samza udf metrics

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083139422


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(
+      String stepName,
+      SamzaMetricsContainer samzaMetricsContainer,
+      Map<String, String> transformIdToUniqueName) {
+    this.stepName = stepName;
+    this.samzaMetricsContainer = samzaMetricsContainer;
+    this.transformIdToUniqueName = transformIdToUniqueName;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc} Handles a progress report from the bundle while it is executing. We choose to
+   * ignore the progress report. The metrics do not have to be updated on every progress report, so
+   * we save computation resources by ignoring it.
+   */
+  public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {}
+
+  @Override
+  /**
+   * {@inheritDoc} Handles the bundle's completion report. Parses the monitoringInfos in the
+   * response, then updates the MetricsRegistry.
+   *
+   * <p>We attempt to construct a classic mode metricName
+   * ({transformUniqueName}:{className}:{metricName}). All the info should be in the labels, but we
+   * have fallbacks in case the labels don't exist.
+   *
+   * <p>Priorities for the transformUniqueName 1. Obtained transformUniqueName using the
+   * transformIdToUniqueName 2. The transformId provided by the monitoringInfo 3. The stepName
+   * provided by the runner, which maybe a result of fusing.
+   *
+   * <p>Priorities for the className 1. The namespace label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * <p>Priorities for the metricName 1. The name label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * @see
+   *     org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo)
+   */
+  public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
+    for (MetricsApi.MonitoringInfo monitoringInfo : response.getMonitoringInfosList()) {
+      if (monitoringInfo.getPayload().isEmpty()) {
+        return;
+      }
+
+      String pTransformId =
+          monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM, stepName);
+      String transformUniqueName = transformIdToUniqueName.getOrDefault(pTransformId, pTransformId);
+      String className =
+          monitoringInfo.getLabelsOrDefault(
+              MonitoringInfoConstants.Labels.NAMESPACE, monitoringInfo.getUrn());
+      String userMetricName =
+          monitoringInfo.getLabelsOrDefault(
+              MonitoringInfoConstants.Labels.NAME, monitoringInfo.getLabelsMap().toString());
+
+      MetricsContainer metricsContainer = samzaMetricsContainer.getContainer(transformUniqueName);
+      MetricName metricName = MetricName.named(className, userMetricName);
+
+      switch (monitoringInfo.getType()) {
+        case SUM_INT64_TYPE:
+          Counter counter = metricsContainer.getCounter(metricName);
+          counter.inc(decodeInt64Counter(monitoringInfo.getPayload()));
+          break;
+
+        case DISTRIBUTION_INT64_TYPE:
+          Distribution distribution = metricsContainer.getDistribution(metricName);
+          DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload());
+          distribution.update(data.sum(), data.count(), data.min(), data.max());
+          break;
+
+        case LATEST_INT64_TYPE:
+          Gauge gauge = metricsContainer.getGauge(metricName);
+          // Gauge doesn't expose update as public. This will reset the timestamp.
+
+          gauge.set(decodeInt64Gauge(monitoringInfo.getPayload()).value());
+          break;
+
+        default:
+          LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
+      }

Review Comment:
   Portability or not, one can argue that the responsibility here is metrics handling, and hence the logic should stay within SamzaMetricsContainer, especially since most of what the code does pertains to the metrics container and its internals.
   
   IMO, the above code violates the open/closed principle and makes `ProgressHandler` interact with the internals of a component it depends on.
   
   The suggestion to keep this outside this class isn't motivated by how things are implemented in Flink. In fact, I haven't looked at how Flink does it. My rationale is purely about how responsibilities are structured and how those responsibilities are likely to evolve.
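   
   A minimal sketch of the split being suggested, under loud assumptions: `MetricUpdateSketch`, `MetricsContainerSketch`, and `ForwardingHandlerSketch` are hypothetical stand-ins written for illustration, not Beam or Samza classes, and the string metric types replace the real type URNs. The container owns the per-type update logic, and the handler only forwards.
   
   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   /** Hypothetical, already-decoded metric update; a stand-in, not a Beam type. */
   final class MetricUpdateSketch {
     final String type; // "counter" or "gauge" in this sketch
     final String key; // e.g. transformUniqueName:className:metricName
     final long value;

     MetricUpdateSketch(String type, String key, long value) {
       this.type = type;
       this.key = key;
       this.value = value;
     }
   }

   /** Hypothetical container that owns the per-type update logic. */
   final class MetricsContainerSketch {
     private final Map<String, Long> counters = new HashMap<>();
     private final Map<String, Long> gauges = new HashMap<>();

     void update(MetricUpdateSketch u) {
       switch (u.type) {
         case "counter":
           counters.merge(u.key, u.value, Long::sum); // counters accumulate
           break;
         case "gauge":
           gauges.put(u.key, u.value); // gauges keep the latest value
           break;
         default:
           System.err.println("Unsupported metric type " + u.type);
       }
     }

     long counter(String key) {
       return counters.getOrDefault(key, 0L);
     }

     long gauge(String key) {
       return gauges.getOrDefault(key, 0L);
     }
   }

   /** The handler only forwards; it never touches the container's internals. */
   final class ForwardingHandlerSketch {
     private final MetricsContainerSketch container;

     ForwardingHandlerSketch(MetricsContainerSketch container) {
       this.container = container;
     }

     void onCompleted(List<MetricUpdateSketch> updates) {
       for (MetricUpdateSketch u : updates) {
         container.update(u);
       }
     }

     public static void main(String[] args) {
       MetricsContainerSketch container = new MetricsContainerSketch();
       new ForwardingHandlerSketch(container)
           .onCompleted(
               Arrays.asList(
                   new MetricUpdateSketch("counter", "step:ns:count", 2),
                   new MetricUpdateSketch("counter", "step:ns:count", 3)));
       System.out.println(container.counter("step:ns:count"));
     }
   }
   ```
   
   With this shape, supporting a new metric type would only change the container, which is the evolution argument made above.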



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mynameborat commented on a diff in pull request #25068: Samza udf metrics

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083140997


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passes the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric name in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contain the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(

Review Comment:
   The chained bundle handlers pattern is a bit hard to read and follow because you can have several of these handlers, as opposed to having one handler that delegates to the appropriate components as part of handling.
   
   I feel that removing "Metrics" from the name doesn't take anything away from what it's doing, and it keeps the door open for either approach.





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083147733


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passes the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric name in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contain the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(

Review Comment:
   The delegating handler pattern has the same problem. You will still have to inject the components in the constructor, unless you hard-code them, which makes it less flexible.





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1082953576


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passes the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric name in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contain the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(

Review Comment:
   Adding Metrics in the class name is pretty intentional. The name keeps the responsibility smaller. If we want a GeneralBundleProgressHandler, we can chain these smaller ones.





[GitHub] [beam] mynameborat commented on a diff in pull request #25068: Samza udf metrics

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083016937


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -316,12 +322,25 @@ public FnDataReceiver<FnOutT> create(String pCollectionId) {
         final TimerReceiverFactory timerReceiverFactory =
             new TimerReceiverFactory(stageBundleFactory, this::timerDataConsumer, windowCoder);
 
+        Map<String, String> transformFullNameToUniqueName =
+            pTransformNodes.stream()
+                .collect(
+                    Collectors.toMap(
+                        pTransformNode -> pTransformNode.getId(),
+                        pTransformNode -> pTransformNode.getTransform().getUniqueName()));

Review Comment:
   It's good practice to keep the constructor lightweight. That said, it doesn't have to contain only assignments; some initialization logic is okay. Other benefits of doing so:
   - It helps establish the invariant that this field isn't going to change after construction.
   - A handle to `pTransformNode` is unnecessary and can potentially be meddled with, which would make this piece non-deterministic and break the assumption that `transformFullNameToUniqueName` doesn't change.
   - Ideally, you would want only the names to be dependency-injected if you don't need the `pTransformNode` at all.
   
   The code you pasted above is mostly boilerplate and doesn't perform any computation per se. The local variables are introduced for readability, and you might as well replace them with the getters of the already existing instance variables.
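   
   A minimal sketch of the invariant argument above, under stated assumptions: `NodeSketch` and `RunnerSketch` are hypothetical stand-ins, not the real `SdkHarnessDoFnRunner` or pipeline node types. The map is derived once at construction, wrapped as unmodifiable, and no handle to the nodes is kept afterwards.
   
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   /** Hypothetical stand-in for a pTransform node: just an id and a unique name. */
   final class NodeSketch {
     final String id;
     final String uniqueName;

     NodeSketch(String id, String uniqueName) {
       this.id = id;
       this.uniqueName = uniqueName;
     }
   }

   /** Hypothetical runner that derives the name mapping once, at construction. */
   final class RunnerSketch {
     private final Map<String, String> transformIdToUniqueName;

     RunnerSketch(List<NodeSketch> nodes) {
       Map<String, String> names = new LinkedHashMap<>();
       for (NodeSketch node : nodes) {
         names.put(node.id, node.uniqueName);
       }
       // The nodes are not retained, so later changes to the list cannot leak in.
       this.transformIdToUniqueName = Collections.unmodifiableMap(names);
     }

     Map<String, String> transformIdToUniqueName() {
       return transformIdToUniqueName;
     }

     public static void main(String[] args) {
       List<NodeSketch> nodes = new ArrayList<>();
       nodes.add(new NodeSketch("t1", "Read/ParDo"));
       RunnerSketch runner = new RunnerSketch(nodes);
       nodes.add(new NodeSketch("t2", "Write")); // does not affect the runner
       System.out.println(runner.transformIdToUniqueName());
     }
   }
   ```
   
   The trade-off raised elsewhere in this thread still applies: the loop in the constructor is logic that a test would have to cover.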





[GitHub] [beam] mynameborat commented on a diff in pull request #25068: Samza udf metrics

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083004426


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passes the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric name in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contain the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(

Review Comment:
   I don't think BundleFactory takes multiple `BundleProgressHandler`s. The responsibility of `BundleProgressHandler` is to listen for progress signals and take actions. It is agnostic to which actions are taken, and hence "Metrics" need not be part of the implementation's name.
   
   If you want to take multiple actions within the handler, they could be delegated to multiple components, but this implementation remains the one front-facing component that bridges listening and the actions.
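   
   A minimal sketch of the single front-facing handler described above, under stated assumptions: `ProgressHandlerSketch` is a hypothetical stand-in for `BundleProgressHandler` with `String` payloads instead of the real protobuf messages, and the delegated components are modeled as plain `Consumer<String>` callbacks.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Consumer;

   /** Hypothetical stand-in for BundleProgressHandler, with String payloads. */
   interface ProgressHandlerSketch {
     void onProgress(String progress);

     void onCompleted(String response);
   }

   /** One front-facing handler that fans completion out to injected components. */
   final class DelegatingHandlerSketch implements ProgressHandlerSketch {
     private final List<Consumer<String>> completionActions;

     DelegatingHandlerSketch(List<Consumer<String>> completionActions) {
       this.completionActions = new ArrayList<>(completionActions); // defensive copy
     }

     @Override
     public void onProgress(String progress) {
       // Intentionally ignored in this sketch; only completion is handled.
     }

     @Override
     public void onCompleted(String response) {
       for (Consumer<String> action : completionActions) {
         action.accept(response);
       }
     }

     public static void main(String[] args) {
       List<String> metricsLog = new ArrayList<>();
       List<String> auditLog = new ArrayList<>();
       List<Consumer<String>> actions = new ArrayList<>();
       actions.add(response -> metricsLog.add(response));
       actions.add(response -> auditLog.add(response));

       ProgressHandlerSketch handler = new DelegatingHandlerSketch(actions);
       handler.onCompleted("bundle-1");
       System.out.println(metricsLog + " " + auditLog);
     }
   }
   ```
   
   Note that the components are still injected through the constructor here, so this sketch does not by itself settle the naming question.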





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083045021


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -316,12 +322,25 @@ public FnDataReceiver<FnOutT> create(String pCollectionId) {
         final TimerReceiverFactory timerReceiverFactory =
             new TimerReceiverFactory(stageBundleFactory, this::timerDataConsumer, windowCoder);
 
+        Map<String, String> transformFullNameToUniqueName =
+            pTransformNodes.stream()
+                .collect(
+                    Collectors.toMap(
+                        pTransformNode -> pTransformNode.getId(),
+                        pTransformNode -> pTransformNode.getTransform().getUniqueName()));

Review Comment:
   What do you mean by "pTransformNode can be meddled with"?





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083112181


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passes the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric name in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contain the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(
+      String stepName,
+      SamzaMetricsContainer samzaMetricsContainer,
+      Map<String, String> transformIdToUniqueName) {
+    this.stepName = stepName;
+    this.samzaMetricsContainer = samzaMetricsContainer;
+    this.transformIdToUniqueName = transformIdToUniqueName;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc} Handles a progress report from the bundle while it is executing. We choose to
+   * ignore the progress report. The metrics do not have to be updated on every progress report, so
+   * we save computation resources by ignoring it.

Review Comment:
   We will start by handling the metrics only in `onCompleted`, then see what is needed later.





[GitHub] [beam] xinyuiscool merged pull request #25068: Samza udf metrics

Posted by "xinyuiscool (via GitHub)" <gi...@apache.org>.
xinyuiscool merged PR #25068:
URL: https://github.com/apache/beam/pull/25068




[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1082976820


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -316,12 +322,25 @@ public FnDataReceiver<FnOutT> create(String pCollectionId) {
         final TimerReceiverFactory timerReceiverFactory =
             new TimerReceiverFactory(stageBundleFactory, this::timerDataConsumer, windowCoder);
 
+        Map<String, String> transformFullNameToUniqueName =
+            pTransformNodes.stream()
+                .collect(
+                    Collectors.toMap(
+                        pTransformNode -> pTransformNode.getId(),
+                        pTransformNode -> pTransformNode.getTransform().getUniqueName()));

Review Comment:
   I think it's okay. If we wanted to avoid redundant computation, shouldn't the lines above
   
   ```
           OutputReceiverFactory receiverFactory =
               new OutputReceiverFactory() {
                 @Override
                 public FnDataReceiver<FnOutT> create(String pCollectionId) {
                   return (receivedElement) -> {
                     // handover to queue, do not block the grpc thread
                     outputQueue.put(KV.of(pCollectionId, receivedElement));
                   };
                 }
               };
           final Coder<BoundedWindow> windowCoder = windowingStrategy.getWindowFn().windowCoder();
           final TimerReceiverFactory timerReceiverFactory =
               new TimerReceiverFactory(stageBundleFactory, this::timerDataConsumer, windowCoder);
   ```
   
   all be in the constructor as well?
   
   I think the philosophy here is to keep the constructor assignment-only, without any computation.
   Code in the constructor should only look like
   
   ```
   this.a = a;
   this.b = b;
   ...
   ```
   
   etc.





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083042856


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -316,12 +322,25 @@ public FnDataReceiver<FnOutT> create(String pCollectionId) {
         final TimerReceiverFactory timerReceiverFactory =
             new TimerReceiverFactory(stageBundleFactory, this::timerDataConsumer, windowCoder);
 
+        Map<String, String> transformFullNameToUniqueName =
+            pTransformNodes.stream()
+                .collect(
+                    Collectors.toMap(
+                        pTransformNode -> pTransformNode.getId(),
+                        pTransformNode -> pTransformNode.getTransform().getUniqueName()));

Review Comment:
   By computation I mean lines that are not pure assignment.
   
   The constructor is usually not unit tested. If we add computation logic to it, we would want to add a unit test for it, ending up with something like
   
   ```
   class SdkHarnessDoFnRunner {
     @VisibleForTesting
     public Map<String, String> getTransformFullNameToUniqueName(){...}
   }
   
   class SdkHarnessDoFnRunnerTest {
     @Test
     public void testConstructor() {
        SdkHarnessDoFnRunner runner = new SdkHarnessDoFnRunner(...);
        assertEquals(expected, runner.getTransformFullNameToUniqueName());
     }
   }
   ```
   
   I'd rather not have these non-assignment lines inside the constructor.
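
One way to keep the constructor assignment-only while still testing the mapping logic is to move the computation into a static helper. The sketch below is only an illustration under stated assumptions: `Node` and `TransformNames` are hypothetical stand-ins and not classes from the PR or from Beam; only the `Collectors.toMap` pattern is taken from the diff.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for a pipeline node; not the real Beam PTransformNode.
final class Node {
  final String id;
  final String uniqueName;

  Node(String id, String uniqueName) {
    this.id = id;
    this.uniqueName = uniqueName;
  }
}

// Hypothetical helper class holding the computation outside any constructor.
final class TransformNames {
  private TransformNames() {}

  // Pure function: builds the id -> uniqueName map, so a test can call it
  // directly without constructing a runner.
  static Map<String, String> idToUniqueName(List<Node> nodes) {
    return nodes.stream().collect(Collectors.toMap(n -> n.id, n -> n.uniqueName));
  }
}
```

A caller would compute the map first and pass the result to an assignment-only constructor. Note that `Collectors.toMap` throws an `IllegalStateException` on duplicate keys, so this sketch assumes the ids are unique.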





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083109458


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(

Review Comment:
   I meant writing the `GeneralBundleProgressHandler` as described above (or renaming it to `ChainedBundleProgressHandler`); then you can chain them up with
   
   ```
   BundleProgressHandler bundleProgressHandler = new ChainedBundleProgressHandler(List.of(samzaMetricsBundleProgressHandler, anotherBundleProgressHandler, yetAnotherBundleProgressHandler));
   
   RemoteBundle remoteBundle = getBundle(..., bundleProgressHandler);
   ```





[GitHub] [beam] mynameborat commented on a diff in pull request #25068: Samza udf metrics

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083021175


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(
+      String stepName,
+      SamzaMetricsContainer samzaMetricsContainer,
+      Map<String, String> transformIdToUniqueName) {
+    this.stepName = stepName;
+    this.samzaMetricsContainer = samzaMetricsContainer;
+    this.transformIdToUniqueName = transformIdToUniqueName;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc} Handles a progress report from the bundle while it is executing. We choose to
+   * ignore the progress report. The metrics do not have to be updated on every progress report, so
+   * we save computation resources by ignoring it.
+   */
+  public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {}
+
+  @Override
+  /**
+   * {@inheritDoc} Handles the bundle's completion report. Parses the monitoringInfos in the
+   * response, then updates the MetricsRegistry.
+   *
+   * <p>We attempt to construct a classic mode metricName
+   * ({transformUniqueName}:{className}:{metricName}). All the info should be in the labels, but we
+   * have fallbacks in case the labels don't exist.
+   *
+   * <p>Priorities for the transformUniqueName 1. Obtained transformUniqueName using the
+   * transformIdToUniqueName 2. The transformId provided by the monitoringInfo 3. The stepName
+   * provided by the runner, which maybe a result of fusing.
+   *
+   * <p>Priorities for the className 1. The namespace label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * <p>Priorities for the metricName 1. The name label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * @see
+   *     org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo)
+   */
+  public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
+    for (MetricsApi.MonitoringInfo monitoringInfo : response.getMonitoringInfosList()) {
+      if (monitoringInfo.getPayload().isEmpty()) {
+        return;
+      }
+
+      String pTransformId =
+          monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM, stepName);
+      String transformUniqueName = transformIdToUniqueName.getOrDefault(pTransformId, pTransformId);
+      String className =
+          monitoringInfo.getLabelsOrDefault(
+              MonitoringInfoConstants.Labels.NAMESPACE, monitoringInfo.getUrn());
+      String userMetricName =
+          monitoringInfo.getLabelsOrDefault(
+              MonitoringInfoConstants.Labels.NAME, monitoringInfo.getLabelsMap().toString());
+
+      MetricsContainer metricsContainer = samzaMetricsContainer.getContainer(transformUniqueName);
+      MetricName metricName = MetricName.named(className, userMetricName);
+
+      switch (monitoringInfo.getType()) {
+        case SUM_INT64_TYPE:
+          Counter counter = metricsContainer.getCounter(metricName);
+          counter.inc(decodeInt64Counter(monitoringInfo.getPayload()));
+          break;
+
+        case DISTRIBUTION_INT64_TYPE:
+          Distribution distribution = metricsContainer.getDistribution(metricName);
+          DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload());
+          distribution.update(data.sum(), data.count(), data.min(), data.max());
+          break;
+
+        case LATEST_INT64_TYPE:
+          Gauge gauge = metricsContainer.getGauge(metricName);
+          // Gauge doesn't expose update as public. This will reset the timestamp.
+
+          gauge.set(decodeInt64Gauge(monitoringInfo.getPayload()).value());
+          break;
+
+        default:
+          LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
+      }

Review Comment:
   I don't agree. `MonitoringInfo` is an API defined as part of portability and isn't tied to the worker or runner.
   The object being mutated does not belong to this class (`BundleProgressHandler`). The handle is fetched from `MetricsContainer`, which can change its own code, and that change percolates all the way to this class.
   
   If `MetricsContainer` exposed a contract such as `updateMetrics(monitoringInfo)`, this class wouldn't have to deal with the evolution of this logic and would stay true to its core purpose: signaling components to do the right thing instead of owning what that right thing is.
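
A rough sketch of the delegation suggested above, with the container owning the per-type update logic and the handler only forwarding. Everything here is hypothetical and simplified: `Info`, `SimpleMetricsContainer`, `ForwardingHandler`, and the `"sum"`/`"latest"` type tags are made up for illustration and are not the Beam `MonitoringInfo`, `MetricsContainer`, or type URNs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a monitoring info; not the Beam proto.
final class Info {
  final String type; // made-up type tags: "sum" or "latest"
  final String name;
  final long value;

  Info(String type, String name, long value) {
    this.type = type;
    this.name = name;
    this.value = value;
  }
}

// Hypothetical container that owns the decode-and-update logic.
final class SimpleMetricsContainer {
  private final Map<String, Long> counters = new HashMap<>();
  private final Map<String, Long> gauges = new HashMap<>();

  // Single entry point: callers hand over the info and never branch on its type.
  // Returns false when the type is not supported.
  boolean updateMetrics(Info info) {
    switch (info.type) {
      case "sum":
        counters.merge(info.name, info.value, Long::sum);
        return true;
      case "latest":
        gauges.put(info.name, info.value);
        return true;
      default:
        return false;
    }
  }

  long counter(String name) {
    return counters.getOrDefault(name, 0L);
  }

  long gauge(String name) {
    return gauges.getOrDefault(name, 0L);
  }
}

// The handler only forwards; it does not know how each metric type is applied.
final class ForwardingHandler {
  private final SimpleMetricsContainer container;

  ForwardingHandler(SimpleMetricsContainer container) {
    this.container = container;
  }

  void onCompleted(Iterable<Info> infos) {
    for (Info info : infos) {
      container.updateMetrics(info);
    }
  }
}
```

With this split, supporting a new metric type would touch only the container, which is the design trade-off the comment argues for.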





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083033002


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(

Review Comment:
   We can write something like
   ```
   private List<BundleProgressHandler> handlers;
   //Constructor
   public GeneralBundleProgressHandler(List<BundleProgressHandler> handlers){
     this.handlers = handlers;
   }
   
   public void handle(ProcessBundleProgressResponse progress) {
     this.handlers.forEach(handler -> handler.handle(progress));
   }
   ```
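
A slightly fuller sketch of the same chaining idea, assuming a two-callback interface like the one `SamzaMetricsBundleProgressHandler` overrides in this PR (`onProgress` and `onCompleted`). The types below are hypothetical simplifications: `ProgressHandler` takes `String` arguments instead of the `BeamFnApi` response types, and `RecordingHandler` exists only to make the behaviour observable.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified interface; the real Beam interface takes
// BeamFnApi response types rather than String.
interface ProgressHandler {
  void onProgress(String progress);

  void onCompleted(String response);
}

// Fans each callback out to every delegate, in list order.
final class ChainedProgressHandler implements ProgressHandler {
  private final List<ProgressHandler> handlers;

  ChainedProgressHandler(List<ProgressHandler> handlers) {
    this.handlers = handlers;
  }

  @Override
  public void onProgress(String progress) {
    handlers.forEach(handler -> handler.onProgress(progress));
  }

  @Override
  public void onCompleted(String response) {
    handlers.forEach(handler -> handler.onCompleted(response));
  }
}

// Test double that records the callbacks it receives.
final class RecordingHandler implements ProgressHandler {
  final List<String> events = new ArrayList<>();

  @Override
  public void onProgress(String progress) {
    events.add("progress:" + progress);
  }

  @Override
  public void onCompleted(String response) {
    events.add("completed:" + response);
  }
}
```

In this sketch an exception thrown by one delegate stops the remaining delegates from being called; whether that is acceptable is a design decision the thread does not settle.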





[GitHub] [beam] mynameborat commented on a diff in pull request #25068: Samza udf metrics

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1082898587


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -388,6 +407,7 @@ private void emitMetrics() {
       final long finishBundleTime = System.nanoTime();
       final long averageProcessTime = (finishBundleTime - startBundleTime) / count;
 
+      String metricName = "ExecutableStage-" + stepName + "-process-ns";

Review Comment:
   Can this be extracted to an instance variable, since it doesn't seem to change after construction?
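
A small sketch of that extraction. `StageMetrics` is a hypothetical class used only for illustration; the only part taken from the diff is the `"ExecutableStage-" + stepName + "-process-ns"` naming pattern.

```java
// Hypothetical class for illustration; only the naming pattern comes from the diff.
final class StageMetrics {
  private final String processTimeMetricName;

  StageMetrics(String stepName) {
    // Built once, on the assumption that stepName does not change after construction.
    this.processTimeMetricName = "ExecutableStage-" + stepName + "-process-ns";
  }

  // Returns the cached name instead of rebuilding the string on every emit.
  String processTimeMetricName() {
    return processTimeMetricName;
  }
}
```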



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(
+      String stepName,
+      SamzaMetricsContainer samzaMetricsContainer,
+      Map<String, String> transformIdToUniqueName) {
+    this.stepName = stepName;
+    this.samzaMetricsContainer = samzaMetricsContainer;
+    this.transformIdToUniqueName = transformIdToUniqueName;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc} Handles a progress report from the bundle while it is executing. We choose to
+   * ignore the progress report. The metrics do not have to be updated on every progress report, so
+   * we save computation resources by ignoring it.
+   */
+  public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {}
+
+  @Override
+  /**
+   * {@inheritDoc} Handles the bundle's completion report. Parses the monitoringInfos in the
+   * response, then updates the MetricsRegistry.
+   *
+   * <p>We attempt to construct a classic mode metricName
+   * ({transformUniqueName}:{className}:{metricName}). All the info should be in the labels, but we
+   * have fallbacks in case the labels don't exist.
+   *
+   * <p>Priorities for the transformUniqueName 1. Obtained transformUniqueName using the
+   * transformIdToUniqueName 2. The transformId provided by the monitoringInfo 3. The stepName
+   * provided by the runner, which maybe a result of fusing.
+   *
+   * <p>Priorities for the className 1. The namespace label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * <p>Priorities for the metricName 1. The name label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * @see
+   *     org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo)
+   */
+  public void onCompleted(BeamFnApi.ProcessBundleResponse response) {

Review Comment:
   What are the guarantees on when `onCompleted` will be invoked? Is it guaranteed to be invoked regardless of bundle success or failure?
   
   What happens if the bundle is stuck? Should we detect that, in which case `onProgress` needs to be implemented as well?



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(
+      String stepName,
+      SamzaMetricsContainer samzaMetricsContainer,
+      Map<String, String> transformIdToUniqueName) {
+    this.stepName = stepName;
+    this.samzaMetricsContainer = samzaMetricsContainer;
+    this.transformIdToUniqueName = transformIdToUniqueName;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc} Handles a progress report from the bundle while it is executing. We choose to
+   * ignore the progress report. The metrics do not have to be updated on every progress report, so
+   * we save computation resources by ignoring it.
+   */
+  public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {}
+
+  @Override
+  /**
+   * {@inheritDoc} Handles the bundle's completion report. Parses the monitoringInfos in the
+   * response, then updates the MetricsRegistry.
+   *
+   * <p>We attempt to construct a classic mode metricName
+   * ({transformUniqueName}:{className}:{metricName}). All the info should be in the labels, but we
+   * have fallbacks in case the labels don't exist.
+   *
+   * <p>Priorities for the transformUniqueName 1. Obtained transformUniqueName using the
+   * transformIdToUniqueName 2. The transformId provided by the monitoringInfo 3. The stepName
+   * provided by the runner, which maybe a result of fusing.
+   *
+   * <p>Priorities for the className 1. The namespace label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * <p>Priorities for the metricName 1. The name label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * @see
+   *     org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo)
+   */
+  public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
+    for (MetricsApi.MonitoringInfo monitoringInfo : response.getMonitoringInfosList()) {
+      if (monitoringInfo.getPayload().isEmpty()) {
+        return;

Review Comment:
   Why not invert the condition? Intermittent control flow makes code hard to read.
   
   Even otherwise, I think this should be `continue` instead of `return`.
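
To make the difference concrete, here is a minimal sketch comparing the two control flows. `PayloadLoop` and its methods are hypothetical, and plain strings stand in for the monitoring info payloads; unlike the reviewed `void` method, these methods return the handled entries so the effect is visible.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; strings stand in for monitoring info payloads.
final class PayloadLoop {
  private PayloadLoop() {}

  // Same shape as the reviewed code: returning inside the loop abandons
  // every entry that comes after the first empty payload.
  static List<String> processWithReturn(List<String> payloads) {
    List<String> handled = new ArrayList<>();
    for (String payload : payloads) {
      if (payload.isEmpty()) {
        return handled;
      }
      handled.add(payload);
    }
    return handled;
  }

  // Inverted condition: empty payloads are skipped and the loop keeps going,
  // which has the same effect as using `continue`.
  static List<String> processInverted(List<String> payloads) {
    List<String> handled = new ArrayList<>();
    for (String payload : payloads) {
      if (!payload.isEmpty()) {
        handled.add(payload);
      }
    }
    return handled;
  }
}
```

For the input `["a", "", "b"]`, the first method handles only `"a"`, while the second handles both `"a"` and `"b"`.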



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(
+      String stepName,
+      SamzaMetricsContainer samzaMetricsContainer,
+      Map<String, String> transformIdToUniqueName) {
+    this.stepName = stepName;
+    this.samzaMetricsContainer = samzaMetricsContainer;
+    this.transformIdToUniqueName = transformIdToUniqueName;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc} Handles a progress report from the bundle while it is executing. We choose to
+   * ignore the progress report. The metrics do not have to be updated on every progress report, so
+   * we save computation resources by ignoring it.
+   */
+  public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {}
+
+  @Override
+  /**
+   * {@inheritDoc} Handles the bundle's completion report. Parses the monitoringInfos in the
+   * response, then updates the MetricsRegistry.
+   *
+   * <p>We attempt to construct a classic mode metricName
+   * ({transformUniqueName}:{className}:{metricName}). All the info should be in the labels, but we
+   * have fallbacks in case the labels don't exist.
+   *
+   * <p>Priorities for the transformUniqueName 1. Obtained transformUniqueName using the
+   * transformIdToUniqueName 2. The transformId provided by the monitoringInfo 3. The stepName
+   * provided by the runner, which maybe a result of fusing.
+   *
+   * <p>Priorities for the className 1. The namespace label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * <p>Priorities for the metricName 1. The name label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * @see
+   *     org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo)
+   */
+  public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
+    for (MetricsApi.MonitoringInfo monitoringInfo : response.getMonitoringInfosList()) {
+      if (monitoringInfo.getPayload().isEmpty()) {
+        return;
+      }
+
+      String pTransformId =
+          monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM, stepName);
+      String transformUniqueName = transformIdToUniqueName.getOrDefault(pTransformId, pTransformId);
+      String className =
+          monitoringInfo.getLabelsOrDefault(
+              MonitoringInfoConstants.Labels.NAMESPACE, monitoringInfo.getUrn());
+      String userMetricName =
+          monitoringInfo.getLabelsOrDefault(
+              MonitoringInfoConstants.Labels.NAME, monitoringInfo.getLabelsMap().toString());
+
+      MetricsContainer metricsContainer = samzaMetricsContainer.getContainer(transformUniqueName);
+      MetricName metricName = MetricName.named(className, userMetricName);
+
+      switch (monitoringInfo.getType()) {
+        case SUM_INT64_TYPE:
+          Counter counter = metricsContainer.getCounter(metricName);
+          counter.inc(decodeInt64Counter(monitoringInfo.getPayload()));
+          break;
+
+        case DISTRIBUTION_INT64_TYPE:
+          Distribution distribution = metricsContainer.getDistribution(metricName);
+          DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload());
+          distribution.update(data.sum(), data.count(), data.min(), data.max());
+          break;
+
+        case LATEST_INT64_TYPE:
+          Gauge gauge = metricsContainer.getGauge(metricName);
+          // Gauge doesn't expose update as public. This will reset the timestamp.
+
+          gauge.set(decodeInt64Gauge(monitoringInfo.getPayload()).value());
+          break;
+
+        default:
+          LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
+      }

Review Comment:
   This looks like a responsibility of `MetricsContainerImpl`. I'd move this into `SamzaMetricsContainer`.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(

Review Comment:
   Suggest removing `Metrics` from the name: this class isn't tied to metrics alone. It is an implementation of `BundleProgressHandler`, which can expand beyond metrics handling.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(
+      String stepName,
+      SamzaMetricsContainer samzaMetricsContainer,
+      Map<String, String> transformIdToUniqueName) {
+    this.stepName = stepName;
+    this.samzaMetricsContainer = samzaMetricsContainer;
+    this.transformIdToUniqueName = transformIdToUniqueName;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc} Handles a progress report from the bundle while it is executing. We choose to
+   * ignore the progress report. The metrics do not have to be updated on every progress report, so
+   * we save computation resources by ignoring it.
+   */
+  public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {}
+
+  @Override
+  /**
+   * {@inheritDoc} Handles the bundle's completion report. Parses the monitoringInfos in the
+   * response, then updates the MetricsRegistry.
+   *
+   * <p>We attempt to construct a classic mode metricName
+   * ({transformUniqueName}:{className}:{metricName}). All the info should be in the labels, but we
+   * have fallbacks in case the labels don't exist.
+   *
+   * <p>Priorities for the transformUniqueName 1. Obtained transformUniqueName using the
+   * transformIdToUniqueName 2. The transformId provided by the monitoringInfo 3. The stepName
+   * provided by the runner, which maybe a result of fusing.
+   *
+   * <p>Priorities for the className 1. The namespace label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * <p>Priorities for the metricName 1. The name label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * @see
+   *     org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo)
+   */
+  public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
+    for (MetricsApi.MonitoringInfo monitoringInfo : response.getMonitoringInfosList()) {
+      if (monitoringInfo.getPayload().isEmpty()) {
+        return;
+      }
+
+      String pTransformId =
+          monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM, stepName);
+      String transformUniqueName = transformIdToUniqueName.getOrDefault(pTransformId, pTransformId);
+      String className =
+          monitoringInfo.getLabelsOrDefault(
+              MonitoringInfoConstants.Labels.NAMESPACE, monitoringInfo.getUrn());
+      String userMetricName =
+          monitoringInfo.getLabelsOrDefault(
+              MonitoringInfoConstants.Labels.NAME, monitoringInfo.getLabelsMap().toString());

Review Comment:
   `monitoringInfo.getLabelsMap().toString()` doesn't seem like a useful default. It depends on the `toString()` implementation, which may change and in turn change the metric name.
   
   Consider a deterministic way of chaining the labels.
   
   Also, what happens if new labels get added? Would that invalidate the old metric?
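
   One possible shape for a deterministic fallback, as a sketch only. `LabelNames` and `deterministicName` are hypothetical names, and the `key=value` format joined by commas is an assumption rather than anything the PR defines:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the PR: builds a fallback metric name from labels.
class LabelNames {

  // Sorts labels by key and joins them as key=value pairs, so the result does not
  // depend on the map implementation's iteration order or its toString() format.
  static String deterministicName(Map<String, String> labels) {
    return new TreeMap<>(labels)
        .entrySet().stream()
        .map(e -> e.getKey() + "=" + e.getValue())
        .collect(Collectors.joining(","));
  }

  public static void main(String[] args) {
    Map<String, String> labels = new HashMap<>();
    labels.put("PTRANSFORM", "t1");
    labels.put("NAMESPACE", "ns");
    System.out.println(deterministicName(labels)); // NAMESPACE=ns,PTRANSFORM=t1
  }
}
```

   This only fixes ordering and formatting. A newly added label would still change the resulting name, so the second question above remains open.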



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -316,12 +322,25 @@ public FnDataReceiver<FnOutT> create(String pCollectionId) {
         final TimerReceiverFactory timerReceiverFactory =
             new TimerReceiverFactory(stageBundleFactory, this::timerDataConsumer, windowCoder);
 
+        Map<String, String> transformFullNameToUniqueName =
+            pTransformNodes.stream()
+                .collect(
+                    Collectors.toMap(
+                        pTransformNode -> pTransformNode.getId(),
+                        pTransformNode -> pTransformNode.getTransform().getUniqueName()));

Review Comment:
   Redundant computation. Why not store this map instead of the `Collection<PipelineNode.PTransformNode>` and compute this mapping once during construction?
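
   Roughly what this could look like, as a sketch. `Node` and `RunnerState` are hypothetical stand-ins for `PipelineNode.PTransformNode` and the runner class; only the idea of deriving the map once in the constructor is taken from the comment above:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for PipelineNode.PTransformNode; only the two fields used here.
class Node {
  final String id;
  final String uniqueName;

  Node(String id, String uniqueName) {
    this.id = id;
    this.uniqueName = uniqueName;
  }
}

// Hypothetical holder: the id-to-name map is derived once in the constructor
// and reused afterwards, instead of being rebuilt from the node collection.
class RunnerState {
  private final Map<String, String> transformIdToUniqueName;

  RunnerState(Collection<Node> nodes) {
    this.transformIdToUniqueName =
        Collections.unmodifiableMap(
            nodes.stream().collect(Collectors.toMap(n -> n.id, n -> n.uniqueName)));
  }

  Map<String, String> transformIdToUniqueName() {
    return transformIdToUniqueName;
  }

  public static void main(String[] args) {
    RunnerState state =
        new RunnerState(Arrays.asList(new Node("id1", "A/ParDo"), new Node("id2", "B/ParDo")));
    System.out.println(state.transformIdToUniqueName().get("id1")); // A/ParDo
  }
}
```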



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mynameborat commented on a diff in pull request #25068: Samza udf metrics

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083139422


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(
+      String stepName,
+      SamzaMetricsContainer samzaMetricsContainer,
+      Map<String, String> transformIdToUniqueName) {
+    this.stepName = stepName;
+    this.samzaMetricsContainer = samzaMetricsContainer;
+    this.transformIdToUniqueName = transformIdToUniqueName;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc} Handles a progress report from the bundle while it is executing. We choose to
+   * ignore the progress report. The metrics do not have to be updated on every progress report, so
+   * we save computation resources by ignoring it.
+   */
+  public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {}
+
+  @Override
+  /**
+   * {@inheritDoc} Handles the bundle's completion report. Parses the monitoringInfos in the
+   * response, then updates the MetricsRegistry.
+   *
+   * <p>We attempt to construct a classic mode metricName
+   * ({transformUniqueName}:{className}:{metricName}). All the info should be in the labels, but we
+   * have fallbacks in case the labels don't exist.
+   *
+   * <p>Priorities for the transformUniqueName 1. Obtained transformUniqueName using the
+   * transformIdToUniqueName 2. The transformId provided by the monitoringInfo 3. The stepName
+   * provided by the runner, which maybe a result of fusing.
+   *
+   * <p>Priorities for the className 1. The namespace label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * <p>Priorities for the metricName 1. The name label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * @see
+   *     org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo)
+   */
+  public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
+    for (MetricsApi.MonitoringInfo monitoringInfo : response.getMonitoringInfosList()) {
+      if (monitoringInfo.getPayload().isEmpty()) {
+        return;
+      }
+
+      String pTransformId =
+          monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM, stepName);
+      String transformUniqueName = transformIdToUniqueName.getOrDefault(pTransformId, pTransformId);
+      String className =
+          monitoringInfo.getLabelsOrDefault(
+              MonitoringInfoConstants.Labels.NAMESPACE, monitoringInfo.getUrn());
+      String userMetricName =
+          monitoringInfo.getLabelsOrDefault(
+              MonitoringInfoConstants.Labels.NAME, monitoringInfo.getLabelsMap().toString());
+
+      MetricsContainer metricsContainer = samzaMetricsContainer.getContainer(transformUniqueName);
+      MetricName metricName = MetricName.named(className, userMetricName);
+
+      switch (monitoringInfo.getType()) {
+        case SUM_INT64_TYPE:
+          Counter counter = metricsContainer.getCounter(metricName);
+          counter.inc(decodeInt64Counter(monitoringInfo.getPayload()));
+          break;
+
+        case DISTRIBUTION_INT64_TYPE:
+          Distribution distribution = metricsContainer.getDistribution(metricName);
+          DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload());
+          distribution.update(data.sum(), data.count(), data.min(), data.max());
+          break;
+
+        case LATEST_INT64_TYPE:
+          Gauge gauge = metricsContainer.getGauge(metricName);
+          // Gauge doesn't expose update as public. This will reset the timestamp.
+
+          gauge.set(decodeInt64Gauge(monitoringInfo.getPayload()).value());
+          break;
+
+        default:
+          LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
+      }

Review Comment:
   Portability or not, one can argue the responsibility is metrics handling and hence the logic should stay within `SamzaMetricsContainer`, especially since most of what the code does pertains to the metrics container and its internals.
   
   IMO, the above code violates the open/closed principle and makes `ProgressHandler` interact with the internals of a component it depends on.
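
   A rough sketch of the split being suggested. Everything here is hypothetical: the two classes, the `update` method, and the string type tags are stand-ins, and the real code would switch on the `MonitoringInfoConstants.TypeUrns` values and decode the payload first:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical container, not the real SamzaMetricsContainer: it owns the per-type dispatch.
class SketchMetricsContainer {
  // Illustrative type tags; the real code switches on MonitoringInfoConstants.TypeUrns values.
  static final String SUM = "sum";
  static final String LATEST = "latest";

  private final Map<String, Long> values = new HashMap<>();

  // Applies one decoded update; returns false for types this sketch does not support.
  boolean update(String type, String metricName, long value) {
    switch (type) {
      case SUM:
        values.merge(metricName, value, Long::sum);
        return true;
      case LATEST:
        values.put(metricName, value);
        return true;
      default:
        return false;
    }
  }

  long get(String metricName) {
    return values.getOrDefault(metricName, 0L);
  }
}

// Hypothetical handler: it forwards updates and knows nothing about counters or gauges.
class SketchProgressHandler {
  private final SketchMetricsContainer container;

  SketchProgressHandler(SketchMetricsContainer container) {
    this.container = container;
  }

  void onCompleted(String type, String metricName, long value) {
    container.update(type, metricName, value);
  }
}
```

   The handler then depends only on one method of the container, so adding a new metric type would touch the container alone.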





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1082950162


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(
+      String stepName,
+      SamzaMetricsContainer samzaMetricsContainer,
+      Map<String, String> transformIdToUniqueName) {
+    this.stepName = stepName;
+    this.samzaMetricsContainer = samzaMetricsContainer;
+    this.transformIdToUniqueName = transformIdToUniqueName;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc} Handles a progress report from the bundle while it is executing. We choose to
+   * ignore the progress report. The metrics do not have to be updated on every progress report, so
+   * we save computation resources by ignoring it.
+   */
+  public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {}
+
+  @Override
+  /**
+   * {@inheritDoc} Handles the bundle's completion report. Parses the monitoringInfos in the
+   * response, then updates the MetricsRegistry.
+   *
+   * <p>We attempt to construct a classic mode metricName
+   * ({transformUniqueName}:{className}:{metricName}). All the info should be in the labels, but we
+   * have fallbacks in case the labels don't exist.
+   *
+   * <p>Priorities for the transformUniqueName 1. Obtained transformUniqueName using the
+   * transformIdToUniqueName 2. The transformId provided by the monitoringInfo 3. The stepName
+   * provided by the runner, which maybe a result of fusing.
+   *
+   * <p>Priorities for the className 1. The namespace label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * <p>Priorities for the metricName 1. The name label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * @see
+   *     org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo)
+   */
+  public void onCompleted(BeamFnApi.ProcessBundleResponse response) {

Review Comment:
   Good catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083052029


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(

Review Comment:
   I like delegating to multiple components, and it remains a possible solution in the future. It's overkill for now since we only have one component.
   
   I think having "Metrics" in the name will discourage us from putting all the code into one handler later, and will make the developer think about either chaining handlers or using components, whichever way we go.
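   
   To illustrate the component approach, here is a minimal sketch. `ProgressHandler` is a hypothetical stand-in for `BundleProgressHandler`, with plain strings in place of the real response protos, and none of these classes exist in the PR.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   
   // Hypothetical composite: forwards each callback to every component, in order.
   final class CompositeProgressHandler implements ProgressHandler {
     private final List<ProgressHandler> components;
   
     CompositeProgressHandler(List<? extends ProgressHandler> components) {
       // Defensive copy so later changes to the caller's list do not affect us.
       this.components = new ArrayList<>(components);
     }
   
     @Override
     public void onProgress(String progress) {
       for (ProgressHandler component : components) {
         component.onProgress(progress);
       }
     }
   
     @Override
     public void onCompleted(String response) {
       for (ProgressHandler component : components) {
         component.onCompleted(response);
       }
     }
   
     public static void main(String[] args) {
       RecordingHandler metrics = new RecordingHandler();
       RecordingHandler audit = new RecordingHandler();
       ProgressHandler handler = new CompositeProgressHandler(Arrays.asList(metrics, audit));
       handler.onCompleted("bundle-1");
       System.out.println(metrics.completed + " " + audit.completed);
     }
   }
   
   // Hypothetical stand-in for BundleProgressHandler (assumption: strings replace the protos).
   interface ProgressHandler {
     void onProgress(String progress);
   
     void onCompleted(String response);
   }
   
   // Hypothetical component that ignores progress and records completions,
   // standing in for a metrics-only handler.
   final class RecordingHandler implements ProgressHandler {
     final List<String> completed = new ArrayList<>();
   
     @Override
     public void onProgress(String progress) {}
   
     @Override
     public void onCompleted(String response) {
       completed.add(response);
     }
   }
   ```
   
   With a single component, the composite adds nothing over calling the component directly, which is why it feels like overkill today.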





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083042856


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -316,12 +322,25 @@ public FnDataReceiver<FnOutT> create(String pCollectionId) {
         final TimerReceiverFactory timerReceiverFactory =
             new TimerReceiverFactory(stageBundleFactory, this::timerDataConsumer, windowCoder);
 
+        Map<String, String> transformFullNameToUniqueName =
+            pTransformNodes.stream()
+                .collect(
+                    Collectors.toMap(
+                        pTransformNode -> pTransformNode.getId(),
+                        pTransformNode -> pTransformNode.getTransform().getUniqueName()));

Review Comment:
   By computation I mean lines that are not pure assignments.
   
   Constructors are usually not unit tested. If we add computation logic to the constructor, we would want to add a unit test for it, ending up with something like
   
   ```
   class SdkHarnessDoFnRunner {
     @VisibleForTesting
     public Map<String, String> getTransformFullNameToUniqueName() {...}
   }
   
   class SdkHarnessDoFnRunnerTest {
     @Test
     public void testConstructor() {
       SdkHarnessDoFnRunner runner = new SdkHarnessDoFnRunner(...);
       assertEquals(expected, runner.getTransformFullNameToUniqueName());
     }
   }
   ```
   
   I'd rather not have extra stuff inside the constructor.
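   
   One way to keep the constructor down to pure assignments is to move the mapping into a static helper and test that helper directly. A minimal sketch, assuming hypothetical `TransformNames` and `TransformNode` types in place of the real pipeline node classes:
   
   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;
   
   // Hypothetical helper class; not part of the PR.
   final class TransformNames {
     private TransformNames() {}
   
     // Pure function from nodes to a transformId -> uniqueName map.
     // Collectors.toMap throws IllegalStateException on duplicate ids.
     static Map<String, String> idToUniqueName(List<TransformNode> nodes) {
       return nodes.stream()
           .collect(Collectors.toMap(TransformNode::getId, TransformNode::getUniqueName));
     }
   
     public static void main(String[] args) {
       List<TransformNode> nodes =
           Arrays.asList(new TransformNode("t1", "Step-1"), new TransformNode("t2", "Step-2"));
       System.out.println(idToUniqueName(nodes).get("t1"));
     }
   }
   
   // Hypothetical stand-in for a pipeline transform node; only the two fields used here.
   final class TransformNode {
     private final String id;
     private final String uniqueName;
   
     TransformNode(String id, String uniqueName) {
       this.id = id;
       this.uniqueName = uniqueName;
     }
   
     String getId() {
       return id;
     }
   
     String getUniqueName() {
       return uniqueName;
     }
   }
   ```
   
   The helper can then be covered by an ordinary unit test without constructing the runner or exposing a getter only for testing.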





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1082944987


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(
+      String stepName,
+      SamzaMetricsContainer samzaMetricsContainer,
+      Map<String, String> transformIdToUniqueName) {
+    this.stepName = stepName;
+    this.samzaMetricsContainer = samzaMetricsContainer;
+    this.transformIdToUniqueName = transformIdToUniqueName;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc} Handles a progress report from the bundle while it is executing. We choose to
+   * ignore the progress report. The metrics do not have to be updated on every progress report, so
+   * we save computation resources by ignoring it.
+   */
+  public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {}
+
+  @Override
+  /**
+   * {@inheritDoc} Handles the bundle's completion report. Parses the monitoringInfos in the
+   * response, then updates the MetricsRegistry.
+   *
+   * <p>We attempt to construct a classic mode metricName
+   * ({transformUniqueName}:{className}:{metricName}). All the info should be in the labels, but we
+   * have fallbacks in case the labels don't exist.
+   *
+   * <p>Priorities for the transformUniqueName 1. Obtained transformUniqueName using the
+   * transformIdToUniqueName 2. The transformId provided by the monitoringInfo 3. The stepName
+   * provided by the runner, which maybe a result of fusing.
+   *
+   * <p>Priorities for the className 1. The namespace label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * <p>Priorities for the metricName 1. The name label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * @see
+   *     org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo)
+   */
+  public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
+    for (MetricsApi.MonitoringInfo monitoringInfo : response.getMonitoringInfosList()) {
+      if (monitoringInfo.getPayload().isEmpty()) {
+        return;
+      }
+
+      String pTransformId =
+          monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM, stepName);
+      String transformUniqueName = transformIdToUniqueName.getOrDefault(pTransformId, pTransformId);
+      String className =
+          monitoringInfo.getLabelsOrDefault(
+              MonitoringInfoConstants.Labels.NAMESPACE, monitoringInfo.getUrn());
+      String userMetricName =
+          monitoringInfo.getLabelsOrDefault(
+              MonitoringInfoConstants.Labels.NAME, monitoringInfo.getLabelsMap().toString());

Review Comment:
   This default behavior is copied from the Flink implementation. The monitoring infos look like the example below, so we don't have much to work with. Do you have any suggestions for constructing a metric name when the `NAME` label doesn't exist?
   
   ```
   monitoring_infos {
     urn: "beam:metric:user:sum_int64:v1"
     type: "beam:metrics:sum_int64:v1"
     payload: "\n"
     labels {
       key: "NAME"
       value: "count101"
     }
     labels {
       key: "NAMESPACE"
       value: "org.apache.beam.runners.samza.portable.SamzaPortableTest"
     }
     labels {
       key: "PTRANSFORM"
       value: "Kati-Step-2-ParMultiDo-Anonymous-"
     }
   }
   ```
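   
   The fallback chain in the diff above can be sketched with plain maps. This is only an illustration: `MetricNameFallback` and `resolve` are hypothetical names, the label keys are the ones visible in the example above, and the joined `{transformUniqueName}:{className}:{metricName}` string follows the format described in the handler's Javadoc.
   
   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   
   // Hypothetical helper mirroring the getLabelsOrDefault calls in the diff.
   final class MetricNameFallback {
     static final String PTRANSFORM = "PTRANSFORM";
     static final String NAMESPACE = "NAMESPACE";
     static final String NAME = "NAME";
   
     private MetricNameFallback() {}
   
     static String resolve(
         Map<String, String> labels,
         String urn,
         String stepName,
         Map<String, String> transformIdToUniqueName) {
       // Transform: mapped unique name, else the raw transform id, else the runner's stepName.
       String transformId = labels.getOrDefault(PTRANSFORM, stepName);
       String transformUniqueName = transformIdToUniqueName.getOrDefault(transformId, transformId);
       // Class name: the namespace label, else the urn.
       String className = labels.getOrDefault(NAMESPACE, urn);
       // Metric name: the name label, else the stringified label map (as in the diff).
       String metricName = labels.getOrDefault(NAME, labels.toString());
       return transformUniqueName + ":" + className + ":" + metricName;
     }
   
     public static void main(String[] args) {
       Map<String, String> labels = new HashMap<>();
       labels.put(PTRANSFORM, "Kati-Step-2-ParMultiDo-Anonymous-");
       labels.put(NAMESPACE, "org.apache.beam.runners.samza.portable.SamzaPortableTest");
       labels.put(NAME, "count101");
       System.out.println(
           resolve(labels, "beam:metric:user:sum_int64:v1", "step", Collections.emptyMap()));
     }
   }
   ```
   
   When the `NAME` label is missing, the last component becomes the whole label map rendered as a string, which is unique per label set but not very readable.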





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1082962347


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -388,6 +407,7 @@ private void emitMetrics() {
       final long finishBundleTime = System.nanoTime();
       final long averageProcessTime = (finishBundleTime - startBundleTime) / count;
 
+      String metricName = "ExecutableStage-" + stepName + "-process-ns";

Review Comment:
   I prefer not to use instance variables. We already have a large pool of them, and I would like to keep only primitive data there.
   
   On the same note, I want to keep data as local as possible. This `metricName` seems useful only to the emitMetrics method at the moment (it's specific to the ExecutableStage time), and may conflict with other metric names.
   
   I think putting the computation here improves code cleanliness, which outweighs the cost of computing it every time.





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1082962347


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -388,6 +407,7 @@ private void emitMetrics() {
       final long finishBundleTime = System.nanoTime();
       final long averageProcessTime = (finishBundleTime - startBundleTime) / count;
 
+      String metricName = "ExecutableStage-" + stepName + "-process-ns";

Review Comment:
   I don't like the use of instance variables. We already have a large pool of them, and I would like to keep only primitive data there.
   
   On the same note, I want to keep data as local as possible. This `metricName` seems useful only to the emitMetrics method at the moment (it's specific to the ExecutableStage time), and may conflict with other metric names.
   
   I think putting the computation here improves code cleanliness, which outweighs the cost of computing it every time.





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1082979217


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(
+      String stepName,
+      SamzaMetricsContainer samzaMetricsContainer,
+      Map<String, String> transformIdToUniqueName) {
+    this.stepName = stepName;
+    this.samzaMetricsContainer = samzaMetricsContainer;
+    this.transformIdToUniqueName = transformIdToUniqueName;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc} Handles a progress report from the bundle while it is executing. We choose to
+   * ignore the progress report. The metrics do not have to be updated on every progress report, so
+   * we save computation resources by ignoring it.
+   */
+  public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {}
+
+  @Override
+  /**
+   * {@inheritDoc} Handles the bundle's completion report. Parses the monitoringInfos in the
+   * response, then updates the MetricsRegistry.
+   *
+   * <p>We attempt to construct a classic mode metricName
+   * ({transformUniqueName}:{className}:{metricName}). All the info should be in the labels, but we
+   * have fallbacks in case the labels don't exist.
+   *
+   * <p>Priorities for the transformUniqueName 1. Obtained transformUniqueName using the
+   * transformIdToUniqueName 2. The transformId provided by the monitoringInfo 3. The stepName
+   * provided by the runner, which maybe a result of fusing.
+   *
+   * <p>Priorities for the className 1. The namespace label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * <p>Priorities for the metricName 1. The name label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * @see
+   *     org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo)
+   */
+  public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
+    for (MetricsApi.MonitoringInfo monitoringInfo : response.getMonitoringInfosList()) {
+      if (monitoringInfo.getPayload().isEmpty()) {
+        return;

Review Comment:
   Will move the code a bit.





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1082990633


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contains the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(
+      String stepName,
+      SamzaMetricsContainer samzaMetricsContainer,
+      Map<String, String> transformIdToUniqueName) {
+    this.stepName = stepName;
+    this.samzaMetricsContainer = samzaMetricsContainer;
+    this.transformIdToUniqueName = transformIdToUniqueName;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc} Handles a progress report from the bundle while it is executing. We choose to
+   * ignore the progress report. The metrics do not have to be updated on every progress report, so
+   * we save computation resources by ignoring it.
+   */
+  public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {}
+
+  @Override
+  /**
+   * {@inheritDoc} Handles the bundle's completion report. Parses the monitoringInfos in the
+   * response, then updates the MetricsRegistry.
+   *
+   * <p>We attempt to construct a classic mode metricName
+   * ({transformUniqueName}:{className}:{metricName}). All the info should be in the labels, but we
+   * have fallbacks in case the labels don't exist.
+   *
+   * <p>Priorities for the transformUniqueName 1. Obtained transformUniqueName using the
+   * transformIdToUniqueName 2. The transformId provided by the monitoringInfo 3. The stepName
+   * provided by the runner, which maybe a result of fusing.
+   *
+   * <p>Priorities for the className 1. The namespace label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * <p>Priorities for the metricName 1. The name label 2. The monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * @see
+   *     org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo)
+   */
+  public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
+    for (MetricsApi.MonitoringInfo monitoringInfo : response.getMonitoringInfosList()) {
+      if (monitoringInfo.getPayload().isEmpty()) {
+        return;
+      }
+
+      String pTransformId =
+          monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM, stepName);
+      String transformUniqueName = transformIdToUniqueName.getOrDefault(pTransformId, pTransformId);
+      String className =
+          monitoringInfo.getLabelsOrDefault(
+              MonitoringInfoConstants.Labels.NAMESPACE, monitoringInfo.getUrn());
+      String userMetricName =
+          monitoringInfo.getLabelsOrDefault(
+              MonitoringInfoConstants.Labels.NAME, monitoringInfo.getLabelsMap().toString());
+
+      MetricsContainer metricsContainer = samzaMetricsContainer.getContainer(transformUniqueName);
+      MetricName metricName = MetricName.named(className, userMetricName);
+
+      switch (monitoringInfo.getType()) {
+        case SUM_INT64_TYPE:
+          Counter counter = metricsContainer.getCounter(metricName);
+          counter.inc(decodeInt64Counter(monitoringInfo.getPayload()));
+          break;
+
+        case DISTRIBUTION_INT64_TYPE:
+          Distribution distribution = metricsContainer.getDistribution(metricName);
+          DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload());
+          distribution.update(data.sum(), data.count(), data.min(), data.max());
+          break;
+
+        case LATEST_INT64_TYPE:
+          Gauge gauge = metricsContainer.getGauge(metricName);
+          // Gauge doesn't expose update as public. This will reset the timestamp.
+
+          gauge.set(decodeInt64Gauge(monitoringInfo.getPayload()).value());
+          break;
+
+        default:
+          LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
+      }

Review Comment:
   `MonitoringInfo` is the data object used between the SDK worker and the runner; `DistributionData` and integer metric values are the data objects used within the runner.
   
   I think it is the `BundleProgressHandler`'s job to translate the external data type into internal data types and pass them to the internal services.





[GitHub] [beam] ryucc commented on a diff in pull request #25068: Samza udf metrics

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #25068:
URL: https://github.com/apache/beam/pull/25068#discussion_r1083147998


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passes the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric name in classic mode is {transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the monitoringInfos returned by the
+   * worker only contain the transformId. The current solution is to provide a mapping from
+   * transformId back to uniqueName. A future improvement would be making the monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(

Review Comment:
   I think keeping `Metrics` in the class name is a stronger signal to push for refactoring if we ever want the handler to do more.
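
For reference, the naming scheme described in the constructor Javadoc quoted above can be sketched as follows. This is only an illustration: `MetricNameSketch` and `fullMetricName` are hypothetical names, and falling back to the default step name when a transformId has no mapping is an assumption, not something the quoted diff confirms.

```java
import java.util.Map;

/** Illustrative only: hypothetical helper, not part of the PR. */
final class MetricNameSketch {

  /**
   * Builds {transformUniqueName}:{className}:{metricName}, resolving the unique name from the
   * transformId reported by the worker.
   */
  static String fullMetricName(
      Map<String, String> transformIdToUniqueName,
      String defaultStepName,
      String transformId,
      String className,
      String metricName) {
    // Assumption: use the runner-provided step name when the transformId is unknown.
    String uniqueName = transformIdToUniqueName.getOrDefault(transformId, defaultStepName);
    return uniqueName + ":" + className + ":" + metricName;
  }
}
```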


