You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/06 20:09:14 UTC

[GitHub] [beam] lukecwik opened a new pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

lukecwik opened a new pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
URL: https://github.com/apache/beam/pull/11325
 
 
   This completes the migration off of the Metrics proto to the MonitoringInfo proto.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
   XLang | --- | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) 
   Portable | --- | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on issue #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
URL: https://github.com/apache/beam/pull/11325#issuecomment-610075293
 
 
   Run Java PreCommit

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] ajamato commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
URL: https://github.com/apache/beam/pull/11325#discussion_r404379886
 
 

 ##########
 File path: model/fn-execution/src/main/proto/beam_fn_api.proto
 ##########
 @@ -253,18 +253,13 @@ message ProcessBundleRequest {
 }
 
 message ProcessBundleResponse {
-  // (Optional) If metrics reporting is supported by the SDK, this represents
-  // the final metrics to record for this bundle.
-  // DEPRECATED
-  Metrics metrics = 1;
-
   // (Optional) Specifies that the bundle has not been completed and the
   // following applications need to be scheduled and executed in the future.
   // A runner that does not yet support residual roots MUST still check that
   // this is empty for correctness.
   repeated DelayedBundleApplication residual_roots = 2;
 
-  // (Required) The list of metrics or other MonitoredState
+  // DEPRECATED (Required) The list of metrics or other MonitoredState
   // collected while processing this bundle.
   repeated org.apache.beam.model.pipeline.v1.MonitoringInfo monitoring_infos = 3;
 
 Review comment:
   Why are these also deprecated now?
   
   Will you be only using the bytes map now? I don't really get how this will work by looking at the ProcessBundle(Progress)Response protos. It seems like you would need the MonitoringInfo at least once to be able to decode the bytes. I don't really understand the protocol that you have in mind here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on issue #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
URL: https://github.com/apache/beam/pull/11325#issuecomment-610050030
 
 
   Run Python PreCommit

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on issue #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
URL: https://github.com/apache/beam/pull/11325#issuecomment-610010553
 
 
   R: @Ardagan @lostluck 
   CC: @ajamato 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] ajamato commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
URL: https://github.com/apache/beam/pull/11325#discussion_r404412794
 
 

 ##########
 File path: model/fn-execution/src/main/proto/beam_fn_api.proto
 ##########
 @@ -253,18 +253,13 @@ message ProcessBundleRequest {
 }
 
 message ProcessBundleResponse {
-  // (Optional) If metrics reporting is supported by the SDK, this represents
-  // the final metrics to record for this bundle.
-  // DEPRECATED
-  Metrics metrics = 1;
-
   // (Optional) Specifies that the bundle has not been completed and the
   // following applications need to be scheduled and executed in the future.
   // A runner that does not yet support residual roots MUST still check that
   // this is empty for correctness.
   repeated DelayedBundleApplication residual_roots = 2;
 
-  // (Required) The list of metrics or other MonitoredState
+  // DEPRECATED (Required) The list of metrics or other MonitoredState
   // collected while processing this bundle.
   repeated org.apache.beam.model.pipeline.v1.MonitoringInfo monitoring_infos = 3;
 
 Review comment:
   sgtm, Using ProcessBundleProgressMetadata should be called after the get the IDs, makes sense.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] ajamato commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
URL: https://github.com/apache/beam/pull/11325#discussion_r404384314
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 ##########
 @@ -341,30 +339,16 @@ void updateProgress() {
           grpcWriteOperation.abortWait();
         }
 
-        // TODO(BEAM-6189): Replace getProcessBundleProgress with getMonitoringInfos when Metrics
-        // is deprecated.
         ProcessBundleProgressResponse processBundleProgressResponse =
             MoreFutures.get(bundleProcessOperation.getProcessBundleProgress());
 
         final List<MonitoringInfo> monitoringInfosList =
             processBundleProgressResponse.getMonitoringInfosList();
 
-        // Supporting deprecated metrics until all supported runners are migrated to using
-        // MonitoringInfos
-        Metrics metrics = processBundleProgressResponse.getMetrics();
-        double elementsConsumed =
-            bundleProcessOperation.getInputElementsConsumed(monitoringInfosList);
-
-        if (elementsConsumed == 0) {
-          elementsConsumed = bundleProcessOperation.getInputElementsConsumed(metrics);
-        }
-
         updateMetrics(monitoringInfosList);
-        updateMetricsDeprecated(metrics);
 
 Review comment:
   Are you sure this won't break existing integrations? I believe only the Python and Java SDK have moved everything to MonitoringInfos. The Go SDK has not yet

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] Ardagan commented on issue #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

Posted by GitBox <gi...@apache.org>.
Ardagan commented on issue #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
URL: https://github.com/apache/beam/pull/11325#issuecomment-610045429
 
 
   LGTM overall.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] ajamato commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
URL: https://github.com/apache/beam/pull/11325#discussion_r404413544
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
 ##########
 @@ -242,200 +240,6 @@ public void close() {}
     operation.finish();
   }
 
-  @Test
-  public void testTentativeUserMetrics() throws Exception {
-    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
-
-    CountDownLatch processBundleLatch = new CountDownLatch(1);
-
-    final String stepName = "fakeStepNameWithUserMetrics";
-    final String namespace = "sdk/whatever";
-    final String name = "someCounter";
-    final long counterValue = 42;
-
-    final BeamFnApi.Metrics.User.MetricName metricName =
-        BeamFnApi.Metrics.User.MetricName.newBuilder()
-            .setNamespace(namespace)
-            .setName(name)
-            .build();
-
-    InstructionRequestHandler instructionRequestHandler =
-        new InstructionRequestHandler() {
-          @Override
-          public CompletionStage<InstructionResponse> handle(InstructionRequest request) {
-            switch (request.getRequestCase()) {
-              case REGISTER:
-                return CompletableFuture.completedFuture(responseFor(request).build());
-              case PROCESS_BUNDLE:
-                return MoreFutures.supplyAsync(
-                    () -> {
-                      processBundleLatch.await();
-                      return responseFor(request).build();
-                    });
-              case PROCESS_BUNDLE_PROGRESS:
-                return CompletableFuture.completedFuture(
-                    responseFor(request)
-                        .setProcessBundleProgress(
-                            BeamFnApi.ProcessBundleProgressResponse.newBuilder()
-                                .setMetrics(
-                                    BeamFnApi.Metrics.newBuilder()
-                                        .putPtransforms(
-                                            stepName,
-                                            BeamFnApi.Metrics.PTransform.newBuilder()
-                                                .addUser(
-                                                    BeamFnApi.Metrics.User.newBuilder()
-                                                        .setMetricName(metricName)
-                                                        .setCounterData(
-                                                            BeamFnApi.Metrics.User.CounterData
-                                                                .newBuilder()
-                                                                .setValue(counterValue)))
-                                                .build())))
-                        .build());
-              default:
-                // block forever
-                return new CompletableFuture<>();
-            }
-          }
-
-          @Override
-          public void close() {}
-        };
-
-    RegisterAndProcessBundleOperation operation =
-        new RegisterAndProcessBundleOperation(
-            idGenerator,
-            instructionRequestHandler,
-            mockBeamFnStateDelegator,
-            REGISTER_REQUEST,
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableTable.of(),
-            ImmutableMap.of(),
-            mockContext);
-
-    operation.start();
-
-    BeamFnApi.Metrics metrics = MoreFutures.get(operation.getProcessBundleProgress()).getMetrics();
-    assertThat(metrics.getPtransformsOrThrow(stepName).getUserCount(), equalTo(1));
-
-    BeamFnApi.Metrics.User userMetric = metrics.getPtransformsOrThrow(stepName).getUser(0);
-    assertThat(userMetric.getMetricName(), equalTo(metricName));
-    assertThat(userMetric.getCounterData().getValue(), equalTo(counterValue));
-
-    processBundleLatch.countDown();
-    operation.finish();
-  }
-
-  @Test
-  public void testFinalUserMetrics() throws Exception {
-    List<BeamFnApi.InstructionRequest> requests = new ArrayList<>();
-    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
-    ExecutorService executorService = Executors.newCachedThreadPool();
-
-    CountDownLatch processBundleLatch = new CountDownLatch(1);
-
-    final String stepName = "fakeStepNameWithUserMetrics";
-    final String namespace = "sdk/whatever";
-    final String name = "someCounter";
-    final long counterValue = 42;
-    final long finalCounterValue = 77;
-
-    final BeamFnApi.Metrics.User.MetricName metricName =
-        BeamFnApi.Metrics.User.MetricName.newBuilder()
-            .setNamespace(namespace)
-            .setName(name)
-            .build();
-
-    InstructionRequestHandler instructionRequestHandler =
-        new InstructionRequestHandler() {
-          @Override
-          public CompletionStage<InstructionResponse> handle(InstructionRequest request) {
-            switch (request.getRequestCase()) {
-              case REGISTER:
-                return CompletableFuture.completedFuture(responseFor(request).build());
-              case PROCESS_BUNDLE:
-                return MoreFutures.supplyAsync(
-                    () -> {
-                      processBundleLatch.await();
-                      return responseFor(request)
-                          .setProcessBundle(
-                              BeamFnApi.ProcessBundleResponse.newBuilder()
-                                  .setMetrics(
-                                      BeamFnApi.Metrics.newBuilder()
-                                          .putPtransforms(
-                                              stepName,
-                                              BeamFnApi.Metrics.PTransform.newBuilder()
-                                                  .addUser(
-                                                      BeamFnApi.Metrics.User.newBuilder()
-                                                          .setMetricName(metricName)
-                                                          .setCounterData(
-                                                              BeamFnApi.Metrics.User.CounterData
-                                                                  .newBuilder()
-                                                                  .setValue(finalCounterValue)))
-                                                  .build())))
-                          .build();
-                    });
-              case PROCESS_BUNDLE_PROGRESS:
-                return CompletableFuture.completedFuture(
-                    responseFor(request)
-                        .setProcessBundleProgress(
-                            BeamFnApi.ProcessBundleProgressResponse.newBuilder()
-                                .setMetrics(
-                                    BeamFnApi.Metrics.newBuilder()
-                                        .putPtransforms(
-                                            stepName,
-                                            BeamFnApi.Metrics.PTransform.newBuilder()
-                                                .addUser(
-                                                    BeamFnApi.Metrics.User.newBuilder()
-                                                        .setMetricName(metricName)
-                                                        .setCounterData(
-                                                            BeamFnApi.Metrics.User.CounterData
-                                                                .newBuilder()
-                                                                .setValue(counterValue)))
-                                                .build())))
-                        .build());
-              default:
-                // block forever
-                return new CompletableFuture<>();
-            }
-          }
-
-          @Override
-          public void close() {}
-        };
-
-    RegisterAndProcessBundleOperation operation =
-        new RegisterAndProcessBundleOperation(
-            idGenerator,
-            instructionRequestHandler,
-            mockBeamFnStateDelegator,
-            REGISTER_REQUEST,
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableTable.of(),
-            ImmutableMap.of(),
-            mockContext);
-
-    operation.start();
-
-    // Force some intermediate metrics to test crosstalk is not introduced
-    BeamFnApi.Metrics metrics = MoreFutures.get(operation.getProcessBundleProgress()).getMetrics();
-    BeamFnApi.Metrics.User userMetric = metrics.getPtransformsOrThrow(stepName).getUser(0);
-    assertThat(userMetric.getMetricName(), equalTo(metricName));
-    assertThat(userMetric.getCounterData().getValue(), not(equalTo(finalCounterValue)));
-
-    processBundleLatch.countDown();
-    operation.finish();
-
-    metrics = MoreFutures.get(operation.getFinalMetrics());
-
-    userMetric = metrics.getPtransformsOrThrow(stepName).getUser(0);
-    assertThat(userMetric.getMetricName(), equalTo(metricName));
-    assertThat(userMetric.getCounterData().getValue(), equalTo(finalCounterValue));
-  }
 
 Review comment:
   The legacy metrics proto deleted in this PR. You may want to check with to make sure no other runner is consuming this.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik merged pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
URL: https://github.com/apache/beam/pull/11325
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
URL: https://github.com/apache/beam/pull/11325#discussion_r404389323
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 ##########
 @@ -341,30 +339,16 @@ void updateProgress() {
           grpcWriteOperation.abortWait();
         }
 
-        // TODO(BEAM-6189): Replace getProcessBundleProgress with getMonitoringInfos when Metrics
-        // is deprecated.
         ProcessBundleProgressResponse processBundleProgressResponse =
             MoreFutures.get(bundleProcessOperation.getProcessBundleProgress());
 
         final List<MonitoringInfo> monitoringInfosList =
             processBundleProgressResponse.getMonitoringInfosList();
 
-        // Supporting deprecated metrics until all supported runners are migrated to using
-        // MonitoringInfos
-        Metrics metrics = processBundleProgressResponse.getMetrics();
-        double elementsConsumed =
-            bundleProcessOperation.getInputElementsConsumed(monitoringInfosList);
-
-        if (elementsConsumed == 0) {
-          elementsConsumed = bundleProcessOperation.getInputElementsConsumed(metrics);
-        }
-
         updateMetrics(monitoringInfosList);
-        updateMetricsDeprecated(metrics);
 
 Review comment:
   Go SDK has, I and rebo worked on the changes. See: https://github.com/apache/beam/blob/aff446171b5ed77cdca6347ab6e3e597c096af91/sdks/go/pkg/beam/core/runtime/harness/monitoring.go#L204

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lostluck commented on issue #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

Posted by GitBox <gi...@apache.org>.
lostluck commented on issue #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
URL: https://github.com/apache/beam/pull/11325#issuecomment-610040706
 
 
   LGTM for Go
   Note, any errors in this won't be caught by the Go precommit, but given the Go postcommit is still grumpy there's no open verification of this at this time. (Granted this isn't automated in any of the Go SDK tests at this time, as the SDK has no user side metric extraction API just yet.)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
URL: https://github.com/apache/beam/pull/11325#discussion_r404388516
 
 

 ##########
 File path: model/fn-execution/src/main/proto/beam_fn_api.proto
 ##########
 @@ -253,18 +253,13 @@ message ProcessBundleRequest {
 }
 
 message ProcessBundleResponse {
-  // (Optional) If metrics reporting is supported by the SDK, this represents
-  // the final metrics to record for this bundle.
-  // DEPRECATED
-  Metrics metrics = 1;
-
   // (Optional) Specifies that the bundle has not been completed and the
   // following applications need to be scheduled and executed in the future.
   // A runner that does not yet support residual roots MUST still check that
   // this is empty for correctness.
   repeated DelayedBundleApplication residual_roots = 2;
 
-  // (Required) The list of metrics or other MonitoredState
+  // DEPRECATED (Required) The list of metrics or other MonitoredState
   // collected while processing this bundle.
   repeated org.apache.beam.model.pipeline.v1.MonitoringInfo monitoring_infos = 3;
 
 Review comment:
   The plan is to go with short ids system as the one and only implementation and may not have enough time to remove these two fields before the 2.21 release.
   
   I want to have the option to un-deprecate them or remove them after the release.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] ajamato commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
URL: https://github.com/apache/beam/pull/11325#discussion_r404385343
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
 ##########
 @@ -242,200 +240,6 @@ public void close() {}
     operation.finish();
   }
 
-  @Test
-  public void testTentativeUserMetrics() throws Exception {
-    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
-
-    CountDownLatch processBundleLatch = new CountDownLatch(1);
-
-    final String stepName = "fakeStepNameWithUserMetrics";
-    final String namespace = "sdk/whatever";
-    final String name = "someCounter";
-    final long counterValue = 42;
-
-    final BeamFnApi.Metrics.User.MetricName metricName =
-        BeamFnApi.Metrics.User.MetricName.newBuilder()
-            .setNamespace(namespace)
-            .setName(name)
-            .build();
-
-    InstructionRequestHandler instructionRequestHandler =
-        new InstructionRequestHandler() {
-          @Override
-          public CompletionStage<InstructionResponse> handle(InstructionRequest request) {
-            switch (request.getRequestCase()) {
-              case REGISTER:
-                return CompletableFuture.completedFuture(responseFor(request).build());
-              case PROCESS_BUNDLE:
-                return MoreFutures.supplyAsync(
-                    () -> {
-                      processBundleLatch.await();
-                      return responseFor(request).build();
-                    });
-              case PROCESS_BUNDLE_PROGRESS:
-                return CompletableFuture.completedFuture(
-                    responseFor(request)
-                        .setProcessBundleProgress(
-                            BeamFnApi.ProcessBundleProgressResponse.newBuilder()
-                                .setMetrics(
-                                    BeamFnApi.Metrics.newBuilder()
-                                        .putPtransforms(
-                                            stepName,
-                                            BeamFnApi.Metrics.PTransform.newBuilder()
-                                                .addUser(
-                                                    BeamFnApi.Metrics.User.newBuilder()
-                                                        .setMetricName(metricName)
-                                                        .setCounterData(
-                                                            BeamFnApi.Metrics.User.CounterData
-                                                                .newBuilder()
-                                                                .setValue(counterValue)))
-                                                .build())))
-                        .build());
-              default:
-                // block forever
-                return new CompletableFuture<>();
-            }
-          }
-
-          @Override
-          public void close() {}
-        };
-
-    RegisterAndProcessBundleOperation operation =
-        new RegisterAndProcessBundleOperation(
-            idGenerator,
-            instructionRequestHandler,
-            mockBeamFnStateDelegator,
-            REGISTER_REQUEST,
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableTable.of(),
-            ImmutableMap.of(),
-            mockContext);
-
-    operation.start();
-
-    BeamFnApi.Metrics metrics = MoreFutures.get(operation.getProcessBundleProgress()).getMetrics();
-    assertThat(metrics.getPtransformsOrThrow(stepName).getUserCount(), equalTo(1));
-
-    BeamFnApi.Metrics.User userMetric = metrics.getPtransformsOrThrow(stepName).getUser(0);
-    assertThat(userMetric.getMetricName(), equalTo(metricName));
-    assertThat(userMetric.getCounterData().getValue(), equalTo(counterValue));
-
-    processBundleLatch.countDown();
-    operation.finish();
-  }
-
-  @Test
-  public void testFinalUserMetrics() throws Exception {
-    List<BeamFnApi.InstructionRequest> requests = new ArrayList<>();
-    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
-    ExecutorService executorService = Executors.newCachedThreadPool();
-
-    CountDownLatch processBundleLatch = new CountDownLatch(1);
-
-    final String stepName = "fakeStepNameWithUserMetrics";
-    final String namespace = "sdk/whatever";
-    final String name = "someCounter";
-    final long counterValue = 42;
-    final long finalCounterValue = 77;
-
-    final BeamFnApi.Metrics.User.MetricName metricName =
-        BeamFnApi.Metrics.User.MetricName.newBuilder()
-            .setNamespace(namespace)
-            .setName(name)
-            .build();
-
-    InstructionRequestHandler instructionRequestHandler =
-        new InstructionRequestHandler() {
-          @Override
-          public CompletionStage<InstructionResponse> handle(InstructionRequest request) {
-            switch (request.getRequestCase()) {
-              case REGISTER:
-                return CompletableFuture.completedFuture(responseFor(request).build());
-              case PROCESS_BUNDLE:
-                return MoreFutures.supplyAsync(
-                    () -> {
-                      processBundleLatch.await();
-                      return responseFor(request)
-                          .setProcessBundle(
-                              BeamFnApi.ProcessBundleResponse.newBuilder()
-                                  .setMetrics(
-                                      BeamFnApi.Metrics.newBuilder()
-                                          .putPtransforms(
-                                              stepName,
-                                              BeamFnApi.Metrics.PTransform.newBuilder()
-                                                  .addUser(
-                                                      BeamFnApi.Metrics.User.newBuilder()
-                                                          .setMetricName(metricName)
-                                                          .setCounterData(
-                                                              BeamFnApi.Metrics.User.CounterData
-                                                                  .newBuilder()
-                                                                  .setValue(finalCounterValue)))
-                                                  .build())))
-                          .build();
-                    });
-              case PROCESS_BUNDLE_PROGRESS:
-                return CompletableFuture.completedFuture(
-                    responseFor(request)
-                        .setProcessBundleProgress(
-                            BeamFnApi.ProcessBundleProgressResponse.newBuilder()
-                                .setMetrics(
-                                    BeamFnApi.Metrics.newBuilder()
-                                        .putPtransforms(
-                                            stepName,
-                                            BeamFnApi.Metrics.PTransform.newBuilder()
-                                                .addUser(
-                                                    BeamFnApi.Metrics.User.newBuilder()
-                                                        .setMetricName(metricName)
-                                                        .setCounterData(
-                                                            BeamFnApi.Metrics.User.CounterData
-                                                                .newBuilder()
-                                                                .setValue(counterValue)))
-                                                .build())))
-                        .build());
-              default:
-                // block forever
-                return new CompletableFuture<>();
-            }
-          }
-
-          @Override
-          public void close() {}
-        };
-
-    RegisterAndProcessBundleOperation operation =
-        new RegisterAndProcessBundleOperation(
-            idGenerator,
-            instructionRequestHandler,
-            mockBeamFnStateDelegator,
-            REGISTER_REQUEST,
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableTable.of(),
-            ImmutableMap.of(),
-            mockContext);
-
-    operation.start();
-
-    // Force some intermediate metrics to test crosstalk is not introduced
-    BeamFnApi.Metrics metrics = MoreFutures.get(operation.getProcessBundleProgress()).getMetrics();
-    BeamFnApi.Metrics.User userMetric = metrics.getPtransformsOrThrow(stepName).getUser(0);
-    assertThat(userMetric.getMetricName(), equalTo(metricName));
-    assertThat(userMetric.getCounterData().getValue(), not(equalTo(finalCounterValue)));
-
-    processBundleLatch.countDown();
-    operation.finish();
-
-    metrics = MoreFutures.get(operation.getFinalMetrics());
-
-    userMetric = metrics.getPtransformsOrThrow(stepName).getUser(0);
-    assertThat(userMetric.getMetricName(), equalTo(metricName));
-    assertThat(userMetric.getCounterData().getValue(), equalTo(finalCounterValue));
-  }
 
 Review comment:
   You way want to add this back, if you truly cannot yet delete the deprecated metrics 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on issue #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
URL: https://github.com/apache/beam/pull/11325#issuecomment-610040339
 
 
   CC: @HuangLED 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
URL: https://github.com/apache/beam/pull/11325#discussion_r404389697
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
 ##########
 @@ -242,200 +240,6 @@ public void close() {}
     operation.finish();
   }
 
-  @Test
-  public void testTentativeUserMetrics() throws Exception {
-    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
-
-    CountDownLatch processBundleLatch = new CountDownLatch(1);
-
-    final String stepName = "fakeStepNameWithUserMetrics";
-    final String namespace = "sdk/whatever";
-    final String name = "someCounter";
-    final long counterValue = 42;
-
-    final BeamFnApi.Metrics.User.MetricName metricName =
-        BeamFnApi.Metrics.User.MetricName.newBuilder()
-            .setNamespace(namespace)
-            .setName(name)
-            .build();
-
-    InstructionRequestHandler instructionRequestHandler =
-        new InstructionRequestHandler() {
-          @Override
-          public CompletionStage<InstructionResponse> handle(InstructionRequest request) {
-            switch (request.getRequestCase()) {
-              case REGISTER:
-                return CompletableFuture.completedFuture(responseFor(request).build());
-              case PROCESS_BUNDLE:
-                return MoreFutures.supplyAsync(
-                    () -> {
-                      processBundleLatch.await();
-                      return responseFor(request).build();
-                    });
-              case PROCESS_BUNDLE_PROGRESS:
-                return CompletableFuture.completedFuture(
-                    responseFor(request)
-                        .setProcessBundleProgress(
-                            BeamFnApi.ProcessBundleProgressResponse.newBuilder()
-                                .setMetrics(
-                                    BeamFnApi.Metrics.newBuilder()
-                                        .putPtransforms(
-                                            stepName,
-                                            BeamFnApi.Metrics.PTransform.newBuilder()
-                                                .addUser(
-                                                    BeamFnApi.Metrics.User.newBuilder()
-                                                        .setMetricName(metricName)
-                                                        .setCounterData(
-                                                            BeamFnApi.Metrics.User.CounterData
-                                                                .newBuilder()
-                                                                .setValue(counterValue)))
-                                                .build())))
-                        .build());
-              default:
-                // block forever
-                return new CompletableFuture<>();
-            }
-          }
-
-          @Override
-          public void close() {}
-        };
-
-    RegisterAndProcessBundleOperation operation =
-        new RegisterAndProcessBundleOperation(
-            idGenerator,
-            instructionRequestHandler,
-            mockBeamFnStateDelegator,
-            REGISTER_REQUEST,
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableTable.of(),
-            ImmutableMap.of(),
-            mockContext);
-
-    operation.start();
-
-    BeamFnApi.Metrics metrics = MoreFutures.get(operation.getProcessBundleProgress()).getMetrics();
-    assertThat(metrics.getPtransformsOrThrow(stepName).getUserCount(), equalTo(1));
-
-    BeamFnApi.Metrics.User userMetric = metrics.getPtransformsOrThrow(stepName).getUser(0);
-    assertThat(userMetric.getMetricName(), equalTo(metricName));
-    assertThat(userMetric.getCounterData().getValue(), equalTo(counterValue));
-
-    processBundleLatch.countDown();
-    operation.finish();
-  }
-
-  @Test
-  public void testFinalUserMetrics() throws Exception {
-    List<BeamFnApi.InstructionRequest> requests = new ArrayList<>();
-    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
-    ExecutorService executorService = Executors.newCachedThreadPool();
-
-    CountDownLatch processBundleLatch = new CountDownLatch(1);
-
-    final String stepName = "fakeStepNameWithUserMetrics";
-    final String namespace = "sdk/whatever";
-    final String name = "someCounter";
-    final long counterValue = 42;
-    final long finalCounterValue = 77;
-
-    final BeamFnApi.Metrics.User.MetricName metricName =
-        BeamFnApi.Metrics.User.MetricName.newBuilder()
-            .setNamespace(namespace)
-            .setName(name)
-            .build();
-
-    InstructionRequestHandler instructionRequestHandler =
-        new InstructionRequestHandler() {
-          @Override
-          public CompletionStage<InstructionResponse> handle(InstructionRequest request) {
-            switch (request.getRequestCase()) {
-              case REGISTER:
-                return CompletableFuture.completedFuture(responseFor(request).build());
-              case PROCESS_BUNDLE:
-                return MoreFutures.supplyAsync(
-                    () -> {
-                      processBundleLatch.await();
-                      return responseFor(request)
-                          .setProcessBundle(
-                              BeamFnApi.ProcessBundleResponse.newBuilder()
-                                  .setMetrics(
-                                      BeamFnApi.Metrics.newBuilder()
-                                          .putPtransforms(
-                                              stepName,
-                                              BeamFnApi.Metrics.PTransform.newBuilder()
-                                                  .addUser(
-                                                      BeamFnApi.Metrics.User.newBuilder()
-                                                          .setMetricName(metricName)
-                                                          .setCounterData(
-                                                              BeamFnApi.Metrics.User.CounterData
-                                                                  .newBuilder()
-                                                                  .setValue(finalCounterValue)))
-                                                  .build())))
-                          .build();
-                    });
-              case PROCESS_BUNDLE_PROGRESS:
-                return CompletableFuture.completedFuture(
-                    responseFor(request)
-                        .setProcessBundleProgress(
-                            BeamFnApi.ProcessBundleProgressResponse.newBuilder()
-                                .setMetrics(
-                                    BeamFnApi.Metrics.newBuilder()
-                                        .putPtransforms(
-                                            stepName,
-                                            BeamFnApi.Metrics.PTransform.newBuilder()
-                                                .addUser(
-                                                    BeamFnApi.Metrics.User.newBuilder()
-                                                        .setMetricName(metricName)
-                                                        .setCounterData(
-                                                            BeamFnApi.Metrics.User.CounterData
-                                                                .newBuilder()
-                                                                .setValue(counterValue)))
-                                                .build())))
-                        .build());
-              default:
-                // block forever
-                return new CompletableFuture<>();
-            }
-          }
-
-          @Override
-          public void close() {}
-        };
-
-    RegisterAndProcessBundleOperation operation =
-        new RegisterAndProcessBundleOperation(
-            idGenerator,
-            instructionRequestHandler,
-            mockBeamFnStateDelegator,
-            REGISTER_REQUEST,
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableTable.of(),
-            ImmutableMap.of(),
-            mockContext);
-
-    operation.start();
-
-    // Force some intermediate metrics to test crosstalk is not introduced
-    BeamFnApi.Metrics metrics = MoreFutures.get(operation.getProcessBundleProgress()).getMetrics();
-    BeamFnApi.Metrics.User userMetric = metrics.getPtransformsOrThrow(stepName).getUser(0);
-    assertThat(userMetric.getMetricName(), equalTo(metricName));
-    assertThat(userMetric.getCounterData().getValue(), not(equalTo(finalCounterValue)));
-
-    processBundleLatch.countDown();
-    operation.finish();
-
-    metrics = MoreFutures.get(operation.getFinalMetrics());
-
-    userMetric = metrics.getPtransformsOrThrow(stepName).getUser(0);
-    assertThat(userMetric.getMetricName(), equalTo(metricName));
-    assertThat(userMetric.getCounterData().getValue(), equalTo(finalCounterValue));
-  }
 
 Review comment:
   Which deprecated metrics are you referring to?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
URL: https://github.com/apache/beam/pull/11325#discussion_r404389323
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 ##########
 @@ -341,30 +339,16 @@ void updateProgress() {
           grpcWriteOperation.abortWait();
         }
 
-        // TODO(BEAM-6189): Replace getProcessBundleProgress with getMonitoringInfos when Metrics
-        // is deprecated.
         ProcessBundleProgressResponse processBundleProgressResponse =
             MoreFutures.get(bundleProcessOperation.getProcessBundleProgress());
 
         final List<MonitoringInfo> monitoringInfosList =
             processBundleProgressResponse.getMonitoringInfosList();
 
-        // Supporting deprecated metrics until all supported runners are migrated to using
-        // MonitoringInfos
-        Metrics metrics = processBundleProgressResponse.getMetrics();
-        double elementsConsumed =
-            bundleProcessOperation.getInputElementsConsumed(monitoringInfosList);
-
-        if (elementsConsumed == 0) {
-          elementsConsumed = bundleProcessOperation.getInputElementsConsumed(metrics);
-        }
-
         updateMetrics(monitoringInfosList);
-        updateMetricsDeprecated(metrics);
 
 Review comment:
   Go SDK has, @lostluck and I worked on the changes. See: https://github.com/apache/beam/blob/aff446171b5ed77cdca6347ab6e3e597c096af91/sdks/go/pkg/beam/core/runtime/harness/monitoring.go#L204

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] ajamato commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
URL: https://github.com/apache/beam/pull/11325#discussion_r404379886
 
 

 ##########
 File path: model/fn-execution/src/main/proto/beam_fn_api.proto
 ##########
 @@ -253,18 +253,13 @@ message ProcessBundleRequest {
 }
 
 message ProcessBundleResponse {
-  // (Optional) If metrics reporting is supported by the SDK, this represents
-  // the final metrics to record for this bundle.
-  // DEPRECATED
-  Metrics metrics = 1;
-
   // (Optional) Specifies that the bundle has not been completed and the
   // following applications need to be scheduled and executed in the future.
   // A runner that does not yet support residual roots MUST still check that
   // this is empty for correctness.
   repeated DelayedBundleApplication residual_roots = 2;
 
-  // (Required) The list of metrics or other MonitoredState
+  // DEPRECATED (Required) The list of metrics or other MonitoredState
   // collected while processing this bundle.
   repeated org.apache.beam.model.pipeline.v1.MonitoringInfo monitoring_infos = 3;
 
 Review comment:
   Why are these also deprecated now?
   
   Will you be only using the bytes map now? I don't really get how this will work by looking at the ProcessBundle(Progress)Response protos. It seems like you would need the MonitoringInfo at least once to have enough metadata be able to decode the bytes and associate multiple updates together. I don't really understand the protocol that you have in mind here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] ajamato commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
URL: https://github.com/apache/beam/pull/11325#discussion_r404412896
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 ##########
 @@ -341,30 +339,16 @@ void updateProgress() {
           grpcWriteOperation.abortWait();
         }
 
-        // TODO(BEAM-6189): Replace getProcessBundleProgress with getMonitoringInfos when Metrics
-        // is deprecated.
         ProcessBundleProgressResponse processBundleProgressResponse =
             MoreFutures.get(bundleProcessOperation.getProcessBundleProgress());
 
         final List<MonitoringInfo> monitoringInfosList =
             processBundleProgressResponse.getMonitoringInfosList();
 
-        // Supporting deprecated metrics until all supported runners are migrated to using
-        // MonitoringInfos
-        Metrics metrics = processBundleProgressResponse.getMetrics();
-        double elementsConsumed =
-            bundleProcessOperation.getInputElementsConsumed(monitoringInfosList);
-
-        if (elementsConsumed == 0) {
-          elementsConsumed = bundleProcessOperation.getInputElementsConsumed(metrics);
-        }
-
         updateMetrics(monitoringInfosList);
-        updateMetricsDeprecated(metrics);
 
 Review comment:
   SGTM

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] ajamato commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11325: [BEAM-4374, BEAM-6189] Delete and remove deprecated Metrics proto
URL: https://github.com/apache/beam/pull/11325#discussion_r404379886
 
 

 ##########
 File path: model/fn-execution/src/main/proto/beam_fn_api.proto
 ##########
 @@ -253,18 +253,13 @@ message ProcessBundleRequest {
 }
 
 message ProcessBundleResponse {
-  // (Optional) If metrics reporting is supported by the SDK, this represents
-  // the final metrics to record for this bundle.
-  // DEPRECATED
-  Metrics metrics = 1;
-
   // (Optional) Specifies that the bundle has not been completed and the
   // following applications need to be scheduled and executed in the future.
   // A runner that does not yet support residual roots MUST still check that
   // this is empty for correctness.
   repeated DelayedBundleApplication residual_roots = 2;
 
-  // (Required) The list of metrics or other MonitoredState
+  // DEPRECATED (Required) The list of metrics or other MonitoredState
   // collected while processing this bundle.
   repeated org.apache.beam.model.pipeline.v1.MonitoringInfo monitoring_infos = 3;
 
 Review comment:
   Why are these also deprecated now?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services