You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/04/07 00:51:47 UTC

[beam] branch master updated: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bc44a01  [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
     new 2a0b3f7  Merge pull request #11325 from lukecwik/beam4374
bc44a01 is described below

commit bc44a01dedd07c69223b7015ab90d4c09b24289e
Author: Luke Cwik <lc...@google.com>
AuthorDate: Mon Apr 6 12:48:58 2020 -0700

    [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
    
    This completes the migration off of the Metrics proto to the MonitoringInfo proto.
---
 .../fn-execution/src/main/proto/beam_fn_api.proto  |  135 +--
 .../runners/core/metrics/MetricsTranslation.java   |  138 ---
 .../core/metrics/MetricsTranslationTest.java       |  188 ----
 .../worker/fn/control/BeamFnMapTaskExecutor.java   |   51 +-
 .../control/RegisterAndProcessBundleOperation.java |   23 +-
 .../fn/control/BeamFnMapTaskExecutorTest.java      |  343 ------
 .../RegisterAndProcessBundleOperationTest.java     |  196 ----
 .../SingularProcessBundleProgressTrackerTest.java  |    4 +-
 sdks/go/pkg/beam/core/runtime/exec/pardo.go        |    4 +-
 sdks/go/pkg/beam/core/runtime/harness/harness.go   |   12 +-
 .../go/pkg/beam/core/runtime/harness/monitoring.go |   84 +-
 .../beam/model/fnexecution_v1/beam_fn_api.pb.go    | 1127 ++++----------------
 sdks/python/apache_beam/metrics/cells.py           |   39 -
 sdks/python/apache_beam/metrics/metricbase.py      |   11 -
 .../runners/portability/fn_api_runner/fn_runner.py |    8 +-
 .../portability/fn_api_runner/fn_runner_test.py    |   34 -
 .../apache_beam/runners/worker/bundle_processor.py |   46 -
 .../apache_beam/runners/worker/operations.py       |   44 -
 .../apache_beam/runners/worker/sdk_worker.py       |    1 -
 19 files changed, 248 insertions(+), 2240 deletions(-)

diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 68f2ceb..974bf06 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -253,18 +253,13 @@ message ProcessBundleRequest {
 }
 
 message ProcessBundleResponse {
-  // (Optional) If metrics reporting is supported by the SDK, this represents
-  // the final metrics to record for this bundle.
-  // DEPRECATED
-  Metrics metrics = 1;
-
   // (Optional) Specifies that the bundle has not been completed and the
   // following applications need to be scheduled and executed in the future.
   // A runner that does not yet support residual roots MUST still check that
   // this is empty for correctness.
   repeated DelayedBundleApplication residual_roots = 2;
 
-  // (Required) The list of metrics or other MonitoredState
+  // DEPRECATED (Required) The list of metrics or other MonitoredState
   // collected while processing this bundle.
   repeated org.apache.beam.model.pipeline.v1.MonitoringInfo monitoring_infos = 3;
 
@@ -285,6 +280,8 @@ message ProcessBundleResponse {
   // as the MonitoringInfo could be reconstructed fully by overwriting its
   // payload field with the bytes specified here.
   map<string, bytes> monitoring_data = 5;
+
+  reserved 1;
 }
 
 // A request to report progress information for a given bundle.
@@ -311,135 +308,11 @@ message ProcessBundleProgressMetadataRequest {
   repeated string monitoring_info_id = 1;
 }
 
-// DEPRECATED
-message Metrics {
-  // PTransform level metrics.
-  // These metrics are split into processed and active element groups for
-  // progress reporting purposes. This allows a Runner to see what is measured,
-  // what is estimated and what can be extrapolated to be able to accurately
-  // estimate the amount of remaining work.
-  message PTransform {
-    // Metrics that are measured for processed and active element groups.
-    message Measured {
-      // (Optional) Map from local input name to number of elements processed
-      // from this input.
-      // If unset, assumed to be the sum of the outputs of all producers to
-      // this transform (for ProcessedElements) and 0 (for ActiveElements).
-      map<string, int64> input_element_counts = 1;
-
-      // (Required) Map from local output name to number of elements produced
-      // for this output.
-      map<string, int64> output_element_counts = 2;
-
-      // (Optional) The total time spent so far in processing the elements in
-      // this group, in seconds.
-      double total_time_spent = 3;
-
-      // TODO: Add other element group level metrics.
-    }
-
-    // Metrics for fully processed elements.
-    message ProcessedElements {
-      // (Required)
-      Measured measured = 1;
-    }
-
-    // Metrics for active elements.
-    // An element is considered active if the SDK has started but not finished
-    // processing it yet.
-    message ActiveElements {
-      // (Required)
-      Measured measured = 1;
-
-      // Estimated metrics.
-
-      // (Optional) Sum of estimated fraction of known work remaining for all
-      // active elements, as reported by this transform.
-      // If not reported, a Runner could extrapolate this from the processed
-      // elements.
-      // TODO: Handle the case when known work is infinite.
-      double fraction_remaining = 2;
-
-      // (Optional) Map from local output name to sum of estimated number
-      // of elements remaining for this output from all active elements,
-      // as reported by this transform.
-      // If not reported, a Runner could extrapolate this from the processed
-      // elements.
-      map<string, int64> output_elements_remaining = 3;
-    }
-
-    // (Required): Metrics for processed elements.
-    ProcessedElements processed_elements = 1;
-    // (Required): Metrics for active elements.
-    ActiveElements active_elements = 2;
-
-    // (Optional): Map from local output name to its watermark.
-    // The watermarks reported are tentative, to get a better sense of progress
-    // while processing a bundle but before it is committed. At bundle commit
-    // time, a Runner needs to also take into account the timers set to compute
-    // the actual watermarks.
-    map<string, int64> watermarks = 3;
-
-    repeated User user = 4;
-    // TODO: Define other transform level system metrics.
-  }
-
-  // User defined metrics
-  message User {
-    // A key for identifying a metric at the most granular level.
-    message MetricName {
-      // (Required): The namespace of this metric.
-      string namespace = 2;
-
-      // (Required): The name of this metric.
-      string name = 3;
-    }
-
-    // Data associated with a counter metric.
-    message CounterData {
-      int64 value = 1;
-    }
-
-    // Data associated with a distribution metric.
-    message DistributionData {
-      int64 count = 1;
-      int64 sum = 2;
-      int64 min = 3;
-      int64 max = 4;
-    }
-
-    // Data associated with a Gauge metric.
-    message GaugeData {
-      int64 value = 1;
-      google.protobuf.Timestamp timestamp = 2;
-    }
-
-    // (Required) The identifier for this metric.
-    MetricName metric_name = 1;
-
-    // (Required) The data for this metric.
-    oneof data {
-      CounterData counter_data = 1001;
-      DistributionData distribution_data = 1002;
-      GaugeData gauge_data = 1003;
-    }
-  }
-
-  map<string, PTransform> ptransforms = 1;
-}
-
 message ProcessBundleProgressResponse {
-  // DEPRECATED (Required)
-  Metrics metrics = 1;
-
   // DEPRECATED (Required) The list of metrics or other MonitoredState
   // collected while processing this bundle.
   repeated org.apache.beam.model.pipeline.v1.MonitoringInfo monitoring_infos = 3;
 
-  // The list of currently active primary roots that are being
-  // executed. Required to be populated for PTransforms which can be split.
-  repeated BundleApplication primary_roots = 4;
-
   // An identifier to MonitoringInfo.payload mapping.
   //
   // An SDK can report metrics using an identifier that only contains the
@@ -451,6 +324,8 @@ message ProcessBundleProgressResponse {
   // as the MonitoringInfo could be reconstructed fully by overwriting its
   // payload field with the bytes specified here.
   map<string, bytes> monitoring_data = 5;
+
+  reserved 1, 2, 4;
 }
 
 // A response that contains the full mapping information associated with
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsTranslation.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsTranslation.java
deleted file mode 100644
index 6626167..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsTranslation.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.metrics;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.sdk.metrics.MetricKey;
-import org.apache.beam.sdk.metrics.MetricName;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
-
-/** Translation utilities for metrics classes to/from Fn API. */
-public abstract class MetricsTranslation {
-
-  private MetricsTranslation() {}
-
-  public static MetricUpdates metricUpdatesFromProto(
-      String ptransformName, Collection<BeamFnApi.Metrics.User> userMetricUpdates) {
-    List<MetricUpdates.MetricUpdate<Long>> counterUpdates = new ArrayList<>();
-    List<MetricUpdates.MetricUpdate<DistributionData>> distributionUpdates = new ArrayList<>();
-    List<MetricUpdates.MetricUpdate<GaugeData>> gaugeUpdates = new ArrayList<>();
-
-    for (BeamFnApi.Metrics.User userMetricUpdate : userMetricUpdates) {
-      MetricKey metricKey =
-          MetricKey.create(ptransformName, metricNameFromProto(userMetricUpdate.getMetricName()));
-      switch (userMetricUpdate.getDataCase()) {
-        case COUNTER_DATA:
-          counterUpdates.add(
-              MetricUpdates.MetricUpdate.create(
-                  metricKey, userMetricUpdate.getCounterData().getValue()));
-          break;
-        case DISTRIBUTION_DATA:
-          distributionUpdates.add(
-              MetricUpdates.MetricUpdate.create(
-                  metricKey,
-                  DistributionData.create(
-                      userMetricUpdate.getDistributionData().getSum(),
-                      userMetricUpdate.getDistributionData().getCount(),
-                      userMetricUpdate.getDistributionData().getMin(),
-                      userMetricUpdate.getDistributionData().getMax())));
-          break;
-        case GAUGE_DATA:
-          gaugeUpdates.add(
-              MetricUpdates.MetricUpdate.create(
-                  metricKey, GaugeData.create(userMetricUpdate.getGaugeData().getValue())));
-          break;
-        case DATA_NOT_SET:
-          continue;
-      }
-    }
-    return MetricUpdates.create(counterUpdates, distributionUpdates, gaugeUpdates);
-  }
-
-  public static Map<String, Collection<BeamFnApi.Metrics.User>> metricUpdatesToProto(
-      MetricUpdates metricUpdates) {
-    LoadingCache<String, Collection<BeamFnApi.Metrics.User>> fnMetrics =
-        CacheBuilder.newBuilder()
-            .build(
-                new CacheLoader<String, Collection<BeamFnApi.Metrics.User>>() {
-                  @Override
-                  public Collection<BeamFnApi.Metrics.User> load(String ptransformName) {
-                    return new ArrayList<>();
-                  }
-                });
-
-    for (MetricUpdates.MetricUpdate<Long> counterUpdate : metricUpdates.counterUpdates()) {
-      fnMetrics
-          .getUnchecked(counterUpdate.getKey().stepName())
-          .add(
-              BeamFnApi.Metrics.User.newBuilder()
-                  .setMetricName(metricNameToProto(counterUpdate.getKey().metricName()))
-                  .setCounterData(
-                      BeamFnApi.Metrics.User.CounterData.newBuilder()
-                          .setValue(counterUpdate.getUpdate()))
-                  .build());
-    }
-
-    for (MetricUpdates.MetricUpdate<GaugeData> gaugeUpdate : metricUpdates.gaugeUpdates()) {
-      fnMetrics
-          .getUnchecked(gaugeUpdate.getKey().stepName())
-          .add(
-              BeamFnApi.Metrics.User.newBuilder()
-                  .setMetricName(metricNameToProto(gaugeUpdate.getKey().metricName()))
-                  .setGaugeData(
-                      BeamFnApi.Metrics.User.GaugeData.newBuilder()
-                          .setValue(gaugeUpdate.getUpdate().value()))
-                  .build());
-    }
-
-    for (MetricUpdates.MetricUpdate<DistributionData> distributionUpdate :
-        metricUpdates.distributionUpdates()) {
-      fnMetrics
-          .getUnchecked(distributionUpdate.getKey().stepName())
-          .add(
-              BeamFnApi.Metrics.User.newBuilder()
-                  .setMetricName(metricNameToProto(distributionUpdate.getKey().metricName()))
-                  .setDistributionData(
-                      BeamFnApi.Metrics.User.DistributionData.newBuilder()
-                          .setCount(distributionUpdate.getUpdate().count())
-                          .setMax(distributionUpdate.getUpdate().max())
-                          .setMin(distributionUpdate.getUpdate().min())
-                          .setSum(distributionUpdate.getUpdate().sum()))
-                  .build());
-    }
-
-    return fnMetrics.asMap();
-  }
-
-  public static MetricName metricNameFromProto(BeamFnApi.Metrics.User.MetricName protoMetricName) {
-    return MetricName.named(protoMetricName.getNamespace(), protoMetricName.getName());
-  }
-
-  public static BeamFnApi.Metrics.User.MetricName metricNameToProto(MetricName metricName) {
-    return BeamFnApi.Metrics.User.MetricName.newBuilder()
-        .setNamespace(metricName.getNamespace())
-        .setName(metricName.getName())
-        .build();
-  }
-}
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsTranslationTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsTranslationTest.java
deleted file mode 100644
index 0e7c7f9..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsTranslationTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.metrics;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import java.util.Collection;
-import java.util.Map;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-/** Tests for {@link MetricsTranslation}. */
-@RunWith(Parameterized.class)
-public class MetricsTranslationTest {
-
-  // Transform names are arbitrary user-meaningful steps in processing
-  private static final String TRANSFORM1 = "transform1";
-  private static final String TRANSFORM2 = "transform2";
-  private static final String TRANSFORM3 = "transform3";
-
-  // Namespaces correspond to different contexts for a metric
-  private static final String NAMESPACE1 = "fakeNamespace1";
-  private static final String NAMESPACE2 = "fakeNamespace2";
-
-  // Names are what is being measured
-  private static final String COUNTER_NAME1 = "elements";
-  private static final String COUNTER_NAME2 = "dropped";
-  private static final String DISTRIBUTION_NAME1 = "someMillis";
-  private static final String DISTRIBUTION_NAME2 = "otherMillis";
-  private static final String GAUGE_NAME1 = "load";
-  private static final String GAUGE_NAME2 = "memory";
-
-  private static final BeamFnApi.Metrics.User.MetricName COUNTER_METRIC1 =
-      BeamFnApi.Metrics.User.MetricName.newBuilder()
-          .setNamespace(NAMESPACE1)
-          .setName(COUNTER_NAME1)
-          .build();
-
-  private static final BeamFnApi.Metrics.User.MetricName COUNTER_METRIC2 =
-      BeamFnApi.Metrics.User.MetricName.newBuilder()
-          .setNamespace(NAMESPACE1)
-          .setName(COUNTER_NAME2)
-          .build();
-
-  private static final BeamFnApi.Metrics.User.MetricName DISTRIBUTION_METRIC1 =
-      BeamFnApi.Metrics.User.MetricName.newBuilder()
-          .setNamespace(NAMESPACE2)
-          .setName(DISTRIBUTION_NAME1)
-          .build();
-
-  private static final BeamFnApi.Metrics.User.MetricName DISTRIBUTION_METRIC2 =
-      BeamFnApi.Metrics.User.MetricName.newBuilder()
-          .setNamespace(NAMESPACE2)
-          .setName(DISTRIBUTION_NAME2)
-          .build();
-
-  private static final BeamFnApi.Metrics.User.MetricName GAUGE_METRIC1 =
-      BeamFnApi.Metrics.User.MetricName.newBuilder()
-          .setNamespace(NAMESPACE1)
-          .setName(GAUGE_NAME1)
-          .build();
-
-  private static final BeamFnApi.Metrics.User.MetricName GAUGE_METRIC2 =
-      BeamFnApi.Metrics.User.MetricName.newBuilder()
-          .setNamespace(NAMESPACE2)
-          .setName(GAUGE_NAME2)
-          .build();
-
-  private static final BeamFnApi.Metrics.User DISTRIBUTION1 =
-      BeamFnApi.Metrics.User.newBuilder()
-          .setMetricName(DISTRIBUTION_METRIC1)
-          .setDistributionData(
-              BeamFnApi.Metrics.User.DistributionData.newBuilder()
-                  .setCount(42)
-                  .setSum(4839L)
-                  .setMax(348L)
-                  .setMin(12L))
-          .build();
-
-  private static final BeamFnApi.Metrics.User DISTRIBUTION2 =
-      BeamFnApi.Metrics.User.newBuilder()
-          .setMetricName(DISTRIBUTION_METRIC2)
-          .setDistributionData(
-              BeamFnApi.Metrics.User.DistributionData.newBuilder()
-                  .setCount(3)
-                  .setSum(49L)
-                  .setMax(43L)
-                  .setMin(1L))
-          .build();
-
-  private static final BeamFnApi.Metrics.User COUNTER1 =
-      BeamFnApi.Metrics.User.newBuilder()
-          .setMetricName(COUNTER_METRIC1)
-          .setCounterData(BeamFnApi.Metrics.User.CounterData.newBuilder().setValue(92L))
-          .build();
-
-  private static final BeamFnApi.Metrics.User COUNTER2 =
-      BeamFnApi.Metrics.User.newBuilder()
-          .setMetricName(COUNTER_METRIC2)
-          .setCounterData(BeamFnApi.Metrics.User.CounterData.newBuilder().setValue(0L))
-          .build();
-
-  private static final BeamFnApi.Metrics.User GAUGE1 =
-      BeamFnApi.Metrics.User.newBuilder()
-          .setMetricName(GAUGE_METRIC2)
-          .setCounterData(BeamFnApi.Metrics.User.CounterData.newBuilder().setValue(56L))
-          .build();
-
-  private static final BeamFnApi.Metrics.User GAUGE2 =
-      BeamFnApi.Metrics.User.newBuilder()
-          .setMetricName(GAUGE_METRIC2)
-          .setCounterData(BeamFnApi.Metrics.User.CounterData.newBuilder().setValue(3L))
-          .build();
-
-  @Parameterized.Parameters
-  public static Iterable<Object[]> testInstances() {
-    return ImmutableList.<Object[]>builder()
-        .add(
-            new Object[] {
-              ImmutableMap.builder().put(TRANSFORM1, ImmutableList.of(DISTRIBUTION1)).build()
-            })
-        .add(
-            new Object[] {
-              ImmutableMap.builder()
-                  .put(TRANSFORM1, ImmutableList.of(DISTRIBUTION1, COUNTER1))
-                  .build()
-            })
-        .add(
-            new Object[] {
-              ImmutableMap.builder()
-                  .put(TRANSFORM1, ImmutableList.of(DISTRIBUTION1, COUNTER1))
-                  .put(TRANSFORM2, ImmutableList.of(COUNTER2))
-                  .put(TRANSFORM3, ImmutableList.of(GAUGE1))
-                  .build()
-            })
-        .add(
-            new Object[] {
-              ImmutableMap.builder().put(TRANSFORM1, ImmutableList.of(GAUGE1, GAUGE2)).build()
-            })
-        .build();
-  }
-
-  @Parameterized.Parameter(0)
-  public Map<String, Collection<BeamFnApi.Metrics.User>> fnMetrics;
-
-  @Test
-  public void testToFromProtoMetricUpdates() {
-    ImmutableMap.Builder<String, Collection<BeamFnApi.Metrics.User>> result =
-        ImmutableMap.builder();
-
-    for (Map.Entry<String, Collection<BeamFnApi.Metrics.User>> entry : fnMetrics.entrySet()) {
-      MetricUpdates updates =
-          MetricsTranslation.metricUpdatesFromProto(entry.getKey(), entry.getValue());
-      result.putAll(MetricsTranslation.metricUpdatesToProto(updates));
-    }
-
-    Map<String, Collection<BeamFnApi.Metrics.User>> backToProto = result.build();
-
-    assertThat(backToProto.keySet(), equalTo(fnMetrics.keySet()));
-
-    for (String ptransformName : backToProto.keySet()) {
-      assertThat(
-          backToProto.get(ptransformName),
-          containsInAnyOrder(fnMetrics.get(ptransformName).toArray()));
-    }
-  }
-}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
index ee41190..ae019c6 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
@@ -42,7 +42,6 @@ import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.Metrics;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
 import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.runners.core.metrics.DistributionData;
@@ -50,7 +49,6 @@ import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.core.metrics.GaugeData;
 import org.apache.beam.runners.core.metrics.MetricUpdates;
 import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
-import org.apache.beam.runners.core.metrics.MetricsTranslation;
 import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
 import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
@@ -341,30 +339,16 @@ public class BeamFnMapTaskExecutor extends DataflowMapTaskExecutor {
           grpcWriteOperation.abortWait();
         }
 
-        // TODO(BEAM-6189): Replace getProcessBundleProgress with getMonitoringInfos when Metrics
-        // is deprecated.
         ProcessBundleProgressResponse processBundleProgressResponse =
             MoreFutures.get(bundleProcessOperation.getProcessBundleProgress());
 
         final List<MonitoringInfo> monitoringInfosList =
             processBundleProgressResponse.getMonitoringInfosList();
 
-        // Supporting deprecated metrics until all supported runners are migrated to using
-        // MonitoringInfos
-        Metrics metrics = processBundleProgressResponse.getMetrics();
-        double elementsConsumed =
-            bundleProcessOperation.getInputElementsConsumed(monitoringInfosList);
-
-        if (elementsConsumed == 0) {
-          elementsConsumed = bundleProcessOperation.getInputElementsConsumed(metrics);
-        }
-
         updateMetrics(monitoringInfosList);
-        updateMetricsDeprecated(metrics);
-
-        // todo(migryz): utilize monitoringInfos here.
-        // Requires Element Count metrics to be implemented.
 
+        double elementsConsumed =
+            bundleProcessOperation.getInputElementsConsumed(monitoringInfosList);
         grpcWriteOperationElementsProcessed.accept((int) elementsConsumed);
         progressInterpolator.addPoint(
             grpcWriteOperation.getElementsSent(), readOperation.getProgress());
@@ -430,36 +414,6 @@ public class BeamFnMapTaskExecutor extends DataflowMapTaskExecutor {
               .collect(Collectors.toList());
     }
 
-    /**
-     * Updates internal metrics from provided (deprecated) Metrics object.
-     *
-     * @param metrics Metrics object received from FnApi.
-     */
-    @Deprecated
-    private void updateMetricsDeprecated(Metrics metrics) {
-      metrics
-          .getPtransformsMap()
-          .entrySet()
-          .forEach(
-              ptransformEntry -> {
-                MetricUpdates ptransformMetricUpdates =
-                    MetricsTranslation.metricUpdatesFromProto(
-                        ptransformEntry.getKey(), ptransformEntry.getValue().getUserList());
-                for (MetricUpdate<Long> update : ptransformMetricUpdates.counterUpdates()) {
-                  deprecatedCounterUpdates.put(update.getKey(), update);
-                }
-
-                for (MetricUpdate<DistributionData> update :
-                    ptransformMetricUpdates.distributionUpdates()) {
-                  deprecatedDistributionUpdates.put(update.getKey(), update);
-                }
-
-                for (MetricUpdate<GaugeData> update : ptransformMetricUpdates.gaugeUpdates()) {
-                  deprecatedGaugeUpdates.put(update.getKey(), update);
-                }
-              });
-    }
-
     @Nullable
     @Override
     public Progress getWorkerProgress() throws Exception {
@@ -529,7 +483,6 @@ public class BeamFnMapTaskExecutor extends DataflowMapTaskExecutor {
       deprecatedGaugeUpdates.clear();
       try {
         updateMetrics(MoreFutures.get(bundleProcessOperation.getFinalMonitoringInfos()));
-        updateMetricsDeprecated(MoreFutures.get(bundleProcessOperation.getFinalMetrics()));
       } catch (ExecutionException | InterruptedException exn) {
         LOG.info("Failed to get final metrics for bundle", exn);
       }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
index 2fbe144..ff50de4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
@@ -364,11 +364,10 @@ public class RegisterAndProcessBundleOperation extends Operation {
   /**
    * Returns the compound metrics recorded, by issuing a request to the SDK harness.
    *
-   * <p>This includes key progress indicators in {@link BeamFnApi.Metrics.PTransform.Measured} as
-   * well as user-defined metrics in {@link BeamFnApi.Metrics.User}.
+   * <p>This includes key progress indicators as well as user-defined metrics.
    *
-   * <p>Use {@link #getInputElementsConsumed(BeamFnApi.Metrics)} on the future value to extract the
-   * elements consumed from the upstream read operation.
+   * <p>Use {@link #getInputElementsConsumed} on the future value to extract the elements consumed
+   * from the upstream read operation.
    *
    * <p>May be called at any time, including before start() and after finish().
    */
@@ -393,12 +392,6 @@ public class RegisterAndProcessBundleOperation extends Operation {
         .thenApply(InstructionResponse::getProcessBundleProgress);
   }
 
-  /** Returns the final metrics returned by the SDK harness when it completes the bundle. */
-  public CompletionStage<BeamFnApi.Metrics> getFinalMetrics() {
-    return getProcessBundleResponse(processBundleResponse)
-        .thenApply(response -> response.getMetrics());
-  }
-
   public CompletionStage<List<MonitoringInfo>> getFinalMonitoringInfos() {
     return getProcessBundleResponse(processBundleResponse)
         .thenApply(response -> response.getMonitoringInfosList());
@@ -455,16 +448,6 @@ public class RegisterAndProcessBundleOperation extends Operation {
     return 0;
   }
 
-  /** Returns the number of input elements consumed by the gRPC read, if known, otherwise 0. */
-  double getInputElementsConsumed(BeamFnApi.Metrics metrics) {
-    return metrics
-        .getPtransformsOrDefault(
-            grpcReadTransformId, BeamFnApi.Metrics.PTransform.getDefaultInstance())
-        .getProcessedElements()
-        .getMeasured()
-        .getOutputElementCountsOrDefault(grpcReadTransformOutputName, 0);
-  }
-
   private CompletionStage<BeamFnApi.StateResponse.Builder> delegateByStateKeyType(
       StateRequest stateRequest) {
     switch (stateRequest.getStateKey().getTypeCase()) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
index d5020cb..4473311 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.dataflow.worker.fn.control;
 
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter;
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -37,7 +36,6 @@ import javax.annotation.Nullable;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.Metrics;
 import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.StateInternals;
@@ -96,13 +94,6 @@ public class BeamFnMapTaskExecutorTest {
   private static final String FAKE_OUTPUT_NAME = "fake_output_name";
   private static final String FAKE_OUTPUT_PCOLLECTION_ID = "fake_pcollection_id";
 
-  private static final Metrics.PTransform FAKE_ELEMENT_COUNT_METRICS =
-      Metrics.PTransform.newBuilder()
-          .setProcessedElements(
-              Metrics.PTransform.ProcessedElements.newBuilder()
-                  .setMeasured(Metrics.PTransform.Measured.getDefaultInstance()))
-          .build();
-
   private static final BeamFnApi.RegisterRequest REGISTER_REQUEST =
       BeamFnApi.RegisterRequest.newBuilder()
           .addProcessBundleDescriptor(
@@ -117,343 +108,11 @@ public class BeamFnMapTaskExecutorTest {
                           .build()))
           .build();
 
-  @Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10)
-  public void testTentativeUserMetrics() throws Exception {
-    final String stepName = "fakeStepNameWithUserMetrics";
-    final String namespace = "sdk/whatever";
-    final String name = "someCounter";
-    final int counterValue = 42;
-    final CountDownLatch progressSentLatch = new CountDownLatch(1);
-    final CountDownLatch processBundleLatch = new CountDownLatch(1);
-
-    final Metrics.User.MetricName metricName =
-        Metrics.User.MetricName.newBuilder().setNamespace(namespace).setName(name).build();
-
-    InstructionRequestHandler instructionRequestHandler =
-        new InstructionRequestHandler() {
-          @Override
-          public CompletionStage<InstructionResponse> handle(InstructionRequest request) {
-            switch (request.getRequestCase()) {
-              case REGISTER:
-                return CompletableFuture.completedFuture(responseFor(request).build());
-              case PROCESS_BUNDLE:
-                return MoreFutures.supplyAsync(
-                    () -> {
-                      processBundleLatch.await();
-                      return responseFor(request).build();
-                    });
-              case PROCESS_BUNDLE_PROGRESS:
-                progressSentLatch.countDown();
-                return CompletableFuture.completedFuture(
-                    responseFor(request)
-                        .setProcessBundleProgress(
-                            BeamFnApi.ProcessBundleProgressResponse.newBuilder()
-                                .setMetrics(
-                                    Metrics.newBuilder()
-                                        .putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS)
-                                        .putPtransforms(
-                                            stepName,
-                                            Metrics.PTransform.newBuilder()
-                                                .addUser(
-                                                    Metrics.User.newBuilder()
-                                                        .setMetricName(metricName)
-                                                        .setCounterData(
-                                                            Metrics.User.CounterData.newBuilder()
-                                                                .setValue(counterValue)))
-                                                .build())))
-                        .build());
-              default:
-                // block forever
-                return new CompletableFuture<>();
-            }
-          }
-
-          @Override
-          public void close() {}
-        };
-
-    RegisterAndProcessBundleOperation processOperation =
-        new RegisterAndProcessBundleOperation(
-            IdGenerators.decrementingLongs(),
-            instructionRequestHandler,
-            mockBeamFnStateDelegator,
-            REGISTER_REQUEST,
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableTable.of(),
-            ImmutableMap.of(),
-            mockContext);
-
-    BeamFnMapTaskExecutor mapTaskExecutor =
-        BeamFnMapTaskExecutor.forOperations(
-            ImmutableList.of(readOperation, grpcPortWriteOperation, processOperation),
-            executionStateTracker);
-
-    // Launch the BeamFnMapTaskExecutor and wait until we are sure there has been one
-    // tentative update
-    CompletionStage<Void> doneFuture = MoreFutures.runAsync(mapTaskExecutor::execute);
-    progressSentLatch.await();
-
-    Iterable<CounterUpdate> metricsCounterUpdates = Collections.emptyList();
-    while (Iterables.size(metricsCounterUpdates) == 0) {
-      Thread.sleep(ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS);
-      metricsCounterUpdates = mapTaskExecutor.extractMetricUpdates();
-    }
-
-    assertThat(
-        metricsCounterUpdates,
-        contains(new CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher(counterValue)));
-
-    // Now let it finish and clean up
-    processBundleLatch.countDown();
-    MoreFutures.get(doneFuture);
-  }
-
-  /** Tests that successive metric updates overwrite the previous. */
-  @Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10)
-  public void testTentativeUserMetricsOverwrite() throws Exception {
-    final String stepName = "fakeStepNameWithUserMetrics";
-    final String namespace = "sdk/whatever";
-    final String name = "someCounter";
-    final int firstCounterValue = 42;
-    final int secondCounterValue = 77;
-    final CountDownLatch progressSentTwiceLatch = new CountDownLatch(2);
-    final CountDownLatch processBundleLatch = new CountDownLatch(1);
-
-    final Metrics.User.MetricName metricName =
-        Metrics.User.MetricName.newBuilder().setNamespace(namespace).setName(name).build();
-
-    InstructionRequestHandler instructionRequestHandler =
-        new InstructionRequestHandler() {
-          @Override
-          public CompletionStage<InstructionResponse> handle(InstructionRequest request) {
-            switch (request.getRequestCase()) {
-              case REGISTER:
-                return CompletableFuture.completedFuture(responseFor(request).build());
-              case PROCESS_BUNDLE:
-                return MoreFutures.supplyAsync(
-                    () -> {
-                      processBundleLatch.await();
-                      return responseFor(request).build();
-                    });
-              case PROCESS_BUNDLE_PROGRESS:
-                progressSentTwiceLatch.countDown();
-                return CompletableFuture.completedFuture(
-                    responseFor(request)
-                        .setProcessBundleProgress(
-                            BeamFnApi.ProcessBundleProgressResponse.newBuilder()
-                                .setMetrics(
-                                    Metrics.newBuilder()
-                                        .putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS)
-                                        .putPtransforms(
-                                            stepName,
-                                            Metrics.PTransform.newBuilder()
-                                                .addUser(
-                                                    Metrics.User.newBuilder()
-                                                        .setMetricName(metricName)
-                                                        .setCounterData(
-                                                            Metrics.User.CounterData.newBuilder()
-                                                                .setValue(
-                                                                    progressSentTwiceLatch
-                                                                                .getCount()
-                                                                            > 0
-                                                                        ? firstCounterValue
-                                                                        : secondCounterValue)))
-                                                .build())))
-                        .build());
-              default:
-                // block forever
-                return new CompletableFuture<>();
-            }
-          }
-
-          @Override
-          public void close() {}
-        };
-
-    when(grpcPortWriteOperation.processedElementsConsumer()).thenReturn(elementsConsumed -> {});
-
-    RegisterAndProcessBundleOperation processOperation =
-        new RegisterAndProcessBundleOperation(
-            IdGenerators.decrementingLongs(),
-            instructionRequestHandler,
-            mockBeamFnStateDelegator,
-            REGISTER_REQUEST,
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableTable.of(),
-            ImmutableMap.of(),
-            mockContext);
-
-    BeamFnMapTaskExecutor mapTaskExecutor =
-        BeamFnMapTaskExecutor.forOperations(
-            ImmutableList.of(readOperation, grpcPortWriteOperation, processOperation),
-            executionStateTracker);
-
-    // Launch the BeamFnMapTaskExecutor and wait until we are sure there has been one
-    // tentative update
-    CompletionStage<Void> doneFuture = MoreFutures.runAsync(mapTaskExecutor::execute);
-    progressSentTwiceLatch.await();
-
-    Iterable<CounterUpdate> metricsCounterUpdates = Collections.emptyList();
-    while (Iterables.size(metricsCounterUpdates) == 0) {
-      Thread.sleep(ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS);
-      metricsCounterUpdates = mapTaskExecutor.extractMetricUpdates();
-    }
-
-    assertThat(
-        metricsCounterUpdates,
-        contains(new CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher(secondCounterValue)));
-
-    // Now let it finish and clean up
-    processBundleLatch.countDown();
-    MoreFutures.get(doneFuture);
-  }
-
-  @Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10)
-  public void testFinalUserMetricsDeprecated() throws Exception {
-    final String stepName = "fakeStepNameWithUserMetrics";
-    final String namespace = "sdk/whatever";
-    final String name = "someCounter";
-    final int counterValue = 42;
-    final int finalCounterValue = 77;
-    final CountDownLatch progressSentLatch = new CountDownLatch(1);
-    final CountDownLatch processBundleLatch = new CountDownLatch(1);
-
-    final Metrics.User.MetricName metricName =
-        Metrics.User.MetricName.newBuilder().setNamespace(namespace).setName(name).build();
-
-    InstructionRequestHandler instructionRequestHandler =
-        new InstructionRequestHandler() {
-          @Override
-          public CompletionStage<InstructionResponse> handle(InstructionRequest request) {
-            switch (request.getRequestCase()) {
-              case REGISTER:
-                return CompletableFuture.completedFuture(responseFor(request).build());
-              case PROCESS_BUNDLE:
-                return MoreFutures.supplyAsync(
-                    () -> {
-                      processBundleLatch.await();
-                      return responseFor(request)
-                          .setProcessBundle(
-                              BeamFnApi.ProcessBundleResponse.newBuilder()
-                                  .setMetrics(
-                                      Metrics.newBuilder()
-                                          .putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS)
-                                          .putPtransforms(
-                                              stepName,
-                                              Metrics.PTransform.newBuilder()
-                                                  .addUser(
-                                                      Metrics.User.newBuilder()
-                                                          .setMetricName(metricName)
-                                                          .setCounterData(
-                                                              Metrics.User.CounterData.newBuilder()
-                                                                  .setValue(finalCounterValue)))
-                                                  .build())))
-                          .build();
-                    });
-              case PROCESS_BUNDLE_PROGRESS:
-                progressSentLatch.countDown();
-                return CompletableFuture.completedFuture(
-                    responseFor(request)
-                        .setProcessBundleProgress(
-                            BeamFnApi.ProcessBundleProgressResponse.newBuilder()
-                                .setMetrics(
-                                    Metrics.newBuilder()
-                                        .putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS)
-                                        .putPtransforms(
-                                            stepName,
-                                            Metrics.PTransform.newBuilder()
-                                                .addUser(
-                                                    Metrics.User.newBuilder()
-                                                        .setMetricName(metricName)
-                                                        .setCounterData(
-                                                            Metrics.User.CounterData.newBuilder()
-                                                                .setValue(counterValue)))
-                                                .build())))
-                        .build());
-              default:
-                // block forever
-                return new CompletableFuture<>();
-            }
-          }
-
-          @Override
-          public void close() {}
-        };
-
-    RegisterAndProcessBundleOperation processOperation =
-        new RegisterAndProcessBundleOperation(
-            IdGenerators.decrementingLongs(),
-            instructionRequestHandler,
-            mockBeamFnStateDelegator,
-            REGISTER_REQUEST,
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableTable.of(),
-            ImmutableMap.of(),
-            mockContext);
-
-    BeamFnMapTaskExecutor mapTaskExecutor =
-        BeamFnMapTaskExecutor.forOperations(
-            ImmutableList.of(readOperation, grpcPortWriteOperation, processOperation),
-            executionStateTracker);
-
-    // Launch the BeamFnMapTaskExecutor and wait until we are sure there has been one
-    // tentative update
-    CompletionStage<Void> doneFuture = MoreFutures.runAsync(mapTaskExecutor::execute);
-    progressSentLatch.await();
-
-    // TODO: add ability to wait for tentative progress update
-    Iterable<CounterUpdate> metricsCounterUpdates = Collections.emptyList();
-    while (Iterables.size(metricsCounterUpdates) == 0) {
-      Thread.sleep(ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS);
-      metricsCounterUpdates = mapTaskExecutor.extractMetricUpdates();
-    }
-
-    // Get the final metrics
-    processBundleLatch.countDown();
-    MoreFutures.get(doneFuture);
-    metricsCounterUpdates = mapTaskExecutor.extractMetricUpdates();
-
-    assertThat(Iterables.size(metricsCounterUpdates), equalTo(1));
-
-    assertThat(
-        metricsCounterUpdates,
-        contains(new CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher(finalCounterValue)));
-  }
-
   @Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 60)
   public void testExtractCounterUpdatesReturnsValidProgressTrackerCounterUpdatesIfPresent()
       throws Exception {
-    final String stepName = "fakeStepNameWithUserMetrics";
-    final String namespace = "sdk/whatever";
-    final String name = "someCounter";
-    final int counterValue = 42;
-    final int finalCounterValue = 77;
     final CountDownLatch progressSentLatch = new CountDownLatch(1);
     final CountDownLatch processBundleLatch = new CountDownLatch(1);
-
-    final Metrics.User.MetricName metricName =
-        Metrics.User.MetricName.newBuilder().setNamespace(namespace).setName(name).build();
-
-    final Metrics deprecatedMetrics =
-        Metrics.newBuilder()
-            .putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS)
-            .putPtransforms(
-                stepName,
-                Metrics.PTransform.newBuilder()
-                    .addUser(
-                        Metrics.User.newBuilder()
-                            .setMetricName(metricName)
-                            .setCounterData(
-                                Metrics.User.CounterData.newBuilder().setValue(finalCounterValue)))
-                    .build())
-            .build();
-
     final int expectedCounterValue = 5;
     final MonitoringInfo expectedMonitoringInfo =
         MonitoringInfo.newBuilder()
@@ -479,7 +138,6 @@ public class BeamFnMapTaskExecutorTest {
                       return responseFor(request)
                           .setProcessBundle(
                               BeamFnApi.ProcessBundleResponse.newBuilder()
-                                  .setMetrics(deprecatedMetrics)
                                   .addMonitoringInfos(expectedMonitoringInfo))
                           .build();
                     });
@@ -489,7 +147,6 @@ public class BeamFnMapTaskExecutorTest {
                     responseFor(request)
                         .setProcessBundleProgress(
                             BeamFnApi.ProcessBundleProgressResponse.newBuilder()
-                                .setMetrics(deprecatedMetrics)
                                 .addMonitoringInfos(expectedMonitoringInfo))
                         .build());
               default:
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
index a89dac6..5305693 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
@@ -19,8 +19,6 @@ package org.apache.beam.runners.dataflow.worker.fn.control;
 
 import static org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.encodeAndConcat;
 import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
@@ -243,200 +241,6 @@ public class RegisterAndProcessBundleOperationTest {
   }
 
   @Test
-  public void testTentativeUserMetrics() throws Exception {
-    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
-
-    CountDownLatch processBundleLatch = new CountDownLatch(1);
-
-    final String stepName = "fakeStepNameWithUserMetrics";
-    final String namespace = "sdk/whatever";
-    final String name = "someCounter";
-    final long counterValue = 42;
-
-    final BeamFnApi.Metrics.User.MetricName metricName =
-        BeamFnApi.Metrics.User.MetricName.newBuilder()
-            .setNamespace(namespace)
-            .setName(name)
-            .build();
-
-    InstructionRequestHandler instructionRequestHandler =
-        new InstructionRequestHandler() {
-          @Override
-          public CompletionStage<InstructionResponse> handle(InstructionRequest request) {
-            switch (request.getRequestCase()) {
-              case REGISTER:
-                return CompletableFuture.completedFuture(responseFor(request).build());
-              case PROCESS_BUNDLE:
-                return MoreFutures.supplyAsync(
-                    () -> {
-                      processBundleLatch.await();
-                      return responseFor(request).build();
-                    });
-              case PROCESS_BUNDLE_PROGRESS:
-                return CompletableFuture.completedFuture(
-                    responseFor(request)
-                        .setProcessBundleProgress(
-                            BeamFnApi.ProcessBundleProgressResponse.newBuilder()
-                                .setMetrics(
-                                    BeamFnApi.Metrics.newBuilder()
-                                        .putPtransforms(
-                                            stepName,
-                                            BeamFnApi.Metrics.PTransform.newBuilder()
-                                                .addUser(
-                                                    BeamFnApi.Metrics.User.newBuilder()
-                                                        .setMetricName(metricName)
-                                                        .setCounterData(
-                                                            BeamFnApi.Metrics.User.CounterData
-                                                                .newBuilder()
-                                                                .setValue(counterValue)))
-                                                .build())))
-                        .build());
-              default:
-                // block forever
-                return new CompletableFuture<>();
-            }
-          }
-
-          @Override
-          public void close() {}
-        };
-
-    RegisterAndProcessBundleOperation operation =
-        new RegisterAndProcessBundleOperation(
-            idGenerator,
-            instructionRequestHandler,
-            mockBeamFnStateDelegator,
-            REGISTER_REQUEST,
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableTable.of(),
-            ImmutableMap.of(),
-            mockContext);
-
-    operation.start();
-
-    BeamFnApi.Metrics metrics = MoreFutures.get(operation.getProcessBundleProgress()).getMetrics();
-    assertThat(metrics.getPtransformsOrThrow(stepName).getUserCount(), equalTo(1));
-
-    BeamFnApi.Metrics.User userMetric = metrics.getPtransformsOrThrow(stepName).getUser(0);
-    assertThat(userMetric.getMetricName(), equalTo(metricName));
-    assertThat(userMetric.getCounterData().getValue(), equalTo(counterValue));
-
-    processBundleLatch.countDown();
-    operation.finish();
-  }
-
-  @Test
-  public void testFinalUserMetrics() throws Exception {
-    List<BeamFnApi.InstructionRequest> requests = new ArrayList<>();
-    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
-    ExecutorService executorService = Executors.newCachedThreadPool();
-
-    CountDownLatch processBundleLatch = new CountDownLatch(1);
-
-    final String stepName = "fakeStepNameWithUserMetrics";
-    final String namespace = "sdk/whatever";
-    final String name = "someCounter";
-    final long counterValue = 42;
-    final long finalCounterValue = 77;
-
-    final BeamFnApi.Metrics.User.MetricName metricName =
-        BeamFnApi.Metrics.User.MetricName.newBuilder()
-            .setNamespace(namespace)
-            .setName(name)
-            .build();
-
-    InstructionRequestHandler instructionRequestHandler =
-        new InstructionRequestHandler() {
-          @Override
-          public CompletionStage<InstructionResponse> handle(InstructionRequest request) {
-            switch (request.getRequestCase()) {
-              case REGISTER:
-                return CompletableFuture.completedFuture(responseFor(request).build());
-              case PROCESS_BUNDLE:
-                return MoreFutures.supplyAsync(
-                    () -> {
-                      processBundleLatch.await();
-                      return responseFor(request)
-                          .setProcessBundle(
-                              BeamFnApi.ProcessBundleResponse.newBuilder()
-                                  .setMetrics(
-                                      BeamFnApi.Metrics.newBuilder()
-                                          .putPtransforms(
-                                              stepName,
-                                              BeamFnApi.Metrics.PTransform.newBuilder()
-                                                  .addUser(
-                                                      BeamFnApi.Metrics.User.newBuilder()
-                                                          .setMetricName(metricName)
-                                                          .setCounterData(
-                                                              BeamFnApi.Metrics.User.CounterData
-                                                                  .newBuilder()
-                                                                  .setValue(finalCounterValue)))
-                                                  .build())))
-                          .build();
-                    });
-              case PROCESS_BUNDLE_PROGRESS:
-                return CompletableFuture.completedFuture(
-                    responseFor(request)
-                        .setProcessBundleProgress(
-                            BeamFnApi.ProcessBundleProgressResponse.newBuilder()
-                                .setMetrics(
-                                    BeamFnApi.Metrics.newBuilder()
-                                        .putPtransforms(
-                                            stepName,
-                                            BeamFnApi.Metrics.PTransform.newBuilder()
-                                                .addUser(
-                                                    BeamFnApi.Metrics.User.newBuilder()
-                                                        .setMetricName(metricName)
-                                                        .setCounterData(
-                                                            BeamFnApi.Metrics.User.CounterData
-                                                                .newBuilder()
-                                                                .setValue(counterValue)))
-                                                .build())))
-                        .build());
-              default:
-                // block forever
-                return new CompletableFuture<>();
-            }
-          }
-
-          @Override
-          public void close() {}
-        };
-
-    RegisterAndProcessBundleOperation operation =
-        new RegisterAndProcessBundleOperation(
-            idGenerator,
-            instructionRequestHandler,
-            mockBeamFnStateDelegator,
-            REGISTER_REQUEST,
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableTable.of(),
-            ImmutableMap.of(),
-            mockContext);
-
-    operation.start();
-
-    // Force some intermediate metrics to test crosstalk is not introduced
-    BeamFnApi.Metrics metrics = MoreFutures.get(operation.getProcessBundleProgress()).getMetrics();
-    BeamFnApi.Metrics.User userMetric = metrics.getPtransformsOrThrow(stepName).getUser(0);
-    assertThat(userMetric.getMetricName(), equalTo(metricName));
-    assertThat(userMetric.getCounterData().getValue(), not(equalTo(finalCounterValue)));
-
-    processBundleLatch.countDown();
-    operation.finish();
-
-    metrics = MoreFutures.get(operation.getFinalMetrics());
-
-    userMetric = metrics.getPtransformsOrThrow(stepName).getUser(0);
-    assertThat(userMetric.getMetricName(), equalTo(metricName));
-    assertThat(userMetric.getCounterData().getValue(), equalTo(finalCounterValue));
-  }
-
-  @Test
   public void testProcessingBundleBlocksOnFinish() throws Exception {
     List<BeamFnApi.InstructionRequest> requests = new ArrayList<>();
     IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/SingularProcessBundleProgressTrackerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/SingularProcessBundleProgressTrackerTest.java
index b469829..0be1b9b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/SingularProcessBundleProgressTrackerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/SingularProcessBundleProgressTrackerTest.java
@@ -83,8 +83,8 @@ public class SingularProcessBundleProgressTrackerTest {
             CompletableFuture.completedFuture(
                 BeamFnApi.ProcessBundleProgressResponse.getDefaultInstance()));
 
-    when(process.getInputElementsConsumed(any(BeamFnApi.Metrics.class)))
-        .thenReturn(1.0, 4.0, 10.0)
+    when(process.getInputElementsConsumed(any(Iterable.class)))
+        .thenReturn(1L, 4L, 10L)
         .thenThrow(new RuntimeException());
 
     // Initially no progress is known.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
index 14f870f..02070dc 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -76,9 +76,9 @@ func (n *ParDo) Up(ctx context.Context) error {
 	n.inv = newInvoker(n.Fn.ProcessElementFn())
 
 	// We can't cache the context during Setup since it runs only once per bundle.
-	// Subsequent bundles might run this same node, and the context here would be 
+	// Subsequent bundles might run this same node, and the context here would be
 	// incorrectly refering to the older bundleId.
-	setupCtx :=  metrics.SetPTransformID(ctx, n.PID)
+	setupCtx := metrics.SetPTransformID(ctx, n.PID)
 	if _, err := InvokeWithoutEventTime(setupCtx, n.Fn.SetupFn(), nil); err != nil {
 		return n.fail(err)
 	}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 5b25fe7..03205e7 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -204,7 +204,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 		data.Close()
 		state.Close()
 
-		mets, mons, pylds := monitoring(plan)
+		mons, pylds := monitoring(plan)
 		// Move the plan back to the candidate state
 		c.mu.Lock()
 		// Mark the instruction as failed.
@@ -223,9 +223,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 			InstructionId: string(instID),
 			Response: &fnpb.InstructionResponse_ProcessBundle{
 				ProcessBundle: &fnpb.ProcessBundleResponse{
-					MonitoringData: pylds,
-					// TODO(lostluck): Delete legacy monitoring Metrics once they can be safely dropped.
-					Metrics:         mets,
+					MonitoringData:  pylds,
 					MonitoringInfos: mons,
 				},
 			},
@@ -246,15 +244,13 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 			return fail(ctx, instID, "failed to return progress: instruction %v not active", ref)
 		}
 
-		mets, mons, pylds := monitoring(plan)
+		mons, pylds := monitoring(plan)
 
 		return &fnpb.InstructionResponse{
 			InstructionId: string(instID),
 			Response: &fnpb.InstructionResponse_ProcessBundleProgress{
 				ProcessBundleProgress: &fnpb.ProcessBundleProgressResponse{
-					MonitoringData: pylds,
-					// TODO(lostluck): Delete legacy monitoring Metrics once they can be safely dropped.
-					Metrics:         mets,
+					MonitoringData:  pylds,
 					MonitoringInfos: mons,
 				},
 			},
diff --git a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
index fa0370e..df1093f 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
@@ -26,9 +26,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
-	fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
 	ppb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
-	"github.com/golang/protobuf/ptypes"
 )
 
 type mUrn uint32
@@ -201,62 +199,15 @@ func shortIdsToInfos(shortids []string) map[string]*ppb.MonitoringInfo {
 	return defaultShortIDCache.shortIdsToInfos(shortids)
 }
 
-func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo, map[string][]byte) {
+func monitoring(p *exec.Plan) ([]*ppb.MonitoringInfo, map[string][]byte) {
 	store := p.Store()
 	if store == nil {
-		return nil, nil, nil
+		return nil, nil
 	}
 
-	// Get the legacy style metrics.
-	transforms := make(map[string]*fnpb.Metrics_PTransform)
-	metrics.Extractor{
-		SumInt64: func(l metrics.Labels, v int64) {
-			pb := getTransform(transforms, l)
-			pb.User = append(pb.User, &fnpb.Metrics_User{
-				MetricName: toName(l),
-				Data: &fnpb.Metrics_User_CounterData_{
-					CounterData: &fnpb.Metrics_User_CounterData{
-						Value: v,
-					},
-				},
-			})
-		},
-		DistributionInt64: func(l metrics.Labels, count, sum, min, max int64) {
-			pb := getTransform(transforms, l)
-			pb.User = append(pb.User, &fnpb.Metrics_User{
-				MetricName: toName(l),
-				Data: &fnpb.Metrics_User_DistributionData_{
-					DistributionData: &fnpb.Metrics_User_DistributionData{
-						Count: count,
-						Sum:   sum,
-						Min:   min,
-						Max:   max,
-					},
-				},
-			})
-		},
-		GaugeInt64: func(l metrics.Labels, v int64, t time.Time) {
-			ts, err := ptypes.TimestampProto(t)
-			if err != nil {
-				panic(err)
-			}
-			pb := getTransform(transforms, l)
-			pb.User = append(pb.User, &fnpb.Metrics_User{
-				MetricName: toName(l),
-				Data: &fnpb.Metrics_User_GaugeData_{
-					GaugeData: &fnpb.Metrics_User_GaugeData{
-						Value:     v,
-						Timestamp: ts,
-					},
-				},
-			})
-		},
-	}.ExtractFrom(store)
-
 	defaultShortIDCache.mu.Lock()
 	defer defaultShortIDCache.mu.Unlock()
 
-	// Get the MonitoringInfo versions.
 	var monitoringInfo []*ppb.MonitoringInfo
 	payloads := make(map[string][]byte)
 	metrics.Extractor{
@@ -310,17 +261,6 @@ func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo, map[string]
 
 	// Get the execution monitoring information from the bundle plan.
 	if snapshot, ok := p.Progress(); ok {
-		// Legacy version.
-		transforms[snapshot.ID] = &fnpb.Metrics_PTransform{
-			ProcessedElements: &fnpb.Metrics_PTransform_ProcessedElements{
-				Measured: &fnpb.Metrics_PTransform_Measured{
-					OutputElementCounts: map[string]int64{
-						snapshot.Name: snapshot.Count,
-					},
-				},
-			},
-		}
-		// Monitoring info version.
 		payload, err := int64Counter(snapshot.Count)
 		if err != nil {
 			panic(err)
@@ -338,9 +278,7 @@ func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo, map[string]
 			})
 	}
 
-	return &fnpb.Metrics{
-			Ptransforms: transforms,
-		}, monitoringInfo,
+	return monitoringInfo,
 		payloads
 }
 
@@ -387,19 +325,3 @@ func int64Distribution(count, sum, min, max int64) ([]byte, error) {
 	}
 	return buf.Bytes(), nil
 }
-
-func getTransform(transforms map[string]*fnpb.Metrics_PTransform, l metrics.Labels) *fnpb.Metrics_PTransform {
-	if pb, ok := transforms[l.Transform()]; ok {
-		return pb
-	}
-	pb := &fnpb.Metrics_PTransform{}
-	transforms[l.Transform()] = pb
-	return pb
-}
-
-func toName(l metrics.Labels) *fnpb.Metrics_User_MetricName {
-	return &fnpb.Metrics_User_MetricName{
-		Name:      l.Name(),
-		Namespace: l.Namespace(),
-	}
-}
diff --git a/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go b/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go
index bc8c269..9f902aa 100644
--- a/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go
+++ b/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go
@@ -78,7 +78,7 @@ func (x LogEntry_Severity_Enum) String() string {
 }
 
 func (LogEntry_Severity_Enum) EnumDescriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{29, 1, 0}
+	return fileDescriptor_6d954c03a4758710, []int{28, 1, 0}
 }
 
 // A descriptor for connecting to a remote port using the Beam Fn Data API.
@@ -1017,16 +1017,12 @@ func (m *ProcessBundleRequest_CacheToken_SideInput) GetSideInputId() string {
 }
 
 type ProcessBundleResponse struct {
-	// (Optional) If metrics reporting is supported by the SDK, this represents
-	// the final metrics to record for this bundle.
-	// DEPRECATED
-	Metrics *Metrics `protobuf:"bytes,1,opt,name=metrics,proto3" json:"metrics,omitempty"`
 	// (Optional) Specifies that the bundle has not been completed and the
 	// following applications need to be scheduled and executed in the future.
 	// A runner that does not yet support residual roots MUST still check that
 	// this is empty for correctness.
 	ResidualRoots []*DelayedBundleApplication `protobuf:"bytes,2,rep,name=residual_roots,json=residualRoots,proto3" json:"residual_roots,omitempty"`
-	// (Required) The list of metrics or other MonitoredState
+	// DEPRECATED (Required) The list of metrics or other MonitoredState
 	// collected while processing this bundle.
 	MonitoringInfos []*pipeline_v1.MonitoringInfo `protobuf:"bytes,3,rep,name=monitoring_infos,json=monitoringInfos,proto3" json:"monitoring_infos,omitempty"`
 	// (Optional) Specifies that the runner must callback to this worker
@@ -1075,13 +1071,6 @@ func (m *ProcessBundleResponse) XXX_DiscardUnknown() {
 
 var xxx_messageInfo_ProcessBundleResponse proto.InternalMessageInfo
 
-func (m *ProcessBundleResponse) GetMetrics() *Metrics {
-	if m != nil {
-		return m.Metrics
-	}
-	return nil
-}
-
 func (m *ProcessBundleResponse) GetResidualRoots() []*DelayedBundleApplication {
 	if m != nil {
 		return m.ResidualRoots
@@ -1204,613 +1193,10 @@ func (m *ProcessBundleProgressMetadataRequest) GetMonitoringInfoId() []string {
 	return nil
 }
 
-// DEPRECATED
-type Metrics struct {
-	Ptransforms          map[string]*Metrics_PTransform `protobuf:"bytes,1,rep,name=ptransforms,proto3" json:"ptransforms,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
-	XXX_NoUnkeyedLiteral struct{}                       `json:"-"`
-	XXX_unrecognized     []byte                         `json:"-"`
-	XXX_sizecache        int32                          `json:"-"`
-}
-
-func (m *Metrics) Reset()         { *m = Metrics{} }
-func (m *Metrics) String() string { return proto.CompactTextString(m) }
-func (*Metrics) ProtoMessage()    {}
-func (*Metrics) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{12}
-}
-
-func (m *Metrics) XXX_Unmarshal(b []byte) error {
-	return xxx_messageInfo_Metrics.Unmarshal(m, b)
-}
-func (m *Metrics) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
-	return xxx_messageInfo_Metrics.Marshal(b, m, deterministic)
-}
-func (m *Metrics) XXX_Merge(src proto.Message) {
-	xxx_messageInfo_Metrics.Merge(m, src)
-}
-func (m *Metrics) XXX_Size() int {
-	return xxx_messageInfo_Metrics.Size(m)
-}
-func (m *Metrics) XXX_DiscardUnknown() {
-	xxx_messageInfo_Metrics.DiscardUnknown(m)
-}
-
-var xxx_messageInfo_Metrics proto.InternalMessageInfo
-
-func (m *Metrics) GetPtransforms() map[string]*Metrics_PTransform {
-	if m != nil {
-		return m.Ptransforms
-	}
-	return nil
-}
-
-// PTransform level metrics.
-// These metrics are split into processed and active element groups for
-// progress reporting purposes. This allows a Runner to see what is measured,
-// what is estimated and what can be extrapolated to be able to accurately
-// estimate the amount of remaining work.
-type Metrics_PTransform struct {
-	// (Required): Metrics for processed elements.
-	ProcessedElements *Metrics_PTransform_ProcessedElements `protobuf:"bytes,1,opt,name=processed_elements,json=processedElements,proto3" json:"processed_elements,omitempty"`
-	// (Required): Metrics for active elements.
-	ActiveElements *Metrics_PTransform_ActiveElements `protobuf:"bytes,2,opt,name=active_elements,json=activeElements,proto3" json:"active_elements,omitempty"`
-	// (Optional): Map from local output name to its watermark.
-	// The watermarks reported are tentative, to get a better sense of progress
-	// while processing a bundle but before it is committed. At bundle commit
-	// time, a Runner needs to also take into account the timers set to compute
-	// the actual watermarks.
-	Watermarks           map[string]int64 `protobuf:"bytes,3,rep,name=watermarks,proto3" json:"watermarks,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
-	User                 []*Metrics_User  `protobuf:"bytes,4,rep,name=user,proto3" json:"user,omitempty"`
-	XXX_NoUnkeyedLiteral struct{}         `json:"-"`
-	XXX_unrecognized     []byte           `json:"-"`
-	XXX_sizecache        int32            `json:"-"`
-}
-
-func (m *Metrics_PTransform) Reset()         { *m = Metrics_PTransform{} }
-func (m *Metrics_PTransform) String() string { return proto.CompactTextString(m) }
-func (*Metrics_PTransform) ProtoMessage()    {}
-func (*Metrics_PTransform) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{12, 0}
-}
-
-func (m *Metrics_PTransform) XXX_Unmarshal(b []byte) error {
-	return xxx_messageInfo_Metrics_PTransform.Unmarshal(m, b)
-}
-func (m *Metrics_PTransform) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
-	return xxx_messageInfo_Metrics_PTransform.Marshal(b, m, deterministic)
-}
-func (m *Metrics_PTransform) XXX_Merge(src proto.Message) {
-	xxx_messageInfo_Metrics_PTransform.Merge(m, src)
-}
-func (m *Metrics_PTransform) XXX_Size() int {
-	return xxx_messageInfo_Metrics_PTransform.Size(m)
-}
-func (m *Metrics_PTransform) XXX_DiscardUnknown() {
-	xxx_messageInfo_Metrics_PTransform.DiscardUnknown(m)
-}
-
-var xxx_messageInfo_Metrics_PTransform proto.InternalMessageInfo
-
-func (m *Metrics_PTransform) GetProcessedElements() *Metrics_PTransform_ProcessedElements {
-	if m != nil {
-		return m.ProcessedElements
-	}
-	return nil
-}
-
-func (m *Metrics_PTransform) GetActiveElements() *Metrics_PTransform_ActiveElements {
-	if m != nil {
-		return m.ActiveElements
-	}
-	return nil
-}
-
-func (m *Metrics_PTransform) GetWatermarks() map[string]int64 {
-	if m != nil {
-		return m.Watermarks
-	}
-	return nil
-}
-
-func (m *Metrics_PTransform) GetUser() []*Metrics_User {
-	if m != nil {
-		return m.User
-	}
-	return nil
-}
-
-// Metrics that are measured for processed and active element groups.
-type Metrics_PTransform_Measured struct {
-	// (Optional) Map from local input name to number of elements processed
-	// from this input.
-	// If unset, assumed to be the sum of the outputs of all producers to
-	// this transform (for ProcessedElements) and 0 (for ActiveElements).
-	InputElementCounts map[string]int64 `protobuf:"bytes,1,rep,name=input_element_counts,json=inputElementCounts,proto3" json:"input_element_counts,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
-	// (Required) Map from local output name to number of elements produced
-	// for this output.
-	OutputElementCounts map[string]int64 `protobuf:"bytes,2,rep,name=output_element_counts,json=outputElementCounts,proto3" json:"output_element_counts,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
-	// (Optional) The total time spent so far in processing the elements in
-	// this group, in seconds.
-	TotalTimeSpent       float64  `protobuf:"fixed64,3,opt,name=total_time_spent,json=totalTimeSpent,proto3" json:"total_time_spent,omitempty"`
-	XXX_NoUnkeyedLiteral struct{} `json:"-"`
-	XXX_unrecognized     []byte   `json:"-"`
-	XXX_sizecache        int32    `json:"-"`
-}
-
-func (m *Metrics_PTransform_Measured) Reset()         { *m = Metrics_PTransform_Measured{} }
-func (m *Metrics_PTransform_Measured) String() string { return proto.CompactTextString(m) }
-func (*Metrics_PTransform_Measured) ProtoMessage()    {}
-func (*Metrics_PTransform_Measured) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{12, 0, 0}
-}
-
-func (m *Metrics_PTransform_Measured) XXX_Unmarshal(b []byte) error {
-	return xxx_messageInfo_Metrics_PTransform_Measured.Unmarshal(m, b)
-}
-func (m *Metrics_PTransform_Measured) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
-	return xxx_messageInfo_Metrics_PTransform_Measured.Marshal(b, m, deterministic)
-}
-func (m *Metrics_PTransform_Measured) XXX_Merge(src proto.Message) {
-	xxx_messageInfo_Metrics_PTransform_Measured.Merge(m, src)
-}
-func (m *Metrics_PTransform_Measured) XXX_Size() int {
-	return xxx_messageInfo_Metrics_PTransform_Measured.Size(m)
-}
-func (m *Metrics_PTransform_Measured) XXX_DiscardUnknown() {
-	xxx_messageInfo_Metrics_PTransform_Measured.DiscardUnknown(m)
-}
-
-var xxx_messageInfo_Metrics_PTransform_Measured proto.InternalMessageInfo
-
-func (m *Metrics_PTransform_Measured) GetInputElementCounts() map[string]int64 {
-	if m != nil {
-		return m.InputElementCounts
-	}
-	return nil
-}
-
-func (m *Metrics_PTransform_Measured) GetOutputElementCounts() map[string]int64 {
-	if m != nil {
-		return m.OutputElementCounts
-	}
-	return nil
-}
-
-func (m *Metrics_PTransform_Measured) GetTotalTimeSpent() float64 {
-	if m != nil {
-		return m.TotalTimeSpent
-	}
-	return 0
-}
-
-// Metrics for fully processed elements.
-type Metrics_PTransform_ProcessedElements struct {
-	// (Required)
-	Measured             *Metrics_PTransform_Measured `protobuf:"bytes,1,opt,name=measured,proto3" json:"measured,omitempty"`
-	XXX_NoUnkeyedLiteral struct{}                     `json:"-"`
-	XXX_unrecognized     []byte                       `json:"-"`
-	XXX_sizecache        int32                        `json:"-"`
-}
-
-func (m *Metrics_PTransform_ProcessedElements) Reset()         { *m = Metrics_PTransform_ProcessedElements{} }
-func (m *Metrics_PTransform_ProcessedElements) String() string { return proto.CompactTextString(m) }
-func (*Metrics_PTransform_ProcessedElements) ProtoMessage()    {}
-func (*Metrics_PTransform_ProcessedElements) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{12, 0, 1}
-}
-
-func (m *Metrics_PTransform_ProcessedElements) XXX_Unmarshal(b []byte) error {
-	return xxx_messageInfo_Metrics_PTransform_ProcessedElements.Unmarshal(m, b)
-}
-func (m *Metrics_PTransform_ProcessedElements) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
-	return xxx_messageInfo_Metrics_PTransform_ProcessedElements.Marshal(b, m, deterministic)
-}
-func (m *Metrics_PTransform_ProcessedElements) XXX_Merge(src proto.Message) {
-	xxx_messageInfo_Metrics_PTransform_ProcessedElements.Merge(m, src)
-}
-func (m *Metrics_PTransform_ProcessedElements) XXX_Size() int {
-	return xxx_messageInfo_Metrics_PTransform_ProcessedElements.Size(m)
-}
-func (m *Metrics_PTransform_ProcessedElements) XXX_DiscardUnknown() {
-	xxx_messageInfo_Metrics_PTransform_ProcessedElements.DiscardUnknown(m)
-}
-
-var xxx_messageInfo_Metrics_PTransform_ProcessedElements proto.InternalMessageInfo
-
-func (m *Metrics_PTransform_ProcessedElements) GetMeasured() *Metrics_PTransform_Measured {
-	if m != nil {
-		return m.Measured
-	}
-	return nil
-}
-
-// Metrics for active elements.
-// An element is considered active if the SDK has started but not finished
-// processing it yet.
-type Metrics_PTransform_ActiveElements struct {
-	// (Required)
-	Measured *Metrics_PTransform_Measured `protobuf:"bytes,1,opt,name=measured,proto3" json:"measured,omitempty"`
-	// (Optional) Sum of estimated fraction of known work remaining for all
-	// active elements, as reported by this transform.
-	// If not reported, a Runner could extrapolate this from the processed
-	// elements.
-	// TODO: Handle the case when known work is infinite.
-	FractionRemaining float64 `protobuf:"fixed64,2,opt,name=fraction_remaining,json=fractionRemaining,proto3" json:"fraction_remaining,omitempty"`
-	// (Optional) Map from local output name to sum of estimated number
-	// of elements remaining for this output from all active elements,
-	// as reported by this transform.
-	// If not reported, a Runner could extrapolate this from the processed
-	// elements.
-	OutputElementsRemaining map[string]int64 `protobuf:"bytes,3,rep,name=output_elements_remaining,json=outputElementsRemaining,proto3" json:"output_elements_remaining,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
-	XXX_NoUnkeyedLiteral    struct{}         `json:"-"`
-	XXX_unrecognized        []byte           `json:"-"`
-	XXX_sizecache           int32            `json:"-"`
-}
-
-func (m *Metrics_PTransform_ActiveElements) Reset()         { *m = Metrics_PTransform_ActiveElements{} }
-func (m *Metrics_PTransform_ActiveElements) String() string { return proto.CompactTextString(m) }
-func (*Metrics_PTransform_ActiveElements) ProtoMessage()    {}
-func (*Metrics_PTransform_ActiveElements) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{12, 0, 2}
-}
-
-func (m *Metrics_PTransform_ActiveElements) XXX_Unmarshal(b []byte) error {
-	return xxx_messageInfo_Metrics_PTransform_ActiveElements.Unmarshal(m, b)
-}
-func (m *Metrics_PTransform_ActiveElements) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
-	return xxx_messageInfo_Metrics_PTransform_ActiveElements.Marshal(b, m, deterministic)
-}
-func (m *Metrics_PTransform_ActiveElements) XXX_Merge(src proto.Message) {
-	xxx_messageInfo_Metrics_PTransform_ActiveElements.Merge(m, src)
-}
-func (m *Metrics_PTransform_ActiveElements) XXX_Size() int {
-	return xxx_messageInfo_Metrics_PTransform_ActiveElements.Size(m)
-}
-func (m *Metrics_PTransform_ActiveElements) XXX_DiscardUnknown() {
-	xxx_messageInfo_Metrics_PTransform_ActiveElements.DiscardUnknown(m)
-}
-
-var xxx_messageInfo_Metrics_PTransform_ActiveElements proto.InternalMessageInfo
-
-func (m *Metrics_PTransform_ActiveElements) GetMeasured() *Metrics_PTransform_Measured {
-	if m != nil {
-		return m.Measured
-	}
-	return nil
-}
-
-func (m *Metrics_PTransform_ActiveElements) GetFractionRemaining() float64 {
-	if m != nil {
-		return m.FractionRemaining
-	}
-	return 0
-}
-
-func (m *Metrics_PTransform_ActiveElements) GetOutputElementsRemaining() map[string]int64 {
-	if m != nil {
-		return m.OutputElementsRemaining
-	}
-	return nil
-}
-
-// User defined metrics
-type Metrics_User struct {
-	// (Required) The identifier for this metric.
-	MetricName *Metrics_User_MetricName `protobuf:"bytes,1,opt,name=metric_name,json=metricName,proto3" json:"metric_name,omitempty"`
-	// (Required) The data for this metric.
-	//
-	// Types that are valid to be assigned to Data:
-	//	*Metrics_User_CounterData_
-	//	*Metrics_User_DistributionData_
-	//	*Metrics_User_GaugeData_
-	Data                 isMetrics_User_Data `protobuf_oneof:"data"`
-	XXX_NoUnkeyedLiteral struct{}            `json:"-"`
-	XXX_unrecognized     []byte              `json:"-"`
-	XXX_sizecache        int32               `json:"-"`
-}
-
-func (m *Metrics_User) Reset()         { *m = Metrics_User{} }
-func (m *Metrics_User) String() string { return proto.CompactTextString(m) }
-func (*Metrics_User) ProtoMessage()    {}
-func (*Metrics_User) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{12, 1}
-}
-
-func (m *Metrics_User) XXX_Unmarshal(b []byte) error {
-	return xxx_messageInfo_Metrics_User.Unmarshal(m, b)
-}
-func (m *Metrics_User) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
-	return xxx_messageInfo_Metrics_User.Marshal(b, m, deterministic)
-}
-func (m *Metrics_User) XXX_Merge(src proto.Message) {
-	xxx_messageInfo_Metrics_User.Merge(m, src)
-}
-func (m *Metrics_User) XXX_Size() int {
-	return xxx_messageInfo_Metrics_User.Size(m)
-}
-func (m *Metrics_User) XXX_DiscardUnknown() {
-	xxx_messageInfo_Metrics_User.DiscardUnknown(m)
-}
-
-var xxx_messageInfo_Metrics_User proto.InternalMessageInfo
-
-func (m *Metrics_User) GetMetricName() *Metrics_User_MetricName {
-	if m != nil {
-		return m.MetricName
-	}
-	return nil
-}
-
-type isMetrics_User_Data interface {
-	isMetrics_User_Data()
-}
-
-type Metrics_User_CounterData_ struct {
-	CounterData *Metrics_User_CounterData `protobuf:"bytes,1001,opt,name=counter_data,json=counterData,proto3,oneof"`
-}
-
-type Metrics_User_DistributionData_ struct {
-	DistributionData *Metrics_User_DistributionData `protobuf:"bytes,1002,opt,name=distribution_data,json=distributionData,proto3,oneof"`
-}
-
-type Metrics_User_GaugeData_ struct {
-	GaugeData *Metrics_User_GaugeData `protobuf:"bytes,1003,opt,name=gauge_data,json=gaugeData,proto3,oneof"`
-}
-
-func (*Metrics_User_CounterData_) isMetrics_User_Data() {}
-
-func (*Metrics_User_DistributionData_) isMetrics_User_Data() {}
-
-func (*Metrics_User_GaugeData_) isMetrics_User_Data() {}
-
-func (m *Metrics_User) GetData() isMetrics_User_Data {
-	if m != nil {
-		return m.Data
-	}
-	return nil
-}
-
-func (m *Metrics_User) GetCounterData() *Metrics_User_CounterData {
-	if x, ok := m.GetData().(*Metrics_User_CounterData_); ok {
-		return x.CounterData
-	}
-	return nil
-}
-
-func (m *Metrics_User) GetDistributionData() *Metrics_User_DistributionData {
-	if x, ok := m.GetData().(*Metrics_User_DistributionData_); ok {
-		return x.DistributionData
-	}
-	return nil
-}
-
-func (m *Metrics_User) GetGaugeData() *Metrics_User_GaugeData {
-	if x, ok := m.GetData().(*Metrics_User_GaugeData_); ok {
-		return x.GaugeData
-	}
-	return nil
-}
-
-// XXX_OneofWrappers is for the internal use of the proto package.
-func (*Metrics_User) XXX_OneofWrappers() []interface{} {
-	return []interface{}{
-		(*Metrics_User_CounterData_)(nil),
-		(*Metrics_User_DistributionData_)(nil),
-		(*Metrics_User_GaugeData_)(nil),
-	}
-}
-
-// A key for identifying a metric at the most granular level.
-type Metrics_User_MetricName struct {
-	// (Required): The namespace of this metric.
-	Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"`
-	// (Required): The name of this metric.
-	Name                 string   `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"`
-	XXX_NoUnkeyedLiteral struct{} `json:"-"`
-	XXX_unrecognized     []byte   `json:"-"`
-	XXX_sizecache        int32    `json:"-"`
-}
-
-func (m *Metrics_User_MetricName) Reset()         { *m = Metrics_User_MetricName{} }
-func (m *Metrics_User_MetricName) String() string { return proto.CompactTextString(m) }
-func (*Metrics_User_MetricName) ProtoMessage()    {}
-func (*Metrics_User_MetricName) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{12, 1, 0}
-}
-
-func (m *Metrics_User_MetricName) XXX_Unmarshal(b []byte) error {
-	return xxx_messageInfo_Metrics_User_MetricName.Unmarshal(m, b)
-}
-func (m *Metrics_User_MetricName) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
-	return xxx_messageInfo_Metrics_User_MetricName.Marshal(b, m, deterministic)
-}
-func (m *Metrics_User_MetricName) XXX_Merge(src proto.Message) {
-	xxx_messageInfo_Metrics_User_MetricName.Merge(m, src)
-}
-func (m *Metrics_User_MetricName) XXX_Size() int {
-	return xxx_messageInfo_Metrics_User_MetricName.Size(m)
-}
-func (m *Metrics_User_MetricName) XXX_DiscardUnknown() {
-	xxx_messageInfo_Metrics_User_MetricName.DiscardUnknown(m)
-}
-
-var xxx_messageInfo_Metrics_User_MetricName proto.InternalMessageInfo
-
-func (m *Metrics_User_MetricName) GetNamespace() string {
-	if m != nil {
-		return m.Namespace
-	}
-	return ""
-}
-
-func (m *Metrics_User_MetricName) GetName() string {
-	if m != nil {
-		return m.Name
-	}
-	return ""
-}
-
-// Data associated with a counter metric.
-type Metrics_User_CounterData struct {
-	Value                int64    `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"`
-	XXX_NoUnkeyedLiteral struct{} `json:"-"`
-	XXX_unrecognized     []byte   `json:"-"`
-	XXX_sizecache        int32    `json:"-"`
-}
-
-func (m *Metrics_User_CounterData) Reset()         { *m = Metrics_User_CounterData{} }
-func (m *Metrics_User_CounterData) String() string { return proto.CompactTextString(m) }
-func (*Metrics_User_CounterData) ProtoMessage()    {}
-func (*Metrics_User_CounterData) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{12, 1, 1}
-}
-
-func (m *Metrics_User_CounterData) XXX_Unmarshal(b []byte) error {
-	return xxx_messageInfo_Metrics_User_CounterData.Unmarshal(m, b)
-}
-func (m *Metrics_User_CounterData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
-	return xxx_messageInfo_Metrics_User_CounterData.Marshal(b, m, deterministic)
-}
-func (m *Metrics_User_CounterData) XXX_Merge(src proto.Message) {
-	xxx_messageInfo_Metrics_User_CounterData.Merge(m, src)
-}
-func (m *Metrics_User_CounterData) XXX_Size() int {
-	return xxx_messageInfo_Metrics_User_CounterData.Size(m)
-}
-func (m *Metrics_User_CounterData) XXX_DiscardUnknown() {
-	xxx_messageInfo_Metrics_User_CounterData.DiscardUnknown(m)
-}
-
-var xxx_messageInfo_Metrics_User_CounterData proto.InternalMessageInfo
-
-func (m *Metrics_User_CounterData) GetValue() int64 {
-	if m != nil {
-		return m.Value
-	}
-	return 0
-}
-
-// Data associated with a distribution metric.
-type Metrics_User_DistributionData struct {
-	Count                int64    `protobuf:"varint,1,opt,name=count,proto3" json:"count,omitempty"`
-	Sum                  int64    `protobuf:"varint,2,opt,name=sum,proto3" json:"sum,omitempty"`
-	Min                  int64    `protobuf:"varint,3,opt,name=min,proto3" json:"min,omitempty"`
-	Max                  int64    `protobuf:"varint,4,opt,name=max,proto3" json:"max,omitempty"`
-	XXX_NoUnkeyedLiteral struct{} `json:"-"`
-	XXX_unrecognized     []byte   `json:"-"`
-	XXX_sizecache        int32    `json:"-"`
-}
-
-func (m *Metrics_User_DistributionData) Reset()         { *m = Metrics_User_DistributionData{} }
-func (m *Metrics_User_DistributionData) String() string { return proto.CompactTextString(m) }
-func (*Metrics_User_DistributionData) ProtoMessage()    {}
-func (*Metrics_User_DistributionData) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{12, 1, 2}
-}
-
-func (m *Metrics_User_DistributionData) XXX_Unmarshal(b []byte) error {
-	return xxx_messageInfo_Metrics_User_DistributionData.Unmarshal(m, b)
-}
-func (m *Metrics_User_DistributionData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
-	return xxx_messageInfo_Metrics_User_DistributionData.Marshal(b, m, deterministic)
-}
-func (m *Metrics_User_DistributionData) XXX_Merge(src proto.Message) {
-	xxx_messageInfo_Metrics_User_DistributionData.Merge(m, src)
-}
-func (m *Metrics_User_DistributionData) XXX_Size() int {
-	return xxx_messageInfo_Metrics_User_DistributionData.Size(m)
-}
-func (m *Metrics_User_DistributionData) XXX_DiscardUnknown() {
-	xxx_messageInfo_Metrics_User_DistributionData.DiscardUnknown(m)
-}
-
-var xxx_messageInfo_Metrics_User_DistributionData proto.InternalMessageInfo
-
-func (m *Metrics_User_DistributionData) GetCount() int64 {
-	if m != nil {
-		return m.Count
-	}
-	return 0
-}
-
-func (m *Metrics_User_DistributionData) GetSum() int64 {
-	if m != nil {
-		return m.Sum
-	}
-	return 0
-}
-
-func (m *Metrics_User_DistributionData) GetMin() int64 {
-	if m != nil {
-		return m.Min
-	}
-	return 0
-}
-
-func (m *Metrics_User_DistributionData) GetMax() int64 {
-	if m != nil {
-		return m.Max
-	}
-	return 0
-}
-
-// Data associated with a Gauge metric.
-type Metrics_User_GaugeData struct {
-	Value                int64                `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"`
-	Timestamp            *timestamp.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
-	XXX_NoUnkeyedLiteral struct{}             `json:"-"`
-	XXX_unrecognized     []byte               `json:"-"`
-	XXX_sizecache        int32                `json:"-"`
-}
-
-func (m *Metrics_User_GaugeData) Reset()         { *m = Metrics_User_GaugeData{} }
-func (m *Metrics_User_GaugeData) String() string { return proto.CompactTextString(m) }
-func (*Metrics_User_GaugeData) ProtoMessage()    {}
-func (*Metrics_User_GaugeData) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{12, 1, 3}
-}
-
-func (m *Metrics_User_GaugeData) XXX_Unmarshal(b []byte) error {
-	return xxx_messageInfo_Metrics_User_GaugeData.Unmarshal(m, b)
-}
-func (m *Metrics_User_GaugeData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
-	return xxx_messageInfo_Metrics_User_GaugeData.Marshal(b, m, deterministic)
-}
-func (m *Metrics_User_GaugeData) XXX_Merge(src proto.Message) {
-	xxx_messageInfo_Metrics_User_GaugeData.Merge(m, src)
-}
-func (m *Metrics_User_GaugeData) XXX_Size() int {
-	return xxx_messageInfo_Metrics_User_GaugeData.Size(m)
-}
-func (m *Metrics_User_GaugeData) XXX_DiscardUnknown() {
-	xxx_messageInfo_Metrics_User_GaugeData.DiscardUnknown(m)
-}
-
-var xxx_messageInfo_Metrics_User_GaugeData proto.InternalMessageInfo
-
-func (m *Metrics_User_GaugeData) GetValue() int64 {
-	if m != nil {
-		return m.Value
-	}
-	return 0
-}
-
-func (m *Metrics_User_GaugeData) GetTimestamp() *timestamp.Timestamp {
-	if m != nil {
-		return m.Timestamp
-	}
-	return nil
-}
-
 type ProcessBundleProgressResponse struct {
-	// DEPRECATED (Required)
-	Metrics *Metrics `protobuf:"bytes,1,opt,name=metrics,proto3" json:"metrics,omitempty"`
 	// DEPRECATED (Required) The list of metrics or other MonitoredState
 	// collected while processing this bundle.
 	MonitoringInfos []*pipeline_v1.MonitoringInfo `protobuf:"bytes,3,rep,name=monitoring_infos,json=monitoringInfos,proto3" json:"monitoring_infos,omitempty"`
-	// The list of currently active primary roots that are being
-	// executed. Required to be populated for PTransforms which can be split.
-	PrimaryRoots []*BundleApplication `protobuf:"bytes,4,rep,name=primary_roots,json=primaryRoots,proto3" json:"primary_roots,omitempty"`
 	// An identifier to MonitoringInfo.payload mapping.
 	//
 	// An SDK can report metrics using an identifier that only contains the
@@ -1831,7 +1217,7 @@ func (m *ProcessBundleProgressResponse) Reset()         { *m = ProcessBundleProg
 func (m *ProcessBundleProgressResponse) String() string { return proto.CompactTextString(m) }
 func (*ProcessBundleProgressResponse) ProtoMessage()    {}
 func (*ProcessBundleProgressResponse) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{13}
+	return fileDescriptor_6d954c03a4758710, []int{12}
 }
 
 func (m *ProcessBundleProgressResponse) XXX_Unmarshal(b []byte) error {
@@ -1852,13 +1238,6 @@ func (m *ProcessBundleProgressResponse) XXX_DiscardUnknown() {
 
 var xxx_messageInfo_ProcessBundleProgressResponse proto.InternalMessageInfo
 
-func (m *ProcessBundleProgressResponse) GetMetrics() *Metrics {
-	if m != nil {
-		return m.Metrics
-	}
-	return nil
-}
-
 func (m *ProcessBundleProgressResponse) GetMonitoringInfos() []*pipeline_v1.MonitoringInfo {
 	if m != nil {
 		return m.MonitoringInfos
@@ -1866,13 +1245,6 @@ func (m *ProcessBundleProgressResponse) GetMonitoringInfos() []*pipeline_v1.Moni
 	return nil
 }
 
-func (m *ProcessBundleProgressResponse) GetPrimaryRoots() []*BundleApplication {
-	if m != nil {
-		return m.PrimaryRoots
-	}
-	return nil
-}
-
 func (m *ProcessBundleProgressResponse) GetMonitoringData() map[string][]byte {
 	if m != nil {
 		return m.MonitoringData
@@ -1903,7 +1275,7 @@ func (m *ProcessBundleProgressMetadataResponse) Reset()         { *m = ProcessBu
 func (m *ProcessBundleProgressMetadataResponse) String() string { return proto.CompactTextString(m) }
 func (*ProcessBundleProgressMetadataResponse) ProtoMessage()    {}
 func (*ProcessBundleProgressMetadataResponse) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{14}
+	return fileDescriptor_6d954c03a4758710, []int{13}
 }
 
 func (m *ProcessBundleProgressMetadataResponse) XXX_Unmarshal(b []byte) error {
@@ -1951,7 +1323,7 @@ func (m *ProcessBundleSplitRequest) Reset()         { *m = ProcessBundleSplitReq
 func (m *ProcessBundleSplitRequest) String() string { return proto.CompactTextString(m) }
 func (*ProcessBundleSplitRequest) ProtoMessage()    {}
 func (*ProcessBundleSplitRequest) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{15}
+	return fileDescriptor_6d954c03a4758710, []int{14}
 }
 
 func (m *ProcessBundleSplitRequest) XXX_Unmarshal(b []byte) error {
@@ -2012,7 +1384,7 @@ func (m *ProcessBundleSplitRequest_DesiredSplit) Reset() {
 func (m *ProcessBundleSplitRequest_DesiredSplit) String() string { return proto.CompactTextString(m) }
 func (*ProcessBundleSplitRequest_DesiredSplit) ProtoMessage()    {}
 func (*ProcessBundleSplitRequest_DesiredSplit) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{15, 0}
+	return fileDescriptor_6d954c03a4758710, []int{14, 0}
 }
 
 func (m *ProcessBundleSplitRequest_DesiredSplit) XXX_Unmarshal(b []byte) error {
@@ -2083,7 +1455,7 @@ func (m *ProcessBundleSplitResponse) Reset()         { *m = ProcessBundleSplitRe
 func (m *ProcessBundleSplitResponse) String() string { return proto.CompactTextString(m) }
 func (*ProcessBundleSplitResponse) ProtoMessage()    {}
 func (*ProcessBundleSplitResponse) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{16}
+	return fileDescriptor_6d954c03a4758710, []int{15}
 }
 
 func (m *ProcessBundleSplitResponse) XXX_Unmarshal(b []byte) error {
@@ -2155,7 +1527,7 @@ func (m *ProcessBundleSplitResponse_ChannelSplit) Reset() {
 func (m *ProcessBundleSplitResponse_ChannelSplit) String() string { return proto.CompactTextString(m) }
 func (*ProcessBundleSplitResponse_ChannelSplit) ProtoMessage()    {}
 func (*ProcessBundleSplitResponse_ChannelSplit) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{16, 0}
+	return fileDescriptor_6d954c03a4758710, []int{15, 0}
 }
 
 func (m *ProcessBundleSplitResponse_ChannelSplit) XXX_Unmarshal(b []byte) error {
@@ -2210,7 +1582,7 @@ func (m *FinalizeBundleRequest) Reset()         { *m = FinalizeBundleRequest{} }
 func (m *FinalizeBundleRequest) String() string { return proto.CompactTextString(m) }
 func (*FinalizeBundleRequest) ProtoMessage()    {}
 func (*FinalizeBundleRequest) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{17}
+	return fileDescriptor_6d954c03a4758710, []int{16}
 }
 
 func (m *FinalizeBundleRequest) XXX_Unmarshal(b []byte) error {
@@ -2248,7 +1620,7 @@ func (m *FinalizeBundleResponse) Reset()         { *m = FinalizeBundleResponse{}
 func (m *FinalizeBundleResponse) String() string { return proto.CompactTextString(m) }
 func (*FinalizeBundleResponse) ProtoMessage()    {}
 func (*FinalizeBundleResponse) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{18}
+	return fileDescriptor_6d954c03a4758710, []int{17}
 }
 
 func (m *FinalizeBundleResponse) XXX_Unmarshal(b []byte) error {
@@ -2285,7 +1657,7 @@ func (m *Elements) Reset()         { *m = Elements{} }
 func (m *Elements) String() string { return proto.CompactTextString(m) }
 func (*Elements) ProtoMessage()    {}
 func (*Elements) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{19}
+	return fileDescriptor_6d954c03a4758710, []int{18}
 }
 
 func (m *Elements) XXX_Unmarshal(b []byte) error {
@@ -2351,7 +1723,7 @@ func (m *Elements_Data) Reset()         { *m = Elements_Data{} }
 func (m *Elements_Data) String() string { return proto.CompactTextString(m) }
 func (*Elements_Data) ProtoMessage()    {}
 func (*Elements_Data) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{19, 0}
+	return fileDescriptor_6d954c03a4758710, []int{18, 0}
 }
 
 func (m *Elements_Data) XXX_Unmarshal(b []byte) error {
@@ -2429,7 +1801,7 @@ func (m *Elements_Timer) Reset()         { *m = Elements_Timer{} }
 func (m *Elements_Timer) String() string { return proto.CompactTextString(m) }
 func (*Elements_Timer) ProtoMessage()    {}
 func (*Elements_Timer) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{19, 1}
+	return fileDescriptor_6d954c03a4758710, []int{18, 1}
 }
 
 func (m *Elements_Timer) XXX_Unmarshal(b []byte) error {
@@ -2512,7 +1884,7 @@ func (m *StateRequest) Reset()         { *m = StateRequest{} }
 func (m *StateRequest) String() string { return proto.CompactTextString(m) }
 func (*StateRequest) ProtoMessage()    {}
 func (*StateRequest) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{20}
+	return fileDescriptor_6d954c03a4758710, []int{19}
 }
 
 func (m *StateRequest) XXX_Unmarshal(b []byte) error {
@@ -2638,7 +2010,7 @@ func (m *StateResponse) Reset()         { *m = StateResponse{} }
 func (m *StateResponse) String() string { return proto.CompactTextString(m) }
 func (*StateResponse) ProtoMessage()    {}
 func (*StateResponse) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{21}
+	return fileDescriptor_6d954c03a4758710, []int{20}
 }
 
 func (m *StateResponse) XXX_Unmarshal(b []byte) error {
@@ -2751,7 +2123,7 @@ func (m *StateKey) Reset()         { *m = StateKey{} }
 func (m *StateKey) String() string { return proto.CompactTextString(m) }
 func (*StateKey) ProtoMessage()    {}
 func (*StateKey) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{22}
+	return fileDescriptor_6d954c03a4758710, []int{21}
 }
 
 func (m *StateKey) XXX_Unmarshal(b []byte) error {
@@ -2877,7 +2249,7 @@ func (m *StateKey_Runner) Reset()         { *m = StateKey_Runner{} }
 func (m *StateKey_Runner) String() string { return proto.CompactTextString(m) }
 func (*StateKey_Runner) ProtoMessage()    {}
 func (*StateKey_Runner) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{22, 0}
+	return fileDescriptor_6d954c03a4758710, []int{21, 0}
 }
 
 func (m *StateKey_Runner) XXX_Unmarshal(b []byte) error {
@@ -2933,7 +2305,7 @@ func (m *StateKey_IterableSideInput) Reset()         { *m = StateKey_IterableSid
 func (m *StateKey_IterableSideInput) String() string { return proto.CompactTextString(m) }
 func (*StateKey_IterableSideInput) ProtoMessage()    {}
 func (*StateKey_IterableSideInput) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{22, 1}
+	return fileDescriptor_6d954c03a4758710, []int{21, 1}
 }
 
 func (m *StateKey_IterableSideInput) XXX_Unmarshal(b []byte) error {
@@ -3006,7 +2378,7 @@ func (m *StateKey_MultimapSideInput) Reset()         { *m = StateKey_MultimapSid
 func (m *StateKey_MultimapSideInput) String() string { return proto.CompactTextString(m) }
 func (*StateKey_MultimapSideInput) ProtoMessage()    {}
 func (*StateKey_MultimapSideInput) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{22, 2}
+	return fileDescriptor_6d954c03a4758710, []int{21, 2}
 }
 
 func (m *StateKey_MultimapSideInput) XXX_Unmarshal(b []byte) error {
@@ -3083,7 +2455,7 @@ func (m *StateKey_MultimapKeysSideInput) Reset()         { *m = StateKey_Multima
 func (m *StateKey_MultimapKeysSideInput) String() string { return proto.CompactTextString(m) }
 func (*StateKey_MultimapKeysSideInput) ProtoMessage()    {}
 func (*StateKey_MultimapKeysSideInput) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{22, 3}
+	return fileDescriptor_6d954c03a4758710, []int{21, 3}
 }
 
 func (m *StateKey_MultimapKeysSideInput) XXX_Unmarshal(b []byte) error {
@@ -3144,7 +2516,7 @@ func (m *StateKey_BagUserState) Reset()         { *m = StateKey_BagUserState{} }
 func (m *StateKey_BagUserState) String() string { return proto.CompactTextString(m) }
 func (*StateKey_BagUserState) ProtoMessage()    {}
 func (*StateKey_BagUserState) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{22, 4}
+	return fileDescriptor_6d954c03a4758710, []int{21, 4}
 }
 
 func (m *StateKey_BagUserState) XXX_Unmarshal(b []byte) error {
@@ -3210,7 +2582,7 @@ func (m *StateGetRequest) Reset()         { *m = StateGetRequest{} }
 func (m *StateGetRequest) String() string { return proto.CompactTextString(m) }
 func (*StateGetRequest) ProtoMessage()    {}
 func (*StateGetRequest) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{23}
+	return fileDescriptor_6d954c03a4758710, []int{22}
 }
 
 func (m *StateGetRequest) XXX_Unmarshal(b []byte) error {
@@ -3258,7 +2630,7 @@ func (m *StateGetResponse) Reset()         { *m = StateGetResponse{} }
 func (m *StateGetResponse) String() string { return proto.CompactTextString(m) }
 func (*StateGetResponse) ProtoMessage()    {}
 func (*StateGetResponse) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{24}
+	return fileDescriptor_6d954c03a4758710, []int{23}
 }
 
 func (m *StateGetResponse) XXX_Unmarshal(b []byte) error {
@@ -3308,7 +2680,7 @@ func (m *StateAppendRequest) Reset()         { *m = StateAppendRequest{} }
 func (m *StateAppendRequest) String() string { return proto.CompactTextString(m) }
 func (*StateAppendRequest) ProtoMessage()    {}
 func (*StateAppendRequest) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{25}
+	return fileDescriptor_6d954c03a4758710, []int{24}
 }
 
 func (m *StateAppendRequest) XXX_Unmarshal(b []byte) error {
@@ -3347,7 +2719,7 @@ func (m *StateAppendResponse) Reset()         { *m = StateAppendResponse{} }
 func (m *StateAppendResponse) String() string { return proto.CompactTextString(m) }
 func (*StateAppendResponse) ProtoMessage()    {}
 func (*StateAppendResponse) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{26}
+	return fileDescriptor_6d954c03a4758710, []int{25}
 }
 
 func (m *StateAppendResponse) XXX_Unmarshal(b []byte) error {
@@ -3379,7 +2751,7 @@ func (m *StateClearRequest) Reset()         { *m = StateClearRequest{} }
 func (m *StateClearRequest) String() string { return proto.CompactTextString(m) }
 func (*StateClearRequest) ProtoMessage()    {}
 func (*StateClearRequest) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{27}
+	return fileDescriptor_6d954c03a4758710, []int{26}
 }
 
 func (m *StateClearRequest) XXX_Unmarshal(b []byte) error {
@@ -3411,7 +2783,7 @@ func (m *StateClearResponse) Reset()         { *m = StateClearResponse{} }
 func (m *StateClearResponse) String() string { return proto.CompactTextString(m) }
 func (*StateClearResponse) ProtoMessage()    {}
 func (*StateClearResponse) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{28}
+	return fileDescriptor_6d954c03a4758710, []int{27}
 }
 
 func (m *StateClearResponse) XXX_Unmarshal(b []byte) error {
@@ -3468,7 +2840,7 @@ func (m *LogEntry) Reset()         { *m = LogEntry{} }
 func (m *LogEntry) String() string { return proto.CompactTextString(m) }
 func (*LogEntry) ProtoMessage()    {}
 func (*LogEntry) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{29}
+	return fileDescriptor_6d954c03a4758710, []int{28}
 }
 
 func (m *LogEntry) XXX_Unmarshal(b []byte) error {
@@ -3559,7 +2931,7 @@ func (m *LogEntry_List) Reset()         { *m = LogEntry_List{} }
 func (m *LogEntry_List) String() string { return proto.CompactTextString(m) }
 func (*LogEntry_List) ProtoMessage()    {}
 func (*LogEntry_List) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{29, 0}
+	return fileDescriptor_6d954c03a4758710, []int{28, 0}
 }
 
 func (m *LogEntry_List) XXX_Unmarshal(b []byte) error {
@@ -3610,7 +2982,7 @@ func (m *LogEntry_Severity) Reset()         { *m = LogEntry_Severity{} }
 func (m *LogEntry_Severity) String() string { return proto.CompactTextString(m) }
 func (*LogEntry_Severity) ProtoMessage()    {}
 func (*LogEntry_Severity) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{29, 1}
+	return fileDescriptor_6d954c03a4758710, []int{28, 1}
 }
 
 func (m *LogEntry_Severity) XXX_Unmarshal(b []byte) error {
@@ -3641,7 +3013,7 @@ func (m *LogControl) Reset()         { *m = LogControl{} }
 func (m *LogControl) String() string { return proto.CompactTextString(m) }
 func (*LogControl) ProtoMessage()    {}
 func (*LogControl) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{30}
+	return fileDescriptor_6d954c03a4758710, []int{29}
 }
 
 func (m *LogControl) XXX_Unmarshal(b []byte) error {
@@ -3678,7 +3050,7 @@ func (m *StartWorkerRequest) Reset()         { *m = StartWorkerRequest{} }
 func (m *StartWorkerRequest) String() string { return proto.CompactTextString(m) }
 func (*StartWorkerRequest) ProtoMessage()    {}
 func (*StartWorkerRequest) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{31}
+	return fileDescriptor_6d954c03a4758710, []int{30}
 }
 
 func (m *StartWorkerRequest) XXX_Unmarshal(b []byte) error {
@@ -3752,7 +3124,7 @@ func (m *StartWorkerResponse) Reset()         { *m = StartWorkerResponse{} }
 func (m *StartWorkerResponse) String() string { return proto.CompactTextString(m) }
 func (*StartWorkerResponse) ProtoMessage()    {}
 func (*StartWorkerResponse) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{32}
+	return fileDescriptor_6d954c03a4758710, []int{31}
 }
 
 func (m *StartWorkerResponse) XXX_Unmarshal(b []byte) error {
@@ -3791,7 +3163,7 @@ func (m *StopWorkerRequest) Reset()         { *m = StopWorkerRequest{} }
 func (m *StopWorkerRequest) String() string { return proto.CompactTextString(m) }
 func (*StopWorkerRequest) ProtoMessage()    {}
 func (*StopWorkerRequest) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{33}
+	return fileDescriptor_6d954c03a4758710, []int{32}
 }
 
 func (m *StopWorkerRequest) XXX_Unmarshal(b []byte) error {
@@ -3830,7 +3202,7 @@ func (m *StopWorkerResponse) Reset()         { *m = StopWorkerResponse{} }
 func (m *StopWorkerResponse) String() string { return proto.CompactTextString(m) }
 func (*StopWorkerResponse) ProtoMessage()    {}
 func (*StopWorkerResponse) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{34}
+	return fileDescriptor_6d954c03a4758710, []int{33}
 }
 
 func (m *StopWorkerResponse) XXX_Unmarshal(b []byte) error {
@@ -3872,7 +3244,7 @@ func (m *WorkerStatusRequest) Reset()         { *m = WorkerStatusRequest{} }
 func (m *WorkerStatusRequest) String() string { return proto.CompactTextString(m) }
 func (*WorkerStatusRequest) ProtoMessage()    {}
 func (*WorkerStatusRequest) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{35}
+	return fileDescriptor_6d954c03a4758710, []int{34}
 }
 
 func (m *WorkerStatusRequest) XXX_Unmarshal(b []byte) error {
@@ -3921,7 +3293,7 @@ func (m *WorkerStatusResponse) Reset()         { *m = WorkerStatusResponse{} }
 func (m *WorkerStatusResponse) String() string { return proto.CompactTextString(m) }
 func (*WorkerStatusResponse) ProtoMessage()    {}
 func (*WorkerStatusResponse) Descriptor() ([]byte, []int) {
-	return fileDescriptor_6d954c03a4758710, []int{36}
+	return fileDescriptor_6d954c03a4758710, []int{35}
 }
 
 func (m *WorkerStatusResponse) XXX_Unmarshal(b []byte) error {
@@ -3987,21 +3359,6 @@ func init() {
 	proto.RegisterMapType((map[string][]byte)(nil), "org.apache.beam.model.fn_execution.v1.ProcessBundleResponse.MonitoringDataEntry")
 	proto.RegisterType((*ProcessBundleProgressRequest)(nil), "org.apache.beam.model.fn_execution.v1.ProcessBundleProgressRequest")
 	proto.RegisterType((*ProcessBundleProgressMetadataRequest)(nil), "org.apache.beam.model.fn_execution.v1.ProcessBundleProgressMetadataRequest")
-	proto.RegisterType((*Metrics)(nil), "org.apache.beam.model.fn_execution.v1.Metrics")
-	proto.RegisterMapType((map[string]*Metrics_PTransform)(nil), "org.apache.beam.model.fn_execution.v1.Metrics.PtransformsEntry")
-	proto.RegisterType((*Metrics_PTransform)(nil), "org.apache.beam.model.fn_execution.v1.Metrics.PTransform")
-	proto.RegisterMapType((map[string]int64)(nil), "org.apache.beam.model.fn_execution.v1.Metrics.PTransform.WatermarksEntry")
-	proto.RegisterType((*Metrics_PTransform_Measured)(nil), "org.apache.beam.model.fn_execution.v1.Metrics.PTransform.Measured")
-	proto.RegisterMapType((map[string]int64)(nil), "org.apache.beam.model.fn_execution.v1.Metrics.PTransform.Measured.InputElementCountsEntry")
-	proto.RegisterMapType((map[string]int64)(nil), "org.apache.beam.model.fn_execution.v1.Metrics.PTransform.Measured.OutputElementCountsEntry")
-	proto.RegisterType((*Metrics_PTransform_ProcessedElements)(nil), "org.apache.beam.model.fn_execution.v1.Metrics.PTransform.ProcessedElements")
-	proto.RegisterType((*Metrics_PTransform_ActiveElements)(nil), "org.apache.beam.model.fn_execution.v1.Metrics.PTransform.ActiveElements")
-	proto.RegisterMapType((map[string]int64)(nil), "org.apache.beam.model.fn_execution.v1.Metrics.PTransform.ActiveElements.OutputElementsRemainingEntry")
-	proto.RegisterType((*Metrics_User)(nil), "org.apache.beam.model.fn_execution.v1.Metrics.User")
-	proto.RegisterType((*Metrics_User_MetricName)(nil), "org.apache.beam.model.fn_execution.v1.Metrics.User.MetricName")
-	proto.RegisterType((*Metrics_User_CounterData)(nil), "org.apache.beam.model.fn_execution.v1.Metrics.User.CounterData")
-	proto.RegisterType((*Metrics_User_DistributionData)(nil), "org.apache.beam.model.fn_execution.v1.Metrics.User.DistributionData")
-	proto.RegisterType((*Metrics_User_GaugeData)(nil), "org.apache.beam.model.fn_execution.v1.Metrics.User.GaugeData")
 	proto.RegisterType((*ProcessBundleProgressResponse)(nil), "org.apache.beam.model.fn_execution.v1.ProcessBundleProgressResponse")
 	proto.RegisterMapType((map[string][]byte)(nil), "org.apache.beam.model.fn_execution.v1.ProcessBundleProgressResponse.MonitoringDataEntry")
 	proto.RegisterType((*ProcessBundleProgressMetadataResponse)(nil), "org.apache.beam.model.fn_execution.v1.ProcessBundleProgressMetadataResponse")
@@ -4046,228 +3403,194 @@ func init() {
 func init() { proto.RegisterFile("beam_fn_api.proto", fileDescriptor_6d954c03a4758710) }
 
 var fileDescriptor_6d954c03a4758710 = []byte{
-	// 3529 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x5b, 0xcd, 0x73, 0x1b, 0xc7,
-	0x95, 0xd7, 0x10, 0x00, 0x09, 0x3c, 0x80, 0x24, 0xd8, 0x24, 0x25, 0x68, 0x56, 0x5e, 0xcb, 0x58,
-	0x6b, 0x8b, 0xb5, 0xbb, 0x86, 0x24, 0x4a, 0x6b, 0x4b, 0x5e, 0x5b, 0x36, 0x3f, 0x25, 0x58, 0x94,
-	0x84, 0x1d, 0x52, 0x2b, 0xaf, 0x1d, 0x7b, 0x32, 0xc4, 0x34, 0xa0, 0x29, 0x01, 0x33, 0xe3, 0xe9,
-	0x01, 0x25, 0x2a, 0x4e, 0x1c, 0x57, 0x52, 0x49, 0x39, 0x15, 0x97, 0x2f, 0xa9, 0x8a, 0xe3, 0x94,
-	0x0f, 0x49, 0xe5, 0xe2, 0x1c, 0x72, 0xcc, 0x21, 0xd7, 0x1c, 0x72, 0x4a, 0xe5, 0x1f, 0xc8, 0x25,
-	0x55, 0x39, 0x24, 0x71, 0x7c, 0xc8, 0x25, 0xd7, 0xa4, 0xfa, 0x63, 0x66, 0x1a, 0x83, 0x81, 0x34,
-	0x00, 0x28, 0xdf, 0xd0, 0x5f, 0xbf, 0xdf, 0xeb, 0xd7, 0xef, 0xbd, 0x79, 0xfd, 0x01, 0x58, 0xd8,
-	0xc7, 0x46, 0x57, 0x6f, 0xd9, 0xba, 0xe1, 0x5a, 0x35, 0xd7, 0x73, 0x7c, 0x07, 0x9d, 0x71, 0xbc,
-	0x76, 0xcd, 0x70, 0x8d, 0xe6, 0x5d, 0x5c, 0xa3, 0xad, 0xb5, 0xae, 0x63, 0xe2, 0x4e, 0xad, 0x65,
-	0xeb, 0xf8, 0x01, 0x6e, 0xf6, 0x7c, 0xcb, 0xb1, 0x6b, 0x07, 0xe7, 0xd5, 0x65, 0x36, 0xd2, 0xeb,
-	0xd9, 0x36, 0xf6, 0xa2, 0xd1, 0xea, 0x3c, 0xb6, 0x4d, 0xd7, 0xb1, 0x6c, 0x9f, 0x88, 0x8a, 0xd3,
-	0x6d, 0xc7, 0x69, 0x77, 0xf0, 0x59, 0x56, 0xda, 0xef, 0xb5, 0xce, 0x9a, 0x98, 0x34, 0x3d, 0xcb,
-	0xf5, 0x1d, 0x4f, 0xf4, 0x78, 0x3a, 0xde, 0xc3, 0xb7, 0xba, 0x98, 0xf8, 0x46, 0xd7, 0x15, 0x1d,
-	0xfe, 0x75, 0x00, 0xa2, 0xe7, 0x19, 0x4c, 0x8e, 0x21, 0xed, 0xf7, 0x3d, 0xc3, 0x75, 0xb1, 0x17,
-	0x88, 0x30, 0xdb, 0xc5, 0xbe, 0x67, 0x35, 0x45, 0xb1, 0xfa, 0x89, 0x02, 0x73, 0x1a, 0xee, 0x3a,
-	0x3e, 0xbe, 0xea, 0xb9, 0xcd, 0x86, 0xe3, 0xf9, 0xa8, 0x0b, 0xc7, 0x0d, 0xd7, 0xd2, 0x09, 0xf6,
-	0x0e, 0xac, 0x26, 0xd6, 0x23, 0x11, 0x2b, 0xca, 0x69, 0x65, 0xa5, 0xb8, 0xfa, 0x42, 0x2d, 0x59,
-	0x29, 0xae, 0xe5, 0xe2, 0x8e, 0x65, 0xe3, 0xda, 0xc1, 0xf9, 0xda, 0x9a, 0x6b, 0xed, 0xf2, 0xf1,
-	0x9b, 0xe1, 0x70, 0x6d, 0xc9, 0x48, 0xa8, 0x45, 0x27, 0x21, 0xdf, 0x74, 0x4c, 0xec, 0xe9, 0x96,
-	0x59, 0x99, 0x3a, 0xad, 0xac, 0x14, 0xb4, 0x19, 0x56, 0xae, 0x9b, 0xd5, 0x7f, 0xe4, 0x00, 0xd5,
-	0x6d, 0xe2, 0x7b, 0xbd, 0x26, 0x9d, 0xa1, 0x86, 0xdf, 0xe9, 0x61, 0xe2, 0xa3, 0x33, 0x30, 0x67,
-	0x45, 0xb5, 0x74, 0x9c, 0xc2, 0xc6, 0xcd, 0x4a, 0xb5, 0x75, 0x13, 0xdd, 0x86, 0xbc, 0x87, 0xdb,
-	0x16, 0xf1, 0xb1, 0x57, 0xf9, 0xd3, 0x0c, 0x13, 0xfd, 0xf9, 0x5a, 0xaa, 0xf5, 0xac, 0x69, 0x62,
-	0x9c, 0x60, 0xbc, 0x76, 0x4c, 0x0b, 0xa1, 0x10, 0x86, 0x39, 0xd7, 0x73, 0x9a, 0x98, 0x10, 0x7d,
-	0xbf, 0x67, 0x9b, 0x1d, 0x5c, 0xf9, 0x33, 0x07, 0xff, 0x9f, 0x94, 0xe0, 0x0d, 0x3e, 0x7a, 0x9d,
-	0x0d, 0x8e, 0x18, 0x66, 0x5d, 0xb9, 0x1e, 0x7d, 0x03, 0x4e, 0xf4, 0xd3, 0xe8, 0xae, 0xe7, 0xb4,
-	0x3d, 0x4c, 0x48, 0xe5, 0x2f, 0x9c, 0x6f, 0x63, 0x1c, 0xbe, 0x86, 0x00, 0x89, 0x78, 0x97, 0xdd,
-	0xa4, 0x76, 0xd4, 0x83, 0xa5, 0x18, 0x3f, 0x71, 0x3b, 0x96, 0x5f, 0xf9, 0x9c, 0x93, 0xbf, 0x3a,
-	0x0e, 0xf9, 0x2e, 0x45, 0x88, 0x98, 0x91, 0x3b, 0xd0, 0x88, 0xee, 0xc2, 0x7c, 0xcb, 0xb2, 0x8d,
-	0x8e, 0xf5, 0x10, 0x07, 0xea, 0xfd, 0x2b, 0x67, 0x7c, 0x29, 0x25, 0xe3, 0xb6, 0x18, 0x1e, 0xd7,
-	0xef, 0x5c, 0xab, 0xaf, 0x01, 0xfd, 0x50, 0x81, 0xd3, 0x43, 0x34, 0xac, 0x77, 0xb1, 0x6f, 0x98,
-	0x86, 0x6f, 0x54, 0xbe, 0xe0, 0xdc, 0xd7, 0x27, 0x51, 0xf5, 0x0d, 0x01, 0x16, 0x89, 0xf2, 0x94,
-	0xfb, 0xa8, 0x7e, 0xeb, 0x05, 0x98, 0xf1, 0x78, 0xdf, 0xea, 0xa7, 0xd3, 0xb0, 0xd8, 0xe7, 0x01,
-	0xc4, 0x75, 0x6c, 0x82, 0xd3, 0xba, 0xc0, 0x12, 0xe4, 0xb0, 0xe7, 0x39, 0x9e, 0x70, 0x2c, 0x5e,
-	0x40, 0xff, 0x37, 0xe8, 0x18, 0x2f, 0x8c, 0xec, 0x18, 0x5c, 0x90, 0x3e, 0xcf, 0x68, 0x0d, 0xf3,
-	0x8c, 0x97, 0xc6, 0xf3, 0x8c, 0x90, 0x22, 0xe6, 0x1a, 0xef, 0x3d, 0xd6, 0x35, 0x36, 0x27, 0x73,
-	0x8d, 0x90, 0x78, 0x88, 0x6f, 0x1c, 0x3c, 0xda, 0x37, 0xd6, 0x26, 0xf0, 0x8d, 0x90, 0x3a, 0xc9,
-	0x39, 0xac, 0xa1, 0xce, 0xf1, 0xf2, 0x98, 0xce, 0x11, 0xd2, 0xc5, 0xbd, 0xe3, 0xe3, 0x11, 0xbc,
-	0x63, 0xe7, 0x68, 0xbc, 0x23, 0x94, 0xe5, 0x31, 0xee, 0x01, 0xd4, 0x7c, 0x79, 0xe7, 0xea, 0x87,
-	0x0a, 0xcc, 0xc7, 0x82, 0x35, 0x7a, 0x08, 0x27, 0x63, 0x92, 0xf7, 0x7d, 0xc2, 0x32, 0x2b, 0xc5,
-	0xd5, 0x2b, 0xe3, 0x48, 0x2c, 0x7d, 0xc9, 0x4e, 0xb8, 0xc9, 0x0d, 0x55, 0x04, 0xe5, 0xb8, 0x8b,
-	0x54, 0x3f, 0x2d, 0xc2, 0x89, 0x21, 0x40, 0x68, 0x0e, 0xa6, 0x42, 0xdf, 0x9d, 0xb2, 0x4c, 0x64,
-	0x03, 0xf8, 0x9e, 0x61, 0x93, 0x96, 0xe3, 0x75, 0x49, 0x65, 0x8a, 0x09, 0x7b, 0x73, 0x32, 0x61,
-	0x6b, 0x7b, 0x21, 0xe0, 0x96, 0xed, 0x7b, 0x87, 0x9a, 0xc4, 0x80, 0x7c, 0x28, 0xb9, 0x4d, 0xa7,
-	0xd3, 0xc1, 0x2c, 0x62, 0x90, 0x4a, 0x86, 0x31, 0x36, 0x26, 0x64, 0x6c, 0x48, 0x90, 0x9c, 0xb3,
-	0x8f, 0x05, 0x7d, 0x4f, 0x81, 0xa5, 0xfb, 0x96, 0x6d, 0x3a, 0xf7, 0x2d, 0xbb, 0xad, 0x13, 0xdf,
-	0x33, 0x7c, 0xdc, 0xb6, 0x30, 0xa9, 0x64, 0x19, 0xfd, 0x9d, 0x09, 0xe9, 0xef, 0x04, 0xd0, 0xbb,
-	0x21, 0x32, 0x97, 0x62, 0xf1, 0xfe, 0x60, 0x0b, 0xda, 0x87, 0x69, 0x96, 0x6f, 0x90, 0x4a, 0x8e,
-	0xb1, 0xbf, 0x36, 0x21, 0xfb, 0x06, 0x03, 0xe3, 0x84, 0x02, 0x99, 0xaa, 0x19, 0xdb, 0x07, 0x96,
-	0xe7, 0xd8, 0x5d, 0x6c, 0xfb, 0xa4, 0x32, 0x7d, 0x24, 0x6a, 0xde, 0x92, 0x20, 0x85, 0x9a, 0x65,
-	0x16, 0xf4, 0x00, 0x4e, 0x11, 0xdf, 0xf0, 0xb1, 0x3e, 0x24, 0x9d, 0x9b, 0x99, 0x2c, 0x9d, 0x3b,
-	0xc9, 0xc0, 0x93, 0x9a, 0x28, 0x33, 0xcd, 0x5b, 0xbd, 0x61, 0xcc, 0xf9, 0x09, 0x99, 0x19, 0x78,
-	0x52, 0x93, 0xda, 0x81, 0xf9, 0x98, 0xbd, 0xa3, 0x32, 0x64, 0xee, 0xe1, 0x43, 0xe1, 0x64, 0xf4,
-	0x27, 0xda, 0x80, 0xdc, 0x81, 0xd1, 0xe9, 0x61, 0xf6, 0x59, 0x2c, 0xae, 0x3e, 0x97, 0x42, 0x8e,
-	0x46, 0x88, 0xaa, 0xf1, 0xb1, 0x2f, 0x4e, 0x5d, 0x52, 0x54, 0x07, 0x16, 0x06, 0x6c, 0x3d, 0x81,
-	0x6f, 0xb3, 0x9f, 0xaf, 0x96, 0x86, 0x6f, 0x23, 0x84, 0x95, 0x09, 0xdf, 0x85, 0xca, 0x30, 0xeb,
-	0x4e, 0xe0, 0x7d, 0xad, 0x9f, 0xf7, 0x62, 0x0a, 0xde, 0x38, 0xfa, 0xa1, 0xcc, 0xde, 0x84, 0xa2,
-	0x64, 0xdd, 0x09, 0x84, 0x57, 0xfa, 0x09, 0x57, 0x52, 0x10, 0x32, 0xc0, 0x98, 0x4e, 0x07, 0x0c,
-	0xfb, 0x68, 0x74, 0x2a, 0xc1, 0x4a, 0x84, 0xd5, 0x1f, 0x67, 0x60, 0x81, 0xfb, 0xd6, 0x9a, 0xeb,
-	0x76, 0xac, 0x26, 0xdb, 0x4d, 0xa1, 0x67, 0xa0, 0x14, 0xc6, 0xc9, 0x28, 0xbf, 0x2a, 0x86, 0x75,
-	0x75, 0x93, 0xee, 0x5c, 0x2c, 0xdb, 0xed, 0xf9, 0xd2, 0xce, 0x85, 0x95, 0xeb, 0x26, 0xaa, 0xc0,
-	0x0c, 0xee, 0x60, 0xca, 0x54, 0xc9, 0x9c, 0x56, 0x56, 0x4a, 0x5a, 0x50, 0x44, 0x5f, 0x83, 0x05,
-	0xa7, 0xe7, 0xd3, 0x51, 0xf7, 0x0d, 0x1f, 0x7b, 0x5d, 0xc3, 0xbb, 0x17, 0xc4, 0xbd, 0xb4, 0x81,
-	0x7e, 0x40, 0xd8, 0xda, 0x2d, 0x86, 0x78, 0x27, 0x04, 0xe4, 0xd1, 0xa0, 0xec, 0xc4, 0xaa, 0x51,
-	0x03, 0xc0, 0x22, 0xfa, 0xbe, 0xd3, 0xb3, 0x4d, 0x6c, 0x56, 0x72, 0xa7, 0x95, 0x95, 0xb9, 0xd5,
-	0xf3, 0x29, 0x34, 0x57, 0x27, 0xeb, 0x7c, 0x4c, 0x6d, 0xcb, 0xee, 0x75, 0xb5, 0x82, 0x15, 0x94,
-	0x55, 0x1d, 0x96, 0x13, 0xc9, 0x13, 0x56, 0xec, 0x5c, 0xff, 0x8a, 0xa9, 0x35, 0xbe, 0x53, 0xad,
-	0x05, 0x3b, 0xd5, 0xda, 0x5e, 0xb0, 0xd5, 0x95, 0x57, 0xe7, 0xd7, 0x0a, 0x54, 0x36, 0x71, 0xc7,
-	0x38, 0xc4, 0xe6, 0xe0, 0x22, 0xbd, 0x01, 0x45, 0x23, 0x2a, 0x8a, 0xfd, 0xe9, 0xa5, 0x71, 0xd5,
-	0xa8, 0xc9, 0x60, 0xe8, 0x3a, 0x2c, 0x89, 0x2c, 0x1c, 0x9b, 0x3a, 0x0d, 0x38, 0xba, 0x49, 0xc5,
-	0x10, 0xd2, 0x9f, 0x1c, 0x90, 0x7e, 0x53, 0xec, 0xc3, 0x35, 0x14, 0x0e, 0xa3, 0x13, 0x62, 0xb2,
-	0x57, 0x7f, 0x99, 0x85, 0xa5, 0xa4, 0x7d, 0x1f, 0x7a, 0x05, 0x4e, 0x0d, 0x4d, 0x56, 0x22, 0xb3,
-	0x3b, 0x39, 0x24, 0xdf, 0xa8, 0x9b, 0xc8, 0x82, 0x52, 0x93, 0xce, 0x54, 0xf7, 0x9d, 0x7b, 0xd8,
-	0x0e, 0x72, 0x86, 0xed, 0x09, 0xf6, 0xa2, 0xb5, 0x0d, 0x3a, 0x6a, 0x8f, 0xc2, 0x69, 0xc5, 0x66,
-	0xf8, 0x9b, 0xa8, 0x7f, 0x9b, 0x02, 0x88, 0xda, 0xd0, 0x3b, 0x00, 0x3d, 0x82, 0x3d, 0x9d, 0x7d,
-	0x06, 0x84, 0xee, 0x1b, 0x47, 0xc3, 0x5b, 0xbb, 0x4d, 0xb0, 0xb7, 0x4b, 0x71, 0xaf, 0x1d, 0xd3,
-	0x0a, 0xbd, 0xa0, 0x40, 0x29, 0x89, 0x65, 0x62, 0x9d, 0xb9, 0x99, 0x58, 0x89, 0xa3, 0xa2, 0xdc,
-	0xb5, 0x4c, 0x5c, 0xa7, 0xb8, 0x94, 0x92, 0x04, 0x05, 0xba, 0x85, 0x62, 0x9a, 0xad, 0x00, 0xf3,
-	0x63, 0x5e, 0x50, 0x8b, 0x50, 0x08, 0x45, 0x54, 0x35, 0x28, 0x84, 0x83, 0xd3, 0xc4, 0x8d, 0x2a,
-	0xcc, 0x46, 0xb3, 0x88, 0x82, 0x47, 0x31, 0x24, 0xad, 0x9b, 0xeb, 0xd3, 0x90, 0xf5, 0x0f, 0x5d,
-	0x5c, 0xfd, 0x24, 0x0b, 0xcb, 0x89, 0xdb, 0x22, 0x74, 0x0d, 0x66, 0xc4, 0x51, 0x8e, 0xd0, 0x7d,
-	0x2d, 0xa5, 0x22, 0x6e, 0xf0, 0x51, 0x5a, 0x30, 0x9c, 0xee, 0xdb, 0x3c, 0x4c, 0x2c, 0xb3, 0x67,
-	0x74, 0x74, 0xcf, 0x71, 0xfc, 0xc0, 0x88, 0x5e, 0x49, 0x09, 0x38, 0xcc, 0x3d, 0xb5, 0xd9, 0x00,
-	0x56, 0xa3, 0xa8, 0xe8, 0x2b, 0x50, 0xee, 0x3a, 0xb6, 0xe5, 0x3b, 0x1e, 0x4d, 0xfb, 0x2c, 0xbb,
-	0xe5, 0x04, 0x09, 0x67, 0x9a, 0x18, 0x74, 0x23, 0x1c, 0x5a, 0xb7, 0x5b, 0x8e, 0x36, 0xdf, 0xed,
-	0x2b, 0x13, 0x74, 0x01, 0x96, 0xa9, 0xe3, 0x59, 0x1e, 0x26, 0xba, 0xd8, 0xcc, 0xf0, 0xa8, 0x90,
-	0x3d, 0xad, 0xac, 0xe4, 0xb5, 0xa5, 0xa0, 0x71, 0x5b, 0x6a, 0x43, 0x87, 0x20, 0xe1, 0xe8, 0x6c,
-	0x53, 0x93, 0x1b, 0x3f, 0x37, 0x0b, 0xd6, 0x46, 0x92, 0x75, 0xd3, 0xf0, 0x0d, 0x1e, 0x8d, 0xe7,
-	0xba, 0x7d, 0x95, 0xea, 0x1a, 0x2c, 0x26, 0x74, 0x4b, 0x88, 0x9b, 0x4b, 0x72, 0xdc, 0x2c, 0xc9,
-	0xb1, 0x71, 0x0b, 0x4e, 0x3d, 0xea, 0x70, 0x27, 0xe5, 0x29, 0x41, 0x75, 0x0f, 0x9e, 0x4d, 0x73,
-	0x70, 0x81, 0xfe, 0x0b, 0x50, 0x6c, 0xfd, 0x38, 0x64, 0x66, 0xa5, 0xa0, 0x95, 0xfb, 0x97, 0xa3,
-	0x6e, 0x56, 0x3f, 0x5e, 0x84, 0x19, 0x61, 0x6a, 0xc8, 0x80, 0xa2, 0x2b, 0xed, 0x6b, 0x94, 0x91,
-	0xcc, 0x4b, 0x80, 0xd4, 0x1a, 0x7e, 0x6c, 0x23, 0x23, 0x63, 0xaa, 0xbf, 0x2f, 0x02, 0x44, 0x49,
-	0x1a, 0x7a, 0x08, 0xc1, 0x06, 0x1a, 0x9b, 0xba, 0xf8, 0xf6, 0x06, 0x8e, 0x72, 0x7d, 0x54, 0xe2,
-	0x10, 0x36, 0x58, 0x6e, 0x6c, 0x6e, 0x09, 0x48, 0x6d, 0xc1, 0x8d, 0x57, 0xa1, 0x77, 0x60, 0xde,
-	0x68, 0xfa, 0xd6, 0x01, 0x8e, 0x88, 0x79, 0xa8, 0xba, 0x36, 0x3e, 0xf1, 0x1a, 0x03, 0x0c, 0x59,
-	0xe7, 0x8c, 0xbe, 0x32, 0xb2, 0x00, 0xa4, 0x74, 0x82, 0x3b, 0x55, 0x7d, 0x7c, 0xb6, 0x78, 0x26,
-	0x21, 0x81, 0xa3, 0xab, 0x90, 0xa5, 0x01, 0x59, 0xe4, 0x2c, 0x17, 0x46, 0x24, 0xa1, 0x51, 0x53,
-	0x63, 0x00, 0xea, 0x1f, 0x33, 0x90, 0xbf, 0x81, 0x0d, 0xd2, 0xf3, 0xb0, 0x89, 0xbe, 0xaf, 0xc0,
-	0x12, 0x8f, 0x87, 0x42, 0x67, 0x7a, 0xd3, 0xe9, 0xf1, 0x25, 0xa3, 0x34, 0x6f, 0x8c, 0x3f, 0x97,
-	0x80, 0xa2, 0xc6, 0x42, 0xab, 0xd0, 0xd8, 0x06, 0x03, 0xe7, 0x93, 0x43, 0xd6, 0x40, 0x03, 0xfa,
-	0x48, 0x81, 0x65, 0x91, 0xa6, 0xc5, 0xe4, 0xe1, 0xa1, 0xf1, 0xcd, 0x23, 0x90, 0x87, 0xe7, 0x4d,
-	0x09, 0x02, 0x2d, 0x3a, 0x83, 0x2d, 0x68, 0x05, 0xca, 0xbe, 0xe3, 0x1b, 0x1d, 0x9e, 0x8a, 0x10,
-	0x37, 0x48, 0x2d, 0x15, 0x6d, 0x8e, 0xd5, 0xd3, 0x5c, 0x63, 0x97, 0xd6, 0xaa, 0x5b, 0x70, 0x62,
-	0xc8, 0x54, 0x1f, 0x17, 0x5c, 0x32, 0x72, 0x1e, 0xbe, 0x0d, 0x95, 0x61, 0x12, 0x8e, 0x84, 0x43,
-	0x60, 0x61, 0xc0, 0x6b, 0xd0, 0xdb, 0x90, 0xef, 0x0a, 0x3d, 0x08, 0xa7, 0x5c, 0x9f, 0x5c, 0xa3,
-	0x5a, 0x88, 0xa9, 0x7e, 0x94, 0x81, 0xb9, 0x7e, 0x97, 0x79, 0xd2, 0x94, 0xe8, 0x39, 0x40, 0x2d,
-	0xcf, 0xe0, 0x91, 0xd6, 0xc3, 0x5d, 0xc3, 0xb2, 0x2d, 0xbb, 0xcd, 0xd4, 0xa1, 0x68, 0x0b, 0x41,
-	0x8b, 0x16, 0x34, 0xa0, 0x9f, 0x28, 0x70, 0xb2, 0xdf, 0xc2, 0x88, 0x34, 0x8c, 0x7b, 0x30, 0x3e,
-	0xaa, 0x78, 0xd1, 0x6f, 0x6b, 0x24, 0x94, 0x82, 0xdb, 0xdb, 0x09, 0x27, 0xb9, 0x55, 0x7d, 0x0d,
-	0x4e, 0x3d, 0x6a, 0xe0, 0x48, 0x66, 0xf0, 0x32, 0xcc, 0x3f, 0x7e, 0x8b, 0x30, 0x7c, 0xf8, 0xef,
-	0x72, 0x90, 0xa5, 0xb1, 0x03, 0xe9, 0x50, 0xe4, 0x79, 0x8b, 0x6e, 0x1b, 0xdd, 0x20, 0xed, 0xbc,
-	0x32, 0x46, 0x14, 0x12, 0x85, 0x9b, 0x46, 0x17, 0x6b, 0xd0, 0x0d, 0x7f, 0x23, 0x0c, 0x25, 0xe6,
-	0xea, 0xd8, 0xe3, 0xf9, 0x80, 0x38, 0xc3, 0x7e, 0x65, 0x1c, 0x8a, 0x0d, 0x0e, 0x44, 0xbf, 0xee,
-	0xd7, 0x8e, 0x69, 0xc5, 0x66, 0x54, 0x44, 0x3e, 0x2c, 0x98, 0x16, 0xf1, 0x3d, 0x6b, 0x9f, 0x0d,
-	0xe5, 0x5c, 0x23, 0x1e, 0x5f, 0xf7, 0x71, 0x6d, 0x4a, 0x68, 0x82, 0xb0, 0x6c, 0xc6, 0xea, 0x90,
-	0x0e, 0xd0, 0x36, 0x7a, 0x6d, 0xcc, 0xe9, 0x3e, 0x1f, 0xed, 0xf0, 0xb8, 0x8f, 0xee, 0x2a, 0x85,
-	0x11, 0x3c, 0x85, 0x76, 0x50, 0x50, 0xaf, 0x00, 0x44, 0x7a, 0x45, 0xa7, 0xa0, 0x40, 0x57, 0x89,
-	0xb8, 0x46, 0x13, 0x8b, 0x2c, 0x37, 0xaa, 0x40, 0x08, 0xb2, 0x6c, 0x0d, 0x33, 0xac, 0x81, 0xfd,
-	0x56, 0xff, 0x0d, 0x8a, 0x92, 0xd2, 0x22, 0x83, 0x50, 0x24, 0x83, 0x50, 0xdf, 0x86, 0x72, 0x7c,
-	0xb6, 0xb4, 0x27, 0x53, 0x6f, 0xd0, 0x93, 0x15, 0xa8, 0x89, 0x91, 0x5e, 0x57, 0x98, 0x13, 0xfd,
-	0x49, 0x6b, 0xba, 0x96, 0xcd, 0x38, 0x33, 0x1a, 0xfd, 0xc9, 0x6a, 0x8c, 0x07, 0x2c, 0x4d, 0xa4,
-	0x35, 0xc6, 0x03, 0xf5, 0x4d, 0x28, 0x84, 0xd3, 0x4b, 0x16, 0x01, 0x5d, 0x82, 0x42, 0x78, 0x33,
-	0x9b, 0x62, 0x43, 0x1b, 0x75, 0xa6, 0x99, 0x3d, 0x55, 0xbe, 0x7a, 0x08, 0xe5, 0x78, 0x46, 0x93,
-	0xe0, 0x11, 0xb7, 0xfa, 0x37, 0xcd, 0x97, 0xc7, 0x8e, 0x08, 0x72, 0xde, 0xf8, 0x45, 0x06, 0x9e,
-	0x7a, 0xe4, 0xd5, 0xc7, 0x11, 0x6e, 0x2e, 0x9e, 0x6c, 0xd2, 0xff, 0x16, 0xcc, 0xba, 0x9e, 0xd5,
-	0x35, 0xbc, 0x43, 0xb1, 0x73, 0xe1, 0x59, 0xc9, 0xf8, 0x47, 0x00, 0x25, 0x01, 0xc7, 0x77, 0x2c,
-	0xef, 0x2b, 0xc3, 0xf6, 0x07, 0xaf, 0x1f, 0xc5, 0x0d, 0xd3, 0x97, 0xb5, 0x4f, 0xf8, 0x6c, 0x0a,
-	0xce, 0xa4, 0xba, 0x7c, 0x41, 0x1f, 0xf4, 0x4f, 0x98, 0x2e, 0x97, 0xc8, 0xc0, 0xbe, 0x7a, 0x94,
-	0x97, 0x3c, 0xb1, 0x75, 0x1d, 0x98, 0x38, 0xad, 0x54, 0x7d, 0x79, 0xe2, 0x61, 0xb7, 0x84, 0x89,
-	0x5f, 0xed, 0xf7, 0x91, 0x31, 0xec, 0x4a, 0xd2, 0xd5, 0xfb, 0x59, 0x38, 0x39, 0xf4, 0xd2, 0x3a,
-	0xed, 0xbd, 0xeb, 0x43, 0x98, 0x33, 0x31, 0xb1, 0x3c, 0x6c, 0xf2, 0x9b, 0xc1, 0xc0, 0xe4, 0x77,
-	0x27, 0xbd, 0x35, 0xaf, 0x6d, 0x72, 0x58, 0x56, 0x27, 0xd2, 0xc5, 0x59, 0x53, 0xae, 0x53, 0x7f,
-	0xa1, 0x40, 0x49, 0xee, 0x85, 0x56, 0x61, 0x39, 0x4c, 0x4c, 0x9c, 0x96, 0x48, 0x32, 0x4c, 0xcc,
-	0x9f, 0x73, 0x28, 0xda, 0x62, 0xd0, 0x78, 0xab, 0xa5, 0x05, 0x4d, 0xe8, 0x1c, 0x2c, 0x19, 0x9d,
-	0x8e, 0x73, 0x3f, 0x98, 0x80, 0xce, 0x9f, 0xb1, 0xb0, 0x69, 0x64, 0x34, 0x24, 0xda, 0x18, 0x7e,
-	0x83, 0xb5, 0xa0, 0x4b, 0x50, 0xc1, 0xc4, 0xb7, 0xba, 0x86, 0x8f, 0x4d, 0xbd, 0x2f, 0x93, 0x27,
-	0x22, 0xfc, 0x1e, 0x0f, 0xdb, 0xe5, 0xf4, 0x94, 0xa8, 0x1f, 0x29, 0x80, 0x06, 0xa7, 0x95, 0xb0,
-	0xce, 0xcd, 0xfe, 0x75, 0xbe, 0x71, 0xa4, 0xca, 0x94, 0x6d, 0xe0, 0xef, 0x19, 0x50, 0x87, 0x5f,
-	0xce, 0x0e, 0x06, 0x1d, 0xe5, 0x48, 0x83, 0xce, 0x97, 0x75, 0x1c, 0xd3, 0x83, 0xb9, 0xe6, 0x5d,
-	0xc3, 0xb6, 0x71, 0xa7, 0xdf, 0x48, 0x6f, 0x4e, 0x7c, 0x7d, 0x5d, 0xdb, 0xe0, 0xb8, 0xbc, 0x72,
-	0xb6, 0x29, 0x95, 0x88, 0xfa, 0x23, 0x05, 0x4a, 0x72, 0x7b, 0x9a, 0x13, 0xb3, 0x73, 0xb0, 0xd4,
-	0x31, 0x88, 0xaf, 0x07, 0x6a, 0x0f, 0xce, 0xd6, 0xb9, 0x61, 0x21, 0xda, 0xd6, 0xe0, 0x4d, 0xc2,
-	0xaa, 0xd0, 0x45, 0x38, 0xde, 0xb2, 0x3c, 0xe2, 0xeb, 0xa1, 0x2a, 0xe5, 0xf3, 0xf8, 0x8c, 0xb6,
-	0xc4, 0x5a, 0x35, 0xd1, 0x28, 0x46, 0x55, 0xaf, 0xc0, 0x72, 0xe2, 0xf3, 0x91, 0xb4, 0x27, 0x29,
-	0x15, 0x38, 0x9e, 0x7c, 0xc3, 0x5e, 0xfd, 0x6d, 0x06, 0xf2, 0xe1, 0x56, 0xe4, 0x1a, 0x4f, 0x01,
-	0x84, 0xdd, 0x5c, 0x4c, 0xa9, 0xef, 0x30, 0x99, 0xa7, 0x1f, 0x01, 0x8d, 0x21, 0xa0, 0xeb, 0x90,
-	0x63, 0x77, 0x61, 0xc2, 0x44, 0xfe, 0x7b, 0x54, 0x28, 0x9a, 0xa1, 0x78, 0x1a, 0xc7, 0x50, 0xbf,
-	0x0e, 0x59, 0x96, 0xf1, 0xa4, 0x0c, 0x72, 0xf1, 0x75, 0x9b, 0x1a, 0x5c, 0x37, 0x24, 0x26, 0xca,
-	0xef, 0x40, 0xb8, 0xc8, 0x27, 0x60, 0xc6, 0x22, 0x3a, 0x5d, 0x32, 0x71, 0x32, 0x37, 0x6d, 0x91,
-	0x1d, 0x83, 0xf8, 0xea, 0xcf, 0x15, 0xc8, 0x31, 0x79, 0x8e, 0x50, 0x80, 0x7f, 0x87, 0x79, 0x7e,
-	0x11, 0xd9, 0x32, 0xba, 0x56, 0xe7, 0x90, 0xf6, 0xe2, 0xd9, 0xe6, 0x2c, 0xab, 0xde, 0x66, 0xb5,
-	0x75, 0x13, 0x1d, 0x87, 0x69, 0x56, 0x41, 0x98, 0x4c, 0x25, 0x4d, 0x94, 0x64, 0x61, 0x73, 0xb2,
-	0xb0, 0xd5, 0x1f, 0x64, 0xa0, 0xc4, 0x4e, 0x7f, 0x03, 0x0b, 0x89, 0xdf, 0xe4, 0x0f, 0xce, 0x61,
-	0x2a, 0x69, 0x0e, 0x3b, 0x50, 0xe0, 0x77, 0xb4, 0x34, 0xd6, 0x65, 0x58, 0x5c, 0x3b, 0x9b, 0x72,
-	0x11, 0x19, 0xfd, 0x75, 0x7c, 0xa8, 0xe5, 0x89, 0xf8, 0x85, 0xae, 0x43, 0xa6, 0x8d, 0xfd, 0x51,
-	0x5f, 0xbb, 0x31, 0xa0, 0xab, 0x58, 0x7a, 0x99, 0x45, 0x51, 0xd0, 0x1e, 0x4c, 0x1b, 0xae, 0x8b,
-	0x6d, 0x33, 0xd8, 0x02, 0x5d, 0x1e, 0x05, 0x6f, 0x8d, 0x0d, 0x8d, 0x20, 0x05, 0x16, 0xfa, 0x5f,
-	0xc8, 0x35, 0x3b, 0xd8, 0xf0, 0x82, 0xbd, 0xce, 0xa5, 0x51, 0x40, 0x37, 0xe8, 0xc8, 0x08, 0x93,
-	0x23, 0xc9, 0xef, 0xa5, 0x7e, 0x35, 0x05, 0xb3, 0x62, 0x59, 0x44, 0xb0, 0x8e, 0xaf, 0x4b, 0xf2,
-	0x93, 0xa8, 0x9d, 0x3e, 0xc5, 0xbd, 0x30, 0xb2, 0xe2, 0xc2, 0xb7, 0x2b, 0x4c, 0x73, 0xb7, 0xe3,
-	0x9a, 0x7b, 0x71, 0x1c, 0xcd, 0x85, 0x98, 0x81, 0xea, 0xb4, 0x98, 0xea, 0x2e, 0x8f, 0xa1, 0xba,
-	0x10, 0x54, 0xe8, 0x4e, 0x7e, 0x4c, 0xf3, 0x9b, 0x3c, 0xe4, 0x03, 0xa3, 0x42, 0x0d, 0x98, 0xe6,
-	0xef, 0x59, 0xc5, 0x06, 0xe0, 0xf9, 0x11, 0xad, 0xb2, 0xa6, 0xb1, 0xd1, 0x54, 0x7c, 0x8e, 0x83,
-	0x08, 0x2c, 0x76, 0x7b, 0x1d, 0x9a, 0x02, 0xb8, 0xfa, 0xc0, 0x2d, 0xce, 0xda, 0xa8, 0xf0, 0x37,
-	0x04, 0x94, 0x7c, 0x6d, 0xb3, 0xd0, 0x8d, 0x57, 0x22, 0x13, 0xe6, 0xf6, 0x8d, 0xb6, 0x2e, 0x5d,
-	0x54, 0x65, 0x46, 0x7a, 0x92, 0x16, 0xf2, 0xad, 0x1b, 0x6d, 0xf9, 0x52, 0xaa, 0xb4, 0x2f, 0x95,
-	0xe9, 0xd4, 0x2c, 0x1f, 0x7b, 0xc6, 0x7e, 0x07, 0xcb, 0x53, 0xcb, 0x8e, 0x37, 0xb5, 0xba, 0x80,
-	0xea, 0x9b, 0x9a, 0x15, 0xaf, 0x44, 0xdf, 0x54, 0xa0, 0x12, 0x2a, 0xf4, 0x1e, 0x3e, 0x24, 0x32,
-	0x75, 0x8e, 0x51, 0x6f, 0x8d, 0xab, 0xd5, 0xeb, 0xf8, 0x90, 0xc8, 0xf4, 0xcb, 0xdd, 0xa4, 0x06,
-	0x55, 0x85, 0x69, 0xbe, 0xcc, 0x72, 0xb6, 0x56, 0x62, 0xd9, 0x9a, 0xea, 0xc1, 0xc2, 0xc0, 0x44,
-	0x8e, 0xe8, 0x76, 0x8c, 0x86, 0x6b, 0xfe, 0x94, 0x47, 0x7c, 0x59, 0x44, 0x49, 0xfd, 0xb6, 0x02,
-	0x0b, 0x03, 0x86, 0xf1, 0x84, 0x49, 0x83, 0xa9, 0x67, 0xa3, 0xa9, 0x1f, 0xc0, 0x72, 0xa2, 0x22,
-	0x9f, 0xf4, 0xf4, 0xdf, 0x83, 0x92, 0x6c, 0xa6, 0x29, 0xe9, 0x22, 0xdf, 0x90, 0xe8, 0xc2, 0x3b,
-	0xd7, 0x51, 0x26, 0x1e, 0xde, 0x5a, 0xbe, 0x0a, 0xf3, 0xb1, 0x8f, 0x0a, 0x7a, 0x0e, 0x50, 0xd3,
-	0xb1, 0x7d, 0xcb, 0xee, 0xb1, 0x64, 0x94, 0x5f, 0x57, 0x0b, 0x7b, 0x59, 0x90, 0x5b, 0xd8, 0x6d,
-	0x6c, 0xf5, 0x36, 0x94, 0xe3, 0xd1, 0x75, 0x44, 0x88, 0x30, 0xf9, 0x98, 0x8a, 0x92, 0x8f, 0xea,
-	0x0a, 0xa0, 0xc1, 0xaf, 0x53, 0xd8, 0x53, 0x91, 0x7a, 0x2e, 0xc3, 0x62, 0x42, 0x34, 0xae, 0x2e,
-	0xc2, 0xc2, 0xc0, 0x97, 0xa8, 0xba, 0x24, 0x50, 0xfb, 0x62, 0x6c, 0xf5, 0xa7, 0x59, 0xc8, 0xef,
-	0x38, 0xe2, 0xa8, 0xf4, 0xff, 0x21, 0x4f, 0xf0, 0x01, 0xf6, 0x2c, 0x9f, 0x3b, 0xc9, 0x5c, 0xea,
-	0x53, 0xb7, 0x00, 0xa2, 0xb6, 0x2b, 0xc6, 0xf3, 0x37, 0x18, 0x21, 0xdc, 0xf8, 0x47, 0x51, 0xa8,
-	0x02, 0x33, 0x5d, 0x4c, 0x88, 0xd1, 0x0e, 0xce, 0xe0, 0x82, 0x22, 0xbb, 0xf5, 0xf6, 0x8c, 0x26,
-	0x66, 0x8b, 0x5b, 0xd0, 0x78, 0x21, 0x21, 0xa7, 0xc9, 0xa5, 0xc9, 0xcb, 0xa6, 0x07, 0xcd, 0xee,
-	0x19, 0x28, 0x75, 0x9c, 0xb6, 0xde, 0x71, 0xc4, 0xcb, 0x8d, 0x19, 0xde, 0xa5, 0xe3, 0xb4, 0x77,
-	0x44, 0x15, 0x4b, 0xc9, 0xee, 0x7a, 0xd8, 0x30, 0xd9, 0x6b, 0xb1, 0x82, 0x26, 0x4a, 0xea, 0xeb,
-	0x90, 0xdd, 0xb1, 0x88, 0x8f, 0x1a, 0x40, 0xbb, 0xeb, 0xd8, 0xf6, 0x3d, 0x0b, 0x07, 0x7b, 0xb0,
-	0xb3, 0x23, 0x2a, 0x55, 0x83, 0x0e, 0xff, 0x65, 0x61, 0xa2, 0x7a, 0x90, 0x0f, 0x74, 0x5c, 0x6d,
-	0x41, 0x96, 0xaa, 0x19, 0xcd, 0x43, 0xf1, 0xf6, 0xcd, 0xdd, 0xc6, 0xd6, 0x46, 0x7d, 0xbb, 0xbe,
-	0xb5, 0x59, 0x3e, 0x86, 0x0a, 0x90, 0xdb, 0xd3, 0xd6, 0x36, 0xb6, 0xca, 0x0a, 0xfd, 0xb9, 0xb9,
-	0xb5, 0x7e, 0xfb, 0x6a, 0x79, 0x0a, 0xe5, 0x21, 0x5b, 0xbf, 0xb9, 0x7d, 0xab, 0x9c, 0x41, 0x00,
-	0xd3, 0x37, 0x6f, 0xed, 0xd5, 0x37, 0xb6, 0xca, 0x59, 0x5a, 0x7b, 0x67, 0x4d, 0xbb, 0x59, 0xce,
-	0xd1, 0xae, 0x5b, 0x9a, 0x76, 0x4b, 0x2b, 0x4f, 0xa3, 0x12, 0xe4, 0x37, 0xb4, 0xfa, 0x5e, 0x7d,
-	0x63, 0x6d, 0xa7, 0x3c, 0x53, 0x2d, 0x01, 0xec, 0x38, 0xed, 0x0d, 0xc7, 0xf6, 0x3d, 0xa7, 0x53,
-	0xfd, 0x43, 0x96, 0x59, 0x92, 0xe7, 0xdf, 0x71, 0xbc, 0x7b, 0xd1, 0x8b, 0xd6, 0x7f, 0x81, 0xc2,
-	0x7d, 0x56, 0x11, 0x39, 0x71, 0x9e, 0x57, 0xd4, 0x4d, 0xb4, 0x0f, 0xe5, 0x26, 0x1f, 0xae, 0x07,
-	0x7f, 0x37, 0x11, 0x56, 0x30, 0xf6, 0xfb, 0xba, 0x79, 0x01, 0xb8, 0x25, 0xf0, 0x28, 0x47, 0xc7,
-	0x69, 0xb7, 0x2d, 0xbb, 0x1d, 0x71, 0x64, 0x26, 0xe4, 0x10, 0x80, 0x21, 0x87, 0x09, 0x0b, 0x86,
-	0xe7, 0x5b, 0x2d, 0xa3, 0xe9, 0x47, 0x24, 0xd9, 0xc9, 0x48, 0xca, 0x01, 0x62, 0xc8, 0xd2, 0x62,
-	0xf7, 0xc2, 0x07, 0x16, 0xa1, 0x06, 0x1c, 0xd2, 0xe4, 0x26, 0xa3, 0x59, 0x08, 0x21, 0x43, 0x9e,
-	0xb7, 0x60, 0xda, 0x35, 0x3c, 0xa3, 0x4b, 0x2a, 0xc0, 0x0c, 0x73, 0x84, 0x2f, 0x71, 0x6c, 0xf5,
-	0x6b, 0x0d, 0x86, 0x23, 0x1e, 0x94, 0x72, 0x50, 0xf5, 0x32, 0x14, 0xa5, 0xea, 0xc7, 0x1d, 0x06,
-	0x16, 0xe4, 0xc3, 0x8d, 0xff, 0x64, 0x81, 0x2d, 0x22, 0x11, 0xc1, 0x35, 0xcc, 0x8b, 0x15, 0x29,
-	0x2f, 0xae, 0x9e, 0xa3, 0xe1, 0xce, 0x71, 0xd3, 0x9b, 0x63, 0xf5, 0x3f, 0xa8, 0x05, 0x47, 0x23,
-	0x1e, 0x89, 0x7e, 0x06, 0x16, 0x79, 0x3f, 0x1a, 0x3d, 0x7b, 0x64, 0xc8, 0x56, 0xaa, 0xfa, 0x16,
-	0x2c, 0xf5, 0x77, 0x1b, 0x29, 0xb5, 0x7f, 0x1a, 0x8a, 0x84, 0x8d, 0xe3, 0xa7, 0x99, 0x3c, 0xd0,
-	0x01, 0xaf, 0xaa, 0xdb, 0x2d, 0x67, 0xf5, 0x63, 0x05, 0x66, 0xd7, 0xb1, 0xd1, 0xdd, 0xb6, 0x85,
-	0x1b, 0xa2, 0xef, 0x28, 0x30, 0x13, 0xfc, 0x4e, 0x9b, 0xba, 0x27, 0xfc, 0x4b, 0x43, 0xbd, 0x3c,
-	0xce, 0x58, 0xfe, 0x49, 0x39, 0xb6, 0xa2, 0x9c, 0x53, 0x56, 0xdf, 0x05, 0xe0, 0x92, 0xb1, 0x7d,
-	0xb9, 0x2d, 0xf6, 0xe7, 0x67, 0x47, 0xdc, 0xe5, 0xab, 0xa3, 0x0e, 0x10, 0xec, 0xdf, 0x55, 0xa0,
-	0xc8, 0xe9, 0x79, 0x3a, 0xf1, 0x00, 0x72, 0xfc, 0xc7, 0x85, 0x51, 0xd2, 0x4a, 0x31, 0x23, 0xf5,
-	0xe2, 0x68, 0x83, 0xc4, 0x47, 0x94, 0x4b, 0xf2, 0x41, 0xb8, 0x44, 0x3b, 0x3c, 0x6a, 0xa0, 0x07,
-	0x30, 0x13, 0xfc, 0xbc, 0x38, 0xea, 0x87, 0x94, 0x7e, 0x3e, 0xd4, 0xf3, 0xe9, 0x47, 0x05, 0xd1,
-	0x99, 0xcb, 0xf2, 0xd9, 0x14, 0x54, 0xb8, 0x2c, 0x5b, 0x0f, 0x7c, 0xec, 0xd9, 0x46, 0x87, 0x1b,
-	0x67, 0xc3, 0xe1, 0x96, 0x53, 0x94, 0xbc, 0x0b, 0x5d, 0x1e, 0xdb, 0xed, 0xd5, 0x17, 0xc7, 0x19,
-	0x1a, 0x68, 0x0d, 0x7d, 0x4b, 0x01, 0x88, 0xfc, 0x10, 0xa5, 0xdf, 0x65, 0xc7, 0x9c, 0x5d, 0xbd,
-	0x3c, 0xc6, 0xc8, 0x40, 0x8a, 0xd5, 0x9f, 0x29, 0x80, 0xb8, 0xae, 0x64, 0x07, 0x46, 0x1f, 0x2a,
-	0x50, 0xea, 0xab, 0x48, 0xfb, 0xd7, 0xb9, 0xa4, 0x30, 0x90, 0x5a, 0x51, 0x09, 0xa1, 0x86, 0x2f,
-	0xe9, 0xfa, 0x1a, 0x3c, 0x3b, 0x0c, 0x42, 0x46, 0x58, 0x2f, 0xf0, 0xb9, 0xac, 0xb9, 0xd6, 0x1b,
-	0x73, 0x52, 0x93, 0x7e, 0x70, 0x7e, 0x7f, 0x9a, 0x65, 0x5a, 0x17, 0xfe, 0x19, 0x00, 0x00, 0xff,
-	0xff, 0x32, 0xe2, 0xb3, 0xb3, 0x44, 0x3a, 0x00, 0x00,
+	// 2983 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x5a, 0x4d, 0x90, 0x1b, 0x47,
+	0xf5, 0xf7, 0xe8, 0x6b, 0xa5, 0x27, 0xed, 0xae, 0xb6, 0x77, 0xd7, 0x96, 0xe7, 0x9f, 0xfc, 0xe3,
+	0x4c, 0xc5, 0xd4, 0x16, 0x10, 0xd9, 0xde, 0x98, 0xc4, 0x0e, 0x21, 0xc9, 0x7e, 0xc8, 0x8e, 0xec,
+	0xb5, 0x2d, 0x66, 0xd7, 0x38, 0xa4, 0x48, 0x0d, 0xb3, 0x9a, 0x96, 0x3c, 0xe5, 0xd1, 0xcc, 0xa4,
+	0x7b, 0xb4, 0xf6, 0x86, 0x40, 0x48, 0x41, 0x41, 0x85, 0xaa, 0x14, 0x17, 0xaa, 0x08, 0x50, 0xb9,
+	0x50, 0x5c, 0xc2, 0x81, 0x23, 0x07, 0xae, 0x1c, 0x38, 0x71, 0xcd, 0x9d, 0x1b, 0x9f, 0x07, 0x2e,
+	0x5c, 0xa1, 0xfa, 0x63, 0x46, 0x23, 0x69, 0xe4, 0x8c, 0xa4, 0x4d, 0x6e, 0xea, 0x8f, 0xf7, 0xfb,
+	0xbd, 0x7e, 0xfd, 0x5e, 0xf7, 0x7b, 0x3d, 0x82, 0x95, 0x43, 0x6c, 0xf6, 0x8c, 0x8e, 0x6b, 0x98,
+	0xbe, 0x5d, 0xf7, 0x89, 0x17, 0x78, 0xe8, 0xbc, 0x47, 0xba, 0x75, 0xd3, 0x37, 0xdb, 0xf7, 0x71,
+	0x9d, 0x8d, 0xd6, 0x7b, 0x9e, 0x85, 0x9d, 0x7a, 0xc7, 0x35, 0xf0, 0x23, 0xdc, 0xee, 0x07, 0xb6,
+	0xe7, 0xd6, 0x8f, 0x2e, 0xa9, 0xeb, 0x5c, 0x92, 0xf4, 0x5d, 0x17, 0x93, 0x81, 0xb4, 0xba, 0x8c,
+	0x5d, 0xcb, 0xf7, 0x6c, 0x37, 0xa0, 0xb2, 0xe3, 0x5c, 0xd7, 0xf3, 0xba, 0x0e, 0xbe, 0xc0, 0x5b,
+	0x87, 0xfd, 0xce, 0x05, 0x0b, 0xd3, 0x36, 0xb1, 0xfd, 0xc0, 0x23, 0x72, 0xc6, 0x53, 0xa3, 0x33,
+	0x02, 0xbb, 0x87, 0x69, 0x60, 0xf6, 0x7c, 0x39, 0xe1, 0xff, 0xc7, 0x20, 0xfa, 0xc4, 0xe4, 0x7a,
+	0x4c, 0x18, 0x7f, 0x48, 0x4c, 0xdf, 0xc7, 0x24, 0x54, 0x61, 0xb1, 0x87, 0x03, 0x62, 0xb7, 0x65,
+	0x53, 0xfb, 0xa5, 0x02, 0x4b, 0x3a, 0xee, 0x79, 0x01, 0xbe, 0x4e, 0xfc, 0x76, 0xcb, 0x23, 0x01,
+	0xea, 0xc1, 0x69, 0xd3, 0xb7, 0x0d, 0x8a, 0xc9, 0x91, 0xdd, 0xc6, 0xc6, 0x40, 0xc5, 0x9a, 0x72,
+	0x4e, 0xd9, 0x28, 0x6f, 0xbe, 0x50, 0x4f, 0x36, 0x8a, 0x6f, 0xfb, 0xd8, 0xb1, 0x5d, 0x5c, 0x3f,
+	0xba, 0x54, 0xdf, 0xf2, 0xed, 0x7d, 0x21, 0xbf, 0x1b, 0x89, 0xeb, 0x6b, 0x66, 0x42, 0x2f, 0x3a,
+	0x0b, 0xc5, 0xb6, 0x67, 0x61, 0x62, 0xd8, 0x56, 0x2d, 0x73, 0x4e, 0xd9, 0x28, 0xe9, 0x0b, 0xbc,
+	0xdd, 0xb4, 0xb4, 0xff, 0xe6, 0x01, 0x35, 0x5d, 0x1a, 0x90, 0x7e, 0x9b, 0xad, 0x50, 0xc7, 0x6f,
+	0xf5, 0x31, 0x0d, 0xd0, 0x79, 0x58, 0xb2, 0x07, 0xbd, 0x4c, 0x4e, 0xe1, 0x72, 0x8b, 0xb1, 0xde,
+	0xa6, 0x85, 0xee, 0x42, 0x91, 0xe0, 0xae, 0x4d, 0x03, 0x4c, 0x6a, 0x7f, 0x5d, 0xe0, 0xaa, 0x3f,
+	0x5f, 0x4f, 0xb5, 0x9f, 0x75, 0x5d, 0xca, 0x49, 0xc6, 0xd7, 0x4e, 0xe9, 0x11, 0x14, 0xc2, 0xb0,
+	0xe4, 0x13, 0xaf, 0x8d, 0x29, 0x35, 0x0e, 0xfb, 0xae, 0xe5, 0xe0, 0xda, 0xdf, 0x04, 0xf8, 0x57,
+	0x53, 0x82, 0xb7, 0x84, 0xf4, 0x36, 0x17, 0x1e, 0x30, 0x2c, 0xfa, 0xf1, 0x7e, 0xf4, 0x3d, 0x38,
+	0x33, 0x4c, 0x63, 0xf8, 0xc4, 0xeb, 0x12, 0x4c, 0x69, 0xed, 0xef, 0x82, 0x6f, 0x67, 0x16, 0xbe,
+	0x96, 0x04, 0x19, 0xf0, 0xae, 0xfb, 0x49, 0xe3, 0xa8, 0x0f, 0x6b, 0x23, 0xfc, 0xd4, 0x77, 0xec,
+	0xa0, 0xf6, 0x0f, 0x41, 0xfe, 0xea, 0x2c, 0xe4, 0xfb, 0x0c, 0x61, 0xc0, 0x8c, 0xfc, 0xb1, 0x41,
+	0x74, 0x1f, 0x96, 0x3b, 0xb6, 0x6b, 0x3a, 0xf6, 0xdb, 0x38, 0x34, 0xef, 0x3f, 0x05, 0xe3, 0x4b,
+	0x29, 0x19, 0xaf, 0x49, 0xf1, 0x51, 0xfb, 0x2e, 0x75, 0x86, 0x06, 0xd0, 0xcf, 0x15, 0x38, 0x37,
+	0xc1, 0xc2, 0x46, 0x0f, 0x07, 0xa6, 0x65, 0x06, 0x66, 0xed, 0x5f, 0x82, 0xfb, 0xe6, 0x3c, 0xa6,
+	0xbe, 0x25, 0xc1, 0x06, 0xaa, 0x3c, 0xe9, 0x3f, 0x6e, 0xde, 0x76, 0x09, 0x16, 0x88, 0x98, 0xab,
+	0x7d, 0x54, 0x80, 0xd5, 0xa1, 0x08, 0xa0, 0xbe, 0xe7, 0x52, 0x9c, 0x36, 0x04, 0xd6, 0x20, 0x8f,
+	0x09, 0xf1, 0x88, 0x0c, 0x2c, 0xd1, 0x40, 0xdf, 0x18, 0x0f, 0x8c, 0x17, 0xa6, 0x0e, 0x0c, 0xa1,
+	0xc8, 0x50, 0x64, 0x74, 0x26, 0x45, 0xc6, 0x4b, 0xb3, 0x45, 0x46, 0x44, 0x31, 0x12, 0x1a, 0xef,
+	0x7e, 0x6a, 0x68, 0xec, 0xce, 0x17, 0x1a, 0x11, 0xf1, 0x84, 0xd8, 0x38, 0x7a, 0x7c, 0x6c, 0x6c,
+	0xcd, 0x11, 0x1b, 0x11, 0x75, 0x52, 0x70, 0xd8, 0x13, 0x83, 0xe3, 0x6b, 0x33, 0x06, 0x47, 0x44,
+	0x37, 0x1a, 0x1d, 0x1f, 0x4e, 0x11, 0x1d, 0x7b, 0x27, 0x13, 0x1d, 0x91, 0x2e, 0x9f, 0x12, 0x1e,
+	0xc0, 0xdc, 0x57, 0x4c, 0xd6, 0x3e, 0x50, 0x60, 0x79, 0xe4, 0xb0, 0x46, 0x6f, 0xc3, 0xd9, 0x11,
+	0xcd, 0x87, 0xae, 0xb0, 0xec, 0x46, 0x79, 0xf3, 0xe5, 0x59, 0x34, 0x8e, 0xdd, 0x64, 0x67, 0xfc,
+	0xe4, 0x01, 0x0d, 0x41, 0x75, 0x34, 0x44, 0xb4, 0x8f, 0xca, 0x70, 0x66, 0x02, 0x10, 0x5a, 0x82,
+	0x4c, 0x14, 0xbb, 0x19, 0xdb, 0x42, 0x2e, 0x40, 0x40, 0x4c, 0x97, 0x76, 0x3c, 0xd2, 0xa3, 0xb5,
+	0x0c, 0x57, 0xf6, 0xf6, 0x7c, 0xca, 0xd6, 0x0f, 0x22, 0xc0, 0x86, 0x1b, 0x90, 0x63, 0x3d, 0xc6,
+	0x80, 0x02, 0xa8, 0xf8, 0x6d, 0xcf, 0x71, 0x30, 0x3f, 0x31, 0x68, 0x2d, 0xcb, 0x19, 0x5b, 0x73,
+	0x32, 0xb6, 0x62, 0x90, 0x82, 0x73, 0x88, 0x05, 0xfd, 0x44, 0x81, 0xb5, 0x87, 0xb6, 0x6b, 0x79,
+	0x0f, 0x6d, 0xb7, 0x6b, 0xd0, 0x80, 0x98, 0x01, 0xee, 0xda, 0x98, 0xd6, 0x72, 0x9c, 0xfe, 0xde,
+	0x9c, 0xf4, 0xf7, 0x42, 0xe8, 0xfd, 0x08, 0x59, 0x68, 0xb1, 0xfa, 0x70, 0x7c, 0x04, 0x1d, 0x42,
+	0x81, 0xe7, 0x1b, 0xb4, 0x96, 0xe7, 0xec, 0x37, 0xe6, 0x64, 0xdf, 0xe1, 0x60, 0x82, 0x50, 0x22,
+	0x33, 0x33, 0x63, 0xf7, 0xc8, 0x26, 0x9e, 0xdb, 0xc3, 0x6e, 0x40, 0x6b, 0x85, 0x13, 0x31, 0x73,
+	0x23, 0x06, 0x29, 0xcd, 0x1c, 0x67, 0x41, 0x8f, 0xe0, 0x09, 0x1a, 0x98, 0x01, 0x36, 0x26, 0xa4,
+	0x73, 0x0b, 0xf3, 0xa5, 0x73, 0x67, 0x39, 0x78, 0xd2, 0x10, 0x63, 0x66, 0x79, 0x2b, 0x99, 0xc4,
+	0x5c, 0x9c, 0x93, 0x99, 0x83, 0x27, 0x0d, 0xa9, 0x0e, 0x2c, 0x8f, 0xf8, 0x3b, 0xaa, 0x42, 0xf6,
+	0x01, 0x3e, 0x96, 0x41, 0xc6, 0x7e, 0xa2, 0x1d, 0xc8, 0x1f, 0x99, 0x4e, 0x1f, 0xf3, 0x6b, 0xb1,
+	0xbc, 0xf9, 0x6c, 0x0a, 0x3d, 0x5a, 0x11, 0xaa, 0x2e, 0x64, 0x5f, 0xcc, 0x5c, 0x51, 0x54, 0x0f,
+	0x56, 0xc6, 0x7c, 0x3d, 0x81, 0x6f, 0x77, 0x98, 0xaf, 0x9e, 0x86, 0x6f, 0x27, 0x82, 0x8d, 0x13,
+	0xbe, 0x03, 0xb5, 0x49, 0xde, 0x9d, 0xc0, 0x7b, 0x63, 0x98, 0xf7, 0x72, 0x0a, 0xde, 0x51, 0xf4,
+	0xe3, 0x38, 0x7b, 0x1b, 0xca, 0x31, 0xef, 0x4e, 0x20, 0x7c, 0x79, 0x98, 0x70, 0x23, 0x05, 0x21,
+	0x07, 0x1c, 0xb1, 0xe9, 0x98, 0x63, 0x9f, 0x8c, 0x4d, 0x63, 0xb0, 0x31, 0x42, 0xed, 0x57, 0x59,
+	0x58, 0x11, 0xb1, 0xb5, 0xe5, 0xfb, 0x8e, 0xdd, 0xe6, 0xd5, 0x14, 0x7a, 0x1a, 0x2a, 0xd1, 0x39,
+	0x39, 0xc8, 0xaf, 0xca, 0x51, 0x5f, 0xd3, 0x62, 0x95, 0x8b, 0xed, 0xfa, 0xfd, 0x20, 0x56, 0xb9,
+	0xf0, 0x76, 0xd3, 0x42, 0x35, 0x58, 0xc0, 0x0e, 0x66, 0x4c, 0xb5, 0xec, 0x39, 0x65, 0xa3, 0xa2,
+	0x87, 0x4d, 0xf4, 0x1d, 0x58, 0xf1, 0xfa, 0x01, 0x93, 0x7a, 0x68, 0x06, 0x98, 0xf4, 0x4c, 0xf2,
+	0x20, 0x3c, 0xf7, 0xd2, 0x1e, 0xf4, 0x63, 0xca, 0xd6, 0xef, 0x70, 0xc4, 0x7b, 0x11, 0xa0, 0x38,
+	0x0d, 0xaa, 0xde, 0x48, 0x37, 0x6a, 0x01, 0xd8, 0xd4, 0x38, 0xf4, 0xfa, 0xae, 0x85, 0xad, 0x5a,
+	0xfe, 0x9c, 0xb2, 0xb1, 0xb4, 0x79, 0x29, 0x85, 0xe5, 0x9a, 0x74, 0x5b, 0xc8, 0xd4, 0x1b, 0x6e,
+	0xbf, 0xa7, 0x97, 0xec, 0xb0, 0xad, 0x1a, 0xb0, 0x9e, 0x48, 0x9e, 0xb0, 0x63, 0x17, 0x87, 0x77,
+	0x4c, 0xad, 0x8b, 0x4a, 0xb5, 0x1e, 0x56, 0xaa, 0xf5, 0x83, 0xb0, 0xd4, 0x8d, 0xef, 0xce, 0x1f,
+	0x15, 0xa8, 0xed, 0x62, 0xc7, 0x3c, 0xc6, 0xd6, 0xf8, 0x26, 0xbd, 0x01, 0x65, 0x73, 0xd0, 0x94,
+	0xf5, 0xe9, 0x95, 0x59, 0xcd, 0xa8, 0xc7, 0xc1, 0xd0, 0x4d, 0x58, 0x93, 0x59, 0x38, 0xb6, 0x0c,
+	0x76, 0xe0, 0x18, 0x16, 0x53, 0x43, 0x6a, 0x7f, 0x76, 0x4c, 0xfb, 0x5d, 0x59, 0x87, 0xeb, 0x28,
+	0x12, 0x63, 0x0b, 0xe2, 0xba, 0x6b, 0xbf, 0xcf, 0xc1, 0x5a, 0x52, 0xdd, 0x87, 0x5e, 0x81, 0x27,
+	0x26, 0x26, 0x2b, 0x03, 0xb7, 0x3b, 0x3b, 0x21, 0xdf, 0x68, 0x5a, 0xc8, 0x86, 0x4a, 0x9b, 0xad,
+	0xd4, 0x08, 0xbc, 0x07, 0xd8, 0x0d, 0x73, 0x86, 0x6b, 0x73, 0xd4, 0xa2, 0xf5, 0x1d, 0x26, 0x75,
+	0xc0, 0xe0, 0xf4, 0x72, 0x3b, 0xfa, 0x4d, 0xd5, 0x7f, 0x67, 0x00, 0x06, 0x63, 0xe8, 0x2d, 0x80,
+	0x3e, 0xc5, 0xc4, 0xe0, 0xd7, 0x80, 0xb4, 0x7d, 0xeb, 0x64, 0x78, 0xeb, 0x77, 0x29, 0x26, 0xfb,
+	0x0c, 0xf7, 0xb5, 0x53, 0x7a, 0xa9, 0x1f, 0x36, 0x18, 0x25, 0xb5, 0x2d, 0x6c, 0xf0, 0x30, 0x93,
+	0x3b, 0x71, 0x52, 0x94, 0xfb, 0xb6, 0x85, 0x9b, 0x0c, 0x97, 0x51, 0xd2, 0xb0, 0xc1, 0x4a, 0x28,
+	0x6e, 0xd9, 0x1a, 0xf0, 0x38, 0x16, 0x0d, 0xb5, 0x0c, 0xa5, 0x48, 0x45, 0x55, 0x87, 0x52, 0x24,
+	0x9c, 0xe6, 0xdc, 0xd0, 0x60, 0x71, 0xb0, 0x8a, 0xc1, 0xe1, 0x51, 0x8e, 0x48, 0x9b, 0xd6, 0x76,
+	0x01, 0x72, 0xc1, 0xb1, 0x8f, 0xb5, 0x4f, 0xb2, 0xb0, 0x9e, 0x58, 0x16, 0xb1, 0x6a, 0x8b, 0x60,
+	0x6a, 0x5b, 0x7d, 0xd3, 0x31, 0x88, 0xe7, 0x05, 0xe1, 0xd6, 0xbf, 0x92, 0xd2, 0x1e, 0x93, 0x82,
+	0x4a, 0x5f, 0x0c, 0x61, 0x75, 0x86, 0x8a, 0xbe, 0x05, 0xd5, 0x9e, 0xe7, 0xda, 0x81, 0x47, 0x58,
+	0xb2, 0x66, 0xbb, 0x1d, 0x2f, 0x4c, 0x13, 0xd3, 0x9c, 0x1c, 0xb7, 0x22, 0xd1, 0xa6, 0xdb, 0xf1,
+	0xf4, 0xe5, 0xde, 0x50, 0x9b, 0xa2, 0xe7, 0x60, 0x9d, 0x85, 0x8b, 0x4d, 0x30, 0x35, 0x64, 0x09,
+	0x22, 0x62, 0x39, 0x77, 0x4e, 0xd9, 0x28, 0xea, 0x6b, 0xe1, 0xe0, 0xb5, 0xd8, 0x18, 0x3a, 0x86,
+	0x18, 0x8e, 0xc1, 0x4b, 0x91, 0xfc, 0xec, 0x19, 0x55, 0x68, 0xd1, 0x98, 0xae, 0xbb, 0x66, 0x60,
+	0x8a, 0x33, 0x74, 0xa9, 0x37, 0xd4, 0xa9, 0x6e, 0xc1, 0x6a, 0xc2, 0xb4, 0x84, 0xd3, 0x6e, 0x2d,
+	0x7e, 0xda, 0x55, 0x62, 0x27, 0xda, 0x8d, 0x5c, 0x51, 0xa9, 0x66, 0xb4, 0x06, 0x3c, 0xf1, 0xb8,
+	0x87, 0x99, 0x94, 0x15, 0xbe, 0x76, 0x00, 0xcf, 0xa4, 0x79, 0x74, 0x40, 0x5f, 0x06, 0x34, 0xb2,
+	0x8b, 0x02, 0x32, 0xbb, 0x51, 0xd2, 0xab, 0xc3, 0x9b, 0xd2, 0xb4, 0xb4, 0x4f, 0x32, 0xf0, 0xe4,
+	0x63, 0x6b, 0xe3, 0xcf, 0xd8, 0x2b, 0xde, 0x53, 0x26, 0xed, 0xf0, 0xeb, 0x27, 0x51, 0xd9, 0x7f,
+	0xae, 0x3b, 0x7d, 0x23, 0x57, 0xcc, 0x54, 0xb3, 0x37, 0x72, 0xc5, 0x5c, 0x35, 0xaf, 0x7d, 0x9c,
+	0x81, 0xf3, 0xa9, 0xca, 0x60, 0xf4, 0xfe, 0xb0, 0x09, 0x98, 0x85, 0x65, 0xf1, 0xfa, 0xed, 0x93,
+	0x2c, 0xb7, 0x47, 0xb6, 0x62, 0xcc, 0x14, 0xac, 0x53, 0x0d, 0xe2, 0xa6, 0x88, 0xa6, 0x25, 0x98,
+	0xe2, 0xfa, 0xf0, 0x15, 0x3f, 0x83, 0x2b, 0xc4, 0x6e, 0xfe, 0xf7, 0x72, 0x70, 0x76, 0xe2, 0xf3,
+	0x61, 0xda, 0x17, 0xb0, 0xb7, 0x61, 0xc9, 0xc2, 0xd4, 0x26, 0xd8, 0x12, 0x6f, 0x34, 0xa1, 0x97,
+	0xee, 0xcf, 0xfb, 0x7e, 0x59, 0xdf, 0x15, 0xb0, 0xbc, 0x4f, 0x26, 0x5c, 0x8b, 0x56, 0xbc, 0x4f,
+	0xfd, 0x9d, 0x02, 0x95, 0xf8, 0x2c, 0xb4, 0x09, 0xeb, 0x1d, 0x62, 0x0a, 0x85, 0xbd, 0x8e, 0x41,
+	0x70, 0xcf, 0xb4, 0x5d, 0x0b, 0x8b, 0x87, 0x75, 0x45, 0x5f, 0x0d, 0x07, 0xef, 0x74, 0xf4, 0x70,
+	0x08, 0x5d, 0x84, 0x35, 0xd3, 0x71, 0xbc, 0x87, 0xe1, 0x02, 0x0c, 0xf1, 0x41, 0x81, 0x2f, 0x23,
+	0xab, 0x23, 0x39, 0xc6, 0xf1, 0x5b, 0x7c, 0x04, 0x5d, 0x81, 0x1a, 0xa6, 0x81, 0xdd, 0x33, 0x59,
+	0xe2, 0x22, 0xee, 0x18, 0x99, 0x7c, 0x52, 0xbe, 0x2f, 0x59, 0xfd, 0x74, 0x34, 0xce, 0xaf, 0x9b,
+	0x86, 0x1c, 0x55, 0x7f, 0xaa, 0x00, 0x1a, 0x5f, 0x56, 0xc2, 0x3e, 0xb7, 0x87, 0xf7, 0xf9, 0xd6,
+	0x89, 0x1a, 0x33, 0xee, 0x03, 0xff, 0xc9, 0x82, 0x3a, 0xf9, 0x99, 0x0c, 0xbd, 0x09, 0x8b, 0x3e,
+	0xb1, 0x7b, 0x26, 0x39, 0x96, 0x57, 0xa0, 0x88, 0x90, 0xd9, 0x33, 0xc0, 0x8a, 0x84, 0x13, 0x57,
+	0xdf, 0xe7, 0x75, 0xc5, 0xf6, 0x61, 0xa9, 0x7d, 0xdf, 0x74, 0x5d, 0xec, 0x0c, 0x3b, 0xe9, 0xed,
+	0xb9, 0x1f, 0x12, 0xeb, 0x3b, 0x02, 0x57, 0x74, 0x2e, 0xb6, 0x63, 0x2d, 0xaa, 0xfe, 0x42, 0x81,
+	0x4a, 0x7c, 0x3c, 0x4d, 0xee, 0x72, 0x11, 0xd6, 0x1c, 0x93, 0x06, 0x46, 0x68, 0xf6, 0xb0, 0xca,
+	0x11, 0x8e, 0x85, 0xd8, 0x58, 0x4b, 0x0c, 0x49, 0xaf, 0x42, 0x97, 0xe1, 0x74, 0xc7, 0x26, 0x34,
+	0x30, 0x22, 0x53, 0xc6, 0x2b, 0xa3, 0xac, 0xbe, 0xc6, 0x47, 0x75, 0x39, 0x28, 0xa5, 0xb4, 0x97,
+	0x61, 0x3d, 0xf1, 0x21, 0x3f, 0xed, 0xbd, 0x58, 0x83, 0xd3, 0xc9, 0x6f, 0x9d, 0xda, 0x9f, 0xb3,
+	0x50, 0x0c, 0x3d, 0x1e, 0xbd, 0x06, 0x39, 0x7e, 0xb9, 0x08, 0xbf, 0xb9, 0x9c, 0xd2, 0xde, 0xa1,
+	0x78, 0x9d, 0x5d, 0x0b, 0x3a, 0x47, 0x40, 0x37, 0x21, 0xcf, 0x5f, 0x25, 0xa4, 0x8b, 0x7c, 0x65,
+	0x5a, 0x28, 0x56, 0x2b, 0x10, 0x5d, 0x60, 0xa8, 0xdf, 0x85, 0x1c, 0x83, 0x4e, 0x7b, 0xc8, 0x8d,
+	0xee, 0x5b, 0x66, 0x7c, 0xdf, 0x90, 0x5c, 0xa8, 0xa8, 0x46, 0x85, 0xca, 0x67, 0x60, 0xc1, 0xa6,
+	0x06, 0xdb, 0x32, 0x99, 0x6d, 0x15, 0x6c, 0xba, 0x67, 0xd2, 0x40, 0xfd, 0xad, 0x02, 0x79, 0xae,
+	0xcf, 0x09, 0x2a, 0xf0, 0x05, 0x58, 0x16, 0x4f, 0x42, 0x1d, 0xb3, 0x67, 0x3b, 0xc7, 0x6c, 0x56,
+	0x56, 0x40, 0xf1, 0xee, 0x6b, 0xbc, 0xb7, 0x69, 0xa1, 0xd3, 0x50, 0xe0, 0x1d, 0x94, 0xeb, 0x54,
+	0xd1, 0x65, 0x2b, 0xae, 0x6c, 0x3e, 0xae, 0xac, 0xf6, 0xb3, 0x2c, 0x54, 0x78, 0x1e, 0x1e, 0x7a,
+	0xc8, 0xe8, 0x9b, 0xea, 0xf8, 0x1a, 0x32, 0x49, 0x6b, 0xd8, 0x83, 0x92, 0x78, 0x2d, 0x63, 0x67,
+	0x5d, 0x96, 0x9f, 0x6b, 0x17, 0x52, 0x6e, 0x22, 0xa7, 0xbf, 0x89, 0x8f, 0xf5, 0x22, 0x95, 0xbf,
+	0xd0, 0x4d, 0xc8, 0x76, 0x71, 0x30, 0xed, 0x77, 0x47, 0x0e, 0x74, 0x1d, 0xc7, 0xbe, 0x91, 0x31,
+	0x14, 0x74, 0x00, 0x05, 0xd3, 0xf7, 0xb1, 0x6b, 0x85, 0x1f, 0x54, 0xae, 0x4e, 0x83, 0xb7, 0xc5,
+	0x45, 0x07, 0x90, 0x12, 0x0b, 0x7d, 0x1d, 0xf2, 0x6d, 0x07, 0x9b, 0x24, 0xfc, 0x68, 0x72, 0x65,
+	0x1a, 0xd0, 0x1d, 0x26, 0x39, 0xc0, 0x14, 0x48, 0xf1, 0x2f, 0x57, 0x7f, 0xc8, 0xc0, 0xa2, 0xdc,
+	0x16, 0x79, 0x58, 0x8f, 0xee, 0x4b, 0xf2, 0xc7, 0xa9, 0xbd, 0x21, 0xc3, 0xbd, 0x30, 0xb5, 0xe1,
+	0xa2, 0xaf, 0x08, 0xdc, 0x72, 0x77, 0x47, 0x2d, 0xf7, 0xe2, 0x2c, 0x96, 0x8b, 0x30, 0x43, 0xd3,
+	0xe9, 0x23, 0xa6, 0xbb, 0x3a, 0x83, 0xe9, 0x22, 0x50, 0x69, 0xbb, 0xf8, 0x67, 0x8d, 0x3f, 0x15,
+	0xa1, 0x18, 0x3a, 0x15, 0x6a, 0x41, 0x41, 0xfc, 0xb3, 0x40, 0xd6, 0xd8, 0xcf, 0x4f, 0xe9, 0x95,
+	0x75, 0x9d, 0x4b, 0x33, 0xf5, 0x05, 0x0e, 0xa2, 0xb0, 0xda, 0xeb, 0x3b, 0x2c, 0x05, 0xf0, 0x8d,
+	0xb1, 0x7a, 0x7a, 0x6b, 0x5a, 0xf8, 0x5b, 0x12, 0x2a, 0x5e, 0x40, 0xaf, 0xf4, 0x46, 0x3b, 0x91,
+	0x05, 0x4b, 0x87, 0x66, 0xd7, 0x88, 0x3d, 0x19, 0x64, 0xa7, 0xfa, 0x38, 0x18, 0xf1, 0x6d, 0x9b,
+	0xdd, 0xf8, 0xf3, 0x40, 0xe5, 0x30, 0xd6, 0x66, 0x4b, 0xb3, 0x03, 0x4c, 0xcc, 0x43, 0x07, 0xc7,
+	0x97, 0x96, 0x9b, 0x6d, 0x69, 0x4d, 0x09, 0x35, 0xb4, 0x34, 0x7b, 0xb4, 0x13, 0x7d, 0x5f, 0x81,
+	0x5a, 0x64, 0xd0, 0x07, 0xf8, 0x98, 0xc6, 0xa9, 0xf3, 0x9c, 0xba, 0x31, 0xab, 0x55, 0x6f, 0xe2,
+	0x63, 0x1a, 0xa7, 0x5f, 0xef, 0x25, 0x0d, 0xa8, 0x2a, 0x14, 0xc4, 0x36, 0xc7, 0xb3, 0xb5, 0x0a,
+	0xcf, 0xd6, 0x54, 0x02, 0x2b, 0x63, 0x0b, 0x39, 0xa1, 0x77, 0x0a, 0x76, 0x5c, 0x8b, 0x8f, 0x2a,
+	0xf2, 0x66, 0x91, 0x2d, 0xf5, 0x87, 0x0a, 0xac, 0x8c, 0x39, 0xc6, 0x67, 0x4c, 0x1a, 0x2e, 0x3d,
+	0x37, 0x58, 0xfa, 0x11, 0xac, 0x27, 0x1a, 0xf2, 0xb3, 0x5e, 0xfe, 0xbb, 0x50, 0x89, 0xbb, 0x69,
+	0x4a, 0xba, 0x41, 0x6c, 0xc4, 0xe8, 0xa2, 0xd7, 0xaf, 0x69, 0x16, 0x1e, 0xbd, 0x1f, 0xbd, 0x0a,
+	0xcb, 0x23, 0x97, 0x0a, 0x7a, 0x16, 0x50, 0xdb, 0x73, 0x03, 0xdb, 0xed, 0xf3, 0x64, 0x54, 0x3c,
+	0x1c, 0x4a, 0x7f, 0x59, 0x89, 0x8f, 0xf0, 0x77, 0x31, 0xed, 0x2e, 0x54, 0x47, 0x4f, 0xd7, 0x29,
+	0x21, 0xa2, 0xe4, 0x23, 0x33, 0x48, 0x3e, 0xb4, 0x0d, 0x40, 0xe3, 0xb7, 0x53, 0x34, 0x53, 0x89,
+	0xcd, 0x5c, 0x87, 0xd5, 0x84, 0xd3, 0x58, 0x5b, 0x85, 0x95, 0xb1, 0x9b, 0x48, 0x5b, 0x93, 0xa8,
+	0x43, 0x67, 0xac, 0xf6, 0xeb, 0x1c, 0x14, 0xf7, 0xbc, 0xae, 0xa8, 0x66, 0xbe, 0x09, 0x45, 0x8a,
+	0x8f, 0x30, 0xb1, 0x03, 0x11, 0x24, 0x4b, 0xa9, 0x3f, 0x9e, 0x87, 0x10, 0xf5, 0x7d, 0x29, 0x2f,
+	0x5e, 0xc3, 0x23, 0x38, 0x74, 0x05, 0x4a, 0xd1, 0xdf, 0xb5, 0x52, 0xbc, 0x72, 0x0f, 0x26, 0xa3,
+	0x1a, 0x2c, 0xf4, 0x30, 0xa5, 0x66, 0x17, 0xcb, 0xac, 0x28, 0x6c, 0xf2, 0xf7, 0x47, 0x62, 0xb6,
+	0x31, 0xdf, 0xdc, 0x92, 0x2e, 0x1a, 0x09, 0x39, 0x4d, 0x3e, 0x4d, 0x5e, 0x56, 0x18, 0x77, 0xbb,
+	0xa7, 0xa1, 0xe2, 0x78, 0x5d, 0xc3, 0xf1, 0xe4, 0x1b, 0xfa, 0x82, 0x98, 0xe2, 0x78, 0xdd, 0x3d,
+	0xd9, 0xc5, 0x53, 0xb2, 0xfb, 0x04, 0x9b, 0x16, 0xff, 0x6e, 0x57, 0xd2, 0x65, 0x4b, 0x7d, 0x1d,
+	0x72, 0x7b, 0x36, 0x0d, 0x50, 0x0b, 0xd8, 0x74, 0x03, 0xbb, 0x01, 0xb1, 0x71, 0x58, 0x83, 0x5d,
+	0x98, 0xd2, 0xa8, 0x3a, 0x38, 0xe2, 0x97, 0x8d, 0xa9, 0x4a, 0xa0, 0x18, 0xda, 0x58, 0xeb, 0x40,
+	0x8e, 0x99, 0x19, 0x2d, 0x43, 0xf9, 0xee, 0xed, 0xfd, 0x56, 0x63, 0xa7, 0x79, 0xad, 0xd9, 0xd8,
+	0xad, 0x9e, 0x42, 0x25, 0xc8, 0x1f, 0xe8, 0x5b, 0x3b, 0x8d, 0xaa, 0xc2, 0x7e, 0xee, 0x36, 0xb6,
+	0xef, 0x5e, 0xaf, 0x66, 0x50, 0x11, 0x72, 0xcd, 0xdb, 0xd7, 0xee, 0x54, 0xb3, 0x08, 0xa0, 0x70,
+	0xfb, 0xce, 0x41, 0x73, 0xa7, 0x51, 0xcd, 0xb1, 0xde, 0x7b, 0x5b, 0xfa, 0xed, 0x6a, 0x9e, 0x4d,
+	0x6d, 0xe8, 0xfa, 0x1d, 0xbd, 0x5a, 0x40, 0x15, 0x28, 0xee, 0xe8, 0xcd, 0x83, 0xe6, 0xce, 0xd6,
+	0x5e, 0x75, 0x41, 0xab, 0x00, 0xec, 0x79, 0xdd, 0x1d, 0xcf, 0x0d, 0x88, 0xe7, 0x68, 0x7f, 0xc9,
+	0x71, 0x4f, 0x22, 0xc1, 0x3d, 0x8f, 0x3c, 0x18, 0xfc, 0xb7, 0xe0, 0xff, 0xa0, 0xf4, 0x90, 0x77,
+	0x0c, 0x82, 0xb8, 0x28, 0x3a, 0x9a, 0x16, 0x3a, 0x84, 0x6a, 0x5b, 0x88, 0x1b, 0xe1, 0x1f, 0xff,
+	0xa4, 0x17, 0xcc, 0xfc, 0xa5, 0x73, 0x59, 0x02, 0x36, 0x24, 0x1e, 0xe3, 0x70, 0xbc, 0x6e, 0xd7,
+	0x76, 0xbb, 0x03, 0x8e, 0xec, 0x9c, 0x1c, 0x12, 0x30, 0xe2, 0xb0, 0x60, 0xc5, 0x24, 0x81, 0xdd,
+	0x31, 0xdb, 0xc1, 0x80, 0x24, 0x37, 0x1f, 0x49, 0x35, 0x44, 0x8c, 0x58, 0x3a, 0x80, 0x7c, 0xe2,
+	0x1d, 0xd9, 0x94, 0x39, 0x70, 0x44, 0x93, 0x9f, 0x8f, 0x66, 0x25, 0x82, 0x8c, 0x78, 0xde, 0x84,
+	0x82, 0x6f, 0x12, 0xb3, 0x47, 0x6b, 0xc0, 0x1d, 0x73, 0x8a, 0x9b, 0x78, 0x64, 0xf7, 0xeb, 0x2d,
+	0x8e, 0x23, 0x3f, 0xed, 0x0b, 0x50, 0xf5, 0x2a, 0x94, 0x63, 0xdd, 0x9f, 0xf6, 0x3c, 0x58, 0x8a,
+	0x3f, 0x6e, 0x7c, 0x89, 0x1f, 0x6c, 0x03, 0x12, 0x79, 0xb8, 0x46, 0x79, 0xb1, 0x12, 0xcb, 0x8b,
+	0xb5, 0x8b, 0xec, 0xb8, 0xf3, 0xfc, 0xf4, 0xee, 0xa8, 0x7d, 0x91, 0x79, 0xf0, 0x40, 0xe2, 0xb1,
+	0xe8, 0xe7, 0x61, 0x55, 0xcc, 0x63, 0xa7, 0x67, 0x9f, 0x4e, 0x28, 0xa5, 0xb4, 0x37, 0x61, 0x6d,
+	0x78, 0xda, 0x54, 0xa9, 0xfd, 0x53, 0x50, 0xa6, 0x5c, 0x4e, 0xbc, 0x66, 0x8a, 0x83, 0x0e, 0x44,
+	0x57, 0xd3, 0xed, 0x78, 0x9b, 0x1f, 0x2a, 0xb0, 0xb8, 0x8d, 0xcd, 0xde, 0x35, 0x57, 0x86, 0x21,
+	0xfa, 0x91, 0x02, 0x0b, 0xe1, 0xef, 0xb4, 0xa9, 0x7b, 0xc2, 0xff, 0xe5, 0xd4, 0xab, 0xb3, 0xc8,
+	0x8a, 0x2b, 0xe5, 0xd4, 0x86, 0x72, 0x51, 0xd9, 0x7c, 0x07, 0x40, 0x68, 0xc6, 0xeb, 0x72, 0x57,
+	0xd6, 0xe7, 0x17, 0xa6, 0xac, 0xf2, 0xd5, 0x69, 0x05, 0x24, 0xfb, 0x8f, 0x15, 0x28, 0x0b, 0x7a,
+	0x91, 0x4e, 0x3c, 0x82, 0xbc, 0xf8, 0xf1, 0xdc, 0x34, 0x69, 0xa5, 0x5c, 0x91, 0x7a, 0x79, 0x3a,
+	0x21, 0x79, 0x89, 0x0a, 0x4d, 0xde, 0x8f, 0xb6, 0x68, 0x4f, 0x9c, 0x1a, 0xe8, 0x11, 0x2c, 0x84,
+	0x3f, 0x2f, 0x4f, 0x7b, 0x91, 0xb2, 0xeb, 0x43, 0xbd, 0x94, 0x5e, 0x2a, 0x3c, 0x9d, 0x85, 0x2e,
+	0x1f, 0x67, 0xa0, 0x26, 0x74, 0x69, 0x3c, 0x0a, 0x30, 0x71, 0x4d, 0x47, 0x38, 0x67, 0xcb, 0x13,
+	0x9e, 0x53, 0x8e, 0x45, 0x17, 0xba, 0x3a, 0x73, 0xd8, 0xab, 0x2f, 0xce, 0x22, 0x1a, 0x5a, 0x0d,
+	0xfd, 0x40, 0x01, 0x18, 0xc4, 0x21, 0x4a, 0x5f, 0x65, 0x8f, 0x04, 0xbb, 0x7a, 0x75, 0x06, 0xc9,
+	0x50, 0x8b, 0xcd, 0xdf, 0x28, 0x80, 0x84, 0xad, 0xe2, 0x01, 0x8c, 0x3e, 0x50, 0xa0, 0x32, 0xd4,
+	0x91, 0xf6, 0x4f, 0xcc, 0x49, 0xc7, 0x40, 0x6a, 0x43, 0x25, 0x1c, 0x35, 0x62, 0x4b, 0xb7, 0xb7,
+	0xe0, 0x99, 0x49, 0x10, 0x71, 0x84, 0xed, 0x92, 0x58, 0xcb, 0x96, 0x6f, 0xbf, 0xb1, 0x14, 0x1b,
+	0x32, 0x8e, 0x2e, 0x1d, 0x16, 0x78, 0xa6, 0xf5, 0xdc, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x86,
+	0xd0, 0xa6, 0x0b, 0xce, 0x2f, 0x00, 0x00,
 }
 
 // Reference imports to suppress errors if they are not otherwise used.
diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py
index e74db8d..7027582 100644
--- a/sdks/python/apache_beam/metrics/cells.py
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -31,9 +31,6 @@ import time
 from builtins import object
 from typing import Optional
 
-from apache_beam.portability.api import beam_fn_api_pb2
-from apache_beam.utils import proto_utils
-
 try:
   import cython
 except ImportError:
@@ -113,11 +110,6 @@ class CounterCell(MetricCell):
     with self._lock:
       return self.value
 
-  def to_runner_api_user_metric(self, metric_name):
-    return beam_fn_api_pb2.Metrics.User(
-        metric_name=metric_name.to_runner_api(),
-        counter_data=beam_fn_api_pb2.Metrics.User.CounterData(value=self.value))
-
   def to_runner_api_monitoring_info(self, name, transform_id):
     from apache_beam.metrics import monitoring_infos
     return monitoring_infos.int64_user_counter(
@@ -176,11 +168,6 @@ class DistributionCell(MetricCell):
     with self._lock:
       return self.data.get_cumulative()
 
-  def to_runner_api_user_metric(self, metric_name):
-    return beam_fn_api_pb2.Metrics.User(
-        metric_name=metric_name.to_runner_api(),
-        distribution_data=self.get_cumulative().to_runner_api())
-
   def to_runner_api_monitoring_info(self, name, transform_id):
     from apache_beam.metrics import monitoring_infos
     return monitoring_infos.int64_user_distribution(
@@ -230,11 +217,6 @@ class GaugeCell(MetricCell):
     with self._lock:
       return self.data.get_cumulative()
 
-  def to_runner_api_user_metric(self, metric_name):
-    return beam_fn_api_pb2.Metrics.User(
-        metric_name=metric_name.to_runner_api(),
-        gauge_data=self.get_cumulative().to_runner_api())
-
   def to_runner_api_monitoring_info(self, name, transform_id):
     from apache_beam.metrics import monitoring_infos
     return monitoring_infos.int64_user_gauge(
@@ -372,17 +354,6 @@ class GaugeData(object):
     # type: (...) -> GaugeData
     return GaugeData(value, timestamp=timestamp)
 
-  def to_runner_api(self):
-    # type: () -> beam_fn_api_pb2.Metrics.User.GaugeData
-    return beam_fn_api_pb2.Metrics.User.GaugeData(
-        value=self.value, timestamp=proto_utils.to_Timestamp(self.timestamp))
-
-  @staticmethod
-  def from_runner_api(proto):
-    # type: (beam_fn_api_pb2.Metrics.User.GaugeData) -> GaugeData
-    return GaugeData(
-        proto.value, timestamp=proto_utils.from_Timestamp(proto.timestamp))
-
 
 class DistributionData(object):
   """For internal use only; no backwards-compatibility guarantees.
@@ -441,16 +412,6 @@ class DistributionData(object):
   def singleton(value):
     return DistributionData(value, 1, value, value)
 
-  def to_runner_api(self):
-    # type: () -> beam_fn_api_pb2.Metrics.User.DistributionData
-    return beam_fn_api_pb2.Metrics.User.DistributionData(
-        count=self.count, sum=self.sum, min=self.min, max=self.max)
-
-  @staticmethod
-  def from_runner_api(proto):
-    # type: (beam_fn_api_pb2.Metrics.User.DistributionData) -> DistributionData
-    return DistributionData(proto.sum, proto.count, proto.min, proto.max)
-
 
 class MetricAggregator(object):
   """For internal use only; no backwards-compatibility guarantees.
diff --git a/sdks/python/apache_beam/metrics/metricbase.py b/sdks/python/apache_beam/metrics/metricbase.py
index 14ba56d..e08872f 100644
--- a/sdks/python/apache_beam/metrics/metricbase.py
+++ b/sdks/python/apache_beam/metrics/metricbase.py
@@ -38,8 +38,6 @@ from __future__ import absolute_import
 
 from builtins import object
 
-from apache_beam.portability.api import beam_fn_api_pb2
-
 __all__ = ['Metric', 'Counter', 'Distribution', 'Gauge', 'MetricName']
 
 
@@ -77,15 +75,6 @@ class MetricName(object):
   def __hash__(self):
     return hash((self.namespace, self.name))
 
-  # TODO: this proto structure is deprecated
-  def to_runner_api(self):
-    return beam_fn_api_pb2.Metrics.User.MetricName(
-        namespace=self.namespace, name=self.name)
-
-  @staticmethod
-  def from_runner_api(proto):
-    return MetricName(proto.namespace, proto.name)
-
 
 class Metric(object):
   """Base interface of a metric object."""
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index a4b6c68..6796333 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -318,7 +318,6 @@ class FnApiRunner(runner.PipelineRunner):
     """
     worker_handler_manager = WorkerHandlerManager(
         stage_context.components.environments, self._provision_info)
-    metrics_by_stage = {}
     monitoring_infos_by_stage = {}
 
     runner_execution_context = execution.FnApiRunnerExecutionContext(
@@ -336,13 +335,11 @@ class FnApiRunner(runner.PipelineRunner):
               runner_execution_context,
               bundle_context_manager,
           )
-          metrics_by_stage[stage.name] = stage_results.process_bundle.metrics
           monitoring_infos_by_stage[stage.name] = (
               stage_results.process_bundle.monitoring_infos)
     finally:
       worker_handler_manager.close_all()
-    return RunnerResult(
-        runner.PipelineState.DONE, monitoring_infos_by_stage, metrics_by_stage)
+    return RunnerResult(runner.PipelineState.DONE, monitoring_infos_by_stage)
 
   def _store_side_inputs_in_state(self,
                                   runner_execution_context,  # type: execution.FnApiRunnerExecutionContext
@@ -1120,10 +1117,9 @@ class FnApiMetrics(metric.MetricResults):
 
 
 class RunnerResult(runner.PipelineResult):
-  def __init__(self, state, monitoring_infos_by_stage, metrics_by_stage):
+  def __init__(self, state, monitoring_infos_by_stage):
     super(RunnerResult, self).__init__(state)
     self._monitoring_infos_by_stage = monitoring_infos_by_stage
-    self._metrics_by_stage = metrics_by_stage
     self._metrics = None
     self._monitoring_metrics = None
 
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index 2067412..3bc977b 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -1129,40 +1129,6 @@ class FnApiRunnerMetricsTest(unittest.TestCase):
       return False
 
     try:
-      # TODO(ajamato): Delete this block after deleting the legacy metrics code.
-      # Test the DEPRECATED legacy metrics
-      pregbk_metrics, postgbk_metrics = list(res._metrics_by_stage.values())
-      if 'Create/Map(decode)' not in pregbk_metrics.ptransforms:
-        # The metrics above are actually unordered. Swap.
-        pregbk_metrics, postgbk_metrics = postgbk_metrics, pregbk_metrics
-      self.assertEqual(
-          4,
-          pregbk_metrics.ptransforms['Create/Map(decode)'].processed_elements.
-          measured.output_element_counts['None'])
-      self.assertEqual(
-          4,
-          pregbk_metrics.ptransforms['Map(sleep)'].processed_elements.measured.
-          output_element_counts['None'])
-      self.assertLessEqual(
-          4e-3 * DEFAULT_SAMPLING_PERIOD_MS,
-          pregbk_metrics.ptransforms['Map(sleep)'].processed_elements.measured.
-          total_time_spent)
-      self.assertEqual(
-          1,
-          postgbk_metrics.ptransforms['GroupByKey/Read'].processed_elements.
-          measured.output_element_counts['None'])
-
-      # The actual stage name ends up being something like 'm_out/lamdbda...'
-      m_out, = [
-          metrics for name, metrics in list(postgbk_metrics.ptransforms.items())
-          if name.startswith('m_out')]
-      self.assertEqual(
-          5, m_out.processed_elements.measured.output_element_counts['None'])
-      self.assertEqual(
-          1, m_out.processed_elements.measured.output_element_counts['once'])
-      self.assertEqual(
-          2, m_out.processed_elements.measured.output_element_counts['twice'])
-
       # Test the new MonitoringInfo monitoring format.
       self.assertEqual(2, len(res._monitoring_infos_by_stage))
       pregbk_mis, postgbk_mis = list(res._monitoring_infos_by_stage.values())
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index c86aa06..76426e3 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -262,16 +262,6 @@ class DataInputOperation(RunnerIOOperation):
         return self.stop - 1, None, None, self.stop
     return None
 
-  def progress_metrics(self):
-    # type: () -> beam_fn_api_pb2.Metrics.PTransform
-    with self.splitting_lock:
-      metrics = super(DataInputOperation, self).progress_metrics()
-      current_element_progress = self.receivers[0].current_element_progress()
-    if current_element_progress:
-      metrics.active_elements.fraction_remaining = (
-          current_element_progress.fraction_remaining)
-    return metrics
-
   def finish(self):
     # type: () -> None
     with self.splitting_lock:
@@ -972,42 +962,6 @@ class BundleProcessor(object):
         output_watermarks=output_watermarks,
         element=main_input_coder.get_impl().encode_nested(element))
 
-  def metrics(self):
-    # type: () -> beam_fn_api_pb2.Metrics
-    # DEPRECATED
-    return beam_fn_api_pb2.Metrics(
-        # TODO(robertwb): Rename to progress?
-        ptransforms={
-            transform_id: self._fix_output_tags(
-                transform_id, op.progress_metrics())
-            for transform_id,
-            op in self.ops.items()
-        })
-
-  def _fix_output_tags(self, transform_id, metrics):
-    # DEPRECATED
-    actual_output_tags = list(
-        self.process_bundle_descriptor.transforms[transform_id].outputs.keys())
-
-    # Outputs are still referred to by index, not by name, in many Operations.
-    # However, if there is exactly one output, we can fix up the name here.
-
-    def fix_only_output_tag(actual_output_tag, mapping):
-      if len(mapping) == 1:
-        fake_output_tag, count = only_element(list(mapping.items()))
-        if fake_output_tag != actual_output_tag:
-          del mapping[fake_output_tag]
-          mapping[actual_output_tag] = count
-
-    if len(actual_output_tags) == 1:
-      fix_only_output_tag(
-          actual_output_tags[0],
-          metrics.processed_elements.measured.output_element_counts)
-      fix_only_output_tag(
-          actual_output_tags[0],
-          metrics.active_elements.measured.output_element_counts)
-    return metrics
-
   def monitoring_infos(self):
     # type: () -> List[metrics_pb2.MonitoringInfo]
 
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 941e7a3..d04221d 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -50,7 +50,6 @@ from apache_beam.io import iobase
 from apache_beam.metrics import monitoring_infos
 from apache_beam.metrics.cells import DistributionData
 from apache_beam.metrics.execution import MetricsContainer
-from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import metrics_pb2
 from apache_beam.runners import common
 from apache_beam.runners.common import Receiver
@@ -338,26 +337,6 @@ class Operation(object):
     """Adds a receiver operation for the specified output."""
     self.consumers[output_index].append(operation)
 
-  def progress_metrics(self):
-    # type: () -> beam_fn_api_pb2.Metrics.PTransform
-    return beam_fn_api_pb2.Metrics.PTransform(
-        processed_elements=beam_fn_api_pb2.Metrics.PTransform.ProcessedElements(
-            measured=beam_fn_api_pb2.Metrics.PTransform.Measured(
-                total_time_spent=(
-                    self.scoped_start_state.sampled_seconds() +
-                    self.scoped_process_state.sampled_seconds() +
-                    self.scoped_finish_state.sampled_seconds()),
-                # Multi-output operations should override this.
-                output_element_counts=(
-                    # If there is exactly one output, we can unambiguously
-                    # fix its name later, which we do.
-                    # TODO(robertwb): Plumb the actual name here.
-                    {
-                        'ONLY_OUTPUT': self.receivers[0].opcounter.
-                        element_counter.value()
-                    } if len(self.receivers) == 1 else None))),
-        user=self.metrics_container.to_runner_api())
-
   def monitoring_infos(self, transform_id):
     # type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
 
@@ -727,16 +706,6 @@ class DoOperation(Operation):
       self.user_state_context.reset()
     self.dofn_runner.bundle_finalizer_param.reset()
 
-  def progress_metrics(self):
-    # type: () -> beam_fn_api_pb2.Metrics.PTransform
-    metrics = super(DoOperation, self).progress_metrics()
-    if self.tagged_receivers:
-      metrics.processed_elements.measured.output_element_counts.clear()
-      for tag, receiver in self.tagged_receivers.items():
-        metrics.processed_elements.measured.output_element_counts[str(
-            tag)] = receiver.opcounter.element_counter.value()
-    return metrics
-
   def monitoring_infos(self, transform_id):
     # type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
     infos = super(DoOperation, self).monitoring_infos(transform_id)
@@ -806,19 +775,6 @@ class SdfProcessSizedElements(DoOperation):
               self.element_start_output_bytes)
       return None
 
-  def progress_metrics(self):
-    # type: () -> beam_fn_api_pb2.Metrics.PTransform
-    with self.lock:
-      metrics = super(SdfProcessSizedElements, self).progress_metrics()
-      current_element_progress = self.current_element_progress()
-    if current_element_progress:
-      assert self.input_info is not None
-      metrics.active_elements.measured.input_element_counts[
-          self.input_info[1]] = 1
-      metrics.active_elements.fraction_remaining = (
-          current_element_progress.fraction_remaining)
-    return metrics
-
   def monitoring_infos(self, transform_id):
     # type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
 
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index d103a1b..db2209d 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -504,7 +504,6 @@ class SdkWorker(object):
               instruction_id=instruction_id,
               process_bundle=beam_fn_api_pb2.ProcessBundleResponse(
                   residual_roots=delayed_applications,
-                  metrics=bundle_processor.metrics(),
                   monitoring_infos=monitoring_infos,
                   monitoring_data={
                       SHORT_ID_CACHE.getShortId(info): info.payload