Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/24 04:36:08 UTC

[GitHub] [beam] kmjung opened a new pull request #12070: Add some metrics for BigQueryStorageStreamReader

kmjung opened a new pull request #12070:
URL: https://github.com/apache/beam/pull/12070


   Add basic latency metrics for BigQuery storage stream source
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark
   --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
   XLang | --- | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] ajamato commented on a change in pull request #12070: Add some metrics for BigQueryStorageStreamReader

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #12070:
URL: https://github.com/apache/beam/pull/12070#discussion_r450478316



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
##########
@@ -219,7 +232,15 @@ private synchronized boolean readNextRecord() throws IOException {
         }
 
         fractionConsumedFromPreviousResponse = fractionConsumedFromCurrentResponse;
-        ReadRowsResponse currentResponse = responseIterator.next();
+        ReadRowsResponse currentResponse;
+        Stopwatch stopwatch = Stopwatch.createStarted();

Review comment:
       It's probably fine not to use an option, since the timing happens infrequently. Still, it may be a good idea to add an option as a precaution and make these metrics opt-in; we can turn them on by default later, IMO.
   
   Doing this with the state sampler may take a lot of effort, so the approach you have is probably the best place to start.
   
   Another approach would be to run the job under a profiler, though I am not sure how to do that today with any of our runners. It could possibly be set up with the direct runner. I think that would also give you plenty of useful information about the problem. It may be worth asking on the dev list about that.
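A minimal sketch of the opt-in idea, assuming a boolean that is read once (for example from a pipeline option) when the reader is constructed. The class `OptionalLatencyTimer` and the flag are hypothetical, not existing Beam APIs. When the flag is off, no clock is read at all. When it is on, the clock is read twice per timed call, and the elapsed milliseconds go to whatever sink is passed in, such as a Beam `Counter`'s `inc(long)` or a `Distribution`'s `update(long)`.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

/**
 * Hypothetical helper: times a call only if latency metrics were enabled up front.
 * Not part of Beam; the flag would come from a (hypothetical) pipeline option.
 */
final class OptionalLatencyTimer {
  private final boolean enabled;     // decided once, e.g. from an opt-in option
  private final LongConsumer sinkMs; // e.g. counter::inc or distribution::update

  OptionalLatencyTimer(boolean enabled, LongConsumer sinkMs) {
    this.enabled = enabled;
    this.sinkMs = sinkMs;
  }

  /** Runs the call and records its latency in milliseconds if enabled. */
  <T> T time(Supplier<T> call) {
    if (!enabled) {
      return call.get(); // disabled: no System.nanoTime() calls on this path
    }
    long start = System.nanoTime();
    try {
      return call.get();
    } finally {
      // Record even if the call throws, mirroring the try/finally in the diff.
      sinkMs.accept(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
    }
  }
}
```

In the reader this could wrap the blocking call, e.g. `currentResponse = timer.time(responseIterator::next);`. The flag is checked once per call rather than by swapping implementations, which keeps the sketch small.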







[GitHub] [beam] stale[bot] commented on pull request #12070: Add some metrics for BigQueryStorageStreamReader

Posted by GitBox <gi...@apache.org>.
stale[bot] commented on pull request #12070:
URL: https://github.com/apache/beam/pull/12070#issuecomment-714216714


   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.
   





[GitHub] [beam] kmjung commented on pull request #12070: Add some metrics for BigQueryStorageStreamReader

Posted by GitBox <gi...@apache.org>.
kmjung commented on pull request #12070:
URL: https://github.com/apache/beam/pull/12070#issuecomment-648625962


   Please review, @chamikaramj and @ajamato.





[GitHub] [beam] chamikaramj commented on pull request #12070: Add some metrics for BigQueryStorageStreamReader

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #12070:
URL: https://github.com/apache/beam/pull/12070#issuecomment-649157020


   Please add a JIRA





[GitHub] [beam] ajamato commented on a change in pull request #12070: Add some metrics for BigQueryStorageStreamReader

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #12070:
URL: https://github.com/apache/beam/pull/12070#discussion_r450433163



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
##########
@@ -219,7 +232,15 @@ private synchronized boolean readNextRecord() throws IOException {
         }
 
         fractionConsumedFromPreviousResponse = fractionConsumedFromCurrentResponse;
-        ReadRowsResponse currentResponse = responseIterator.next();
+        ReadRowsResponse currentResponse;
+        Stopwatch stopwatch = Stopwatch.createStarted();

Review comment:
       +1 to adding an option to allow disabling this.
   
   In the past we have avoided tools that read the system clock frequently, because this system call can be quite slow and add a performance cost, especially if it's done frequently. In this case, however, it looks like the system clock is only read infrequently, i.e. whenever an RPC is made (doing it for every element would be an issue), in which case it's fine to do so.
   
   Instead of reading the clock, we have used the state sampler classes for this. They use a separate thread that periodically wakes up and checks which "state" the other threads are executing under.
   
   However, there is no user metric or SDK-level API to time blocks of code using the state sampler.
   
   We only used it for the execution-time metrics, to calculate the time spent in the process, startBundle, and finishBundle methods of each ParDo. Here it is for reference:
   https://github.com/apache/beam/pull/7676
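A minimal sketch of the sampling idea, written from scratch for illustration. It is not Beam's state sampler implementation, and the class and state names are hypothetical. The worker thread only performs a volatile write when it changes state. A separate daemon thread wakes up every `periodMillis`, attributes one sample to whatever state is current, and the time spent in a state is estimated as samples × period. No clock is read on the worker's hot path.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sampling-based state timer; illustrative only, not Beam's class. */
final class StateSampler implements AutoCloseable {
  private final long periodMillis;
  private volatile String currentState = "idle"; // written by worker, read by sampler
  private final Map<String, AtomicLong> samples = new ConcurrentHashMap<>();
  private final ScheduledExecutorService sampler =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "state-sampler");
        t.setDaemon(true); // do not keep the JVM alive just for sampling
        return t;
      });

  StateSampler(long periodMillis) {
    this.periodMillis = periodMillis;
    // Periodically wake up and attribute one sample to the worker's current state.
    sampler.scheduleAtFixedRate(
        () -> samples.computeIfAbsent(currentState, k -> new AtomicLong()).incrementAndGet(),
        periodMillis, periodMillis, TimeUnit.MILLISECONDS);
  }

  /** Called by the worker thread on each transition: one volatile write, no clock read. */
  void enter(String state) {
    currentState = state;
  }

  /** Estimated time spent in a state: number of samples times the sampling period. */
  long estimatedMillis(String state) {
    AtomicLong n = samples.get(state);
    return n == null ? 0 : n.get() * periodMillis;
  }

  @Override
  public void close() {
    sampler.shutdownNow();
  }
}
```

The estimate is statistical. Short states may receive no samples at all, which is the trade-off for keeping clock reads off the worker thread.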







[GitHub] [beam] emkornfield commented on a change in pull request #12070: Add some metrics for BigQueryStorageStreamReader

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #12070:
URL: https://github.com/apache/beam/pull/12070#discussion_r444641710



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
##########
@@ -153,6 +156,17 @@ public String toString() {
   @Experimental(Kind.SOURCE_SINK)
   public static class BigQueryStorageStreamReader<T> extends BoundedSource.BoundedReader<T> {
 
+    private final Counter streamingResponseCount =
+        Metrics.counter(
+            BigQueryStorageStreamReader.class, "bq_storage_read_streaming_response_count");
+    private final Counter streamingResponseTotalLatencyMs =
+        Metrics.counter(

Review comment:
       Maybe use a [distribution](https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/metrics/Distribution.html) for time-based metrics?
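To show what a distribution adds over a pair of counters: Beam's `Distribution` (obtained with `Metrics.distribution(Class, String)` and fed with `update(long)`) reports count, sum, min and max. A "total latency" counter plus a "response count" counter only yields the mean. The `LatencySummary` class below is a hypothetical stdlib stand-in that tracks the same four values, so the difference can be checked without a runner.

```java
/**
 * Hypothetical stand-in for the values a Beam Distribution reports: count, sum, min, max.
 * Two Counters (total latency, response count) would only recover the mean.
 */
final class LatencySummary {
  private long count;
  private long sum;
  private long min = Long.MAX_VALUE;
  private long max = Long.MIN_VALUE;

  /** Analogous to Distribution.update(long): fold one observation in. */
  void update(long valueMs) {
    count++;
    sum += valueMs;
    min = Math.min(min, valueMs);
    max = Math.max(max, valueMs);
  }

  long count() { return count; }
  long sum() { return sum; }
  long min() { return min; }
  long max() { return max; }

  /** Mean latency; 0 when nothing has been recorded. */
  double mean() { return count == 0 ? 0.0 : (double) sum / count; }
}
```

For example, four responses taking 2, 2, 2 and 94 ms give a sum of 100 and a count of 4. The counters would report only a 25 ms mean, whereas the distribution also exposes the 94 ms worst case.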







[GitHub] [beam] kmjung commented on a change in pull request #12070: Add some metrics for BigQueryStorageStreamReader

Posted by GitBox <gi...@apache.org>.
kmjung commented on a change in pull request #12070:
URL: https://github.com/apache/beam/pull/12070#discussion_r450454375



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
##########
@@ -219,7 +232,15 @@ private synchronized boolean readNextRecord() throws IOException {
         }
 
         fractionConsumedFromPreviousResponse = fractionConsumedFromCurrentResponse;
-        ReadRowsResponse currentResponse = responseIterator.next();
+        ReadRowsResponse currentResponse;
+        Stopwatch stopwatch = Stopwatch.createStarted();

Review comment:
       I can certainly add options to disable the collection of these metrics if we establish that this is the right thing to do.
   
   The ReadRows API is a server streaming API which, on the server side, tries to send ReadRowsResponse messages as fast as the client can accept them. Each response message contains up to 100 MiB of encoded data -- in the extreme case, this is one row per response, but in the common case this is a batch of rows, usually 1024. The timer is read once -- or, I guess, twice -- per response message.
   
   What I'd like to establish with these metrics is whether the existing client-side read-ahead is sufficient in the common case to prevent (a large amount of) blocking while reading from a stream, or whether we're spending time waiting for I/O on this thread in addition to doing Avro decoding, etc.
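One way to separate I/O wait from decoding, sketched under the assumption that the response stream is consumed through a plain `java.util.Iterator`. The `TimedIterator` wrapper is hypothetical. It accumulates the wall time spent inside `hasNext()` and `next()`, which is where the reader thread would block if read-ahead cannot keep up, and leaves decoding time to be measured separately. Both calls are timed because, with a blocking server-streaming iterator, the wait may happen in either one. Which one depends on the client library.

```java
import java.util.Iterator;

/**
 * Hypothetical wrapper that accumulates time spent blocked waiting for the next
 * response (in hasNext() or next()), so it can be compared with decoding time.
 */
final class TimedIterator<T> implements Iterator<T> {
  private final Iterator<T> delegate;
  private long blockedNanos;
  private long responses;

  TimedIterator(Iterator<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public boolean hasNext() {
    long start = System.nanoTime();
    try {
      return delegate.hasNext(); // may block until the next message arrives
    } finally {
      blockedNanos += System.nanoTime() - start;
    }
  }

  @Override
  public T next() {
    long start = System.nanoTime();
    try {
      T value = delegate.next();
      responses++; // count only successfully returned responses
      return value;
    } finally {
      blockedNanos += System.nanoTime() - start;
    }
  }

  long blockedNanos() { return blockedNanos; }
  long responses() { return responses; }
}
```

This costs four `System.nanoTime()` calls per response. In the common case of 1024 rows per response, that is roughly 0.004 clock reads per row, so the clock is read per response rather than per element.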







[GitHub] [beam] stale[bot] closed pull request #12070: Add some metrics for BigQueryStorageStreamReader

Posted by GitBox <gi...@apache.org>.
stale[bot] closed pull request #12070:
URL: https://github.com/apache/beam/pull/12070


   





[GitHub] [beam] chamikaramj commented on a change in pull request #12070: Add some metrics for BigQueryStorageStreamReader

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #12070:
URL: https://github.com/apache/beam/pull/12070#discussion_r445255937



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
##########
@@ -219,7 +232,15 @@ private synchronized boolean readNextRecord() throws IOException {
         }
 
         fractionConsumedFromPreviousResponse = fractionConsumedFromCurrentResponse;
-        ReadRowsResponse currentResponse = responseIterator.next();
+        ReadRowsResponse currentResponse;
+        Stopwatch stopwatch = Stopwatch.createStarted();

Review comment:
       Do you think these operations could have a performance impact? If so, would it make sense to add an option to enable/disable them?







[GitHub] [beam] ajamato commented on pull request #12070: Add some metrics for BigQueryStorageStreamReader

Posted by GitBox <gi...@apache.org>.
ajamato commented on pull request #12070:
URL: https://github.com/apache/beam/pull/12070#issuecomment-676723846


   R: @ihji 





[GitHub] [beam] stale[bot] commented on pull request #12070: Add some metrics for BigQueryStorageStreamReader

Posted by GitBox <gi...@apache.org>.
stale[bot] commented on pull request #12070:
URL: https://github.com/apache/beam/pull/12070#issuecomment-723500418


   This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
   





[GitHub] [beam] kmjung commented on a change in pull request #12070: Add some metrics for BigQueryStorageStreamReader

Posted by GitBox <gi...@apache.org>.
kmjung commented on a change in pull request #12070:
URL: https://github.com/apache/beam/pull/12070#discussion_r444676334



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
##########
@@ -153,6 +156,17 @@ public String toString() {
   @Experimental(Kind.SOURCE_SINK)
   public static class BigQueryStorageStreamReader<T> extends BoundedSource.BoundedReader<T> {
 
+    private final Counter streamingResponseCount =
+        Metrics.counter(
+            BigQueryStorageStreamReader.class, "bq_storage_read_streaming_response_count");
+    private final Counter streamingResponseTotalLatencyMs =
+        Metrics.counter(

Review comment:
       Done.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
##########
@@ -247,8 +267,19 @@ private synchronized boolean readNextRecord() throws IOException {
             fractionConsumedFromPreviousResponse);
       }
 
-      record = datumReader.read(record, decoder);
-      current = parseFn.apply(new SchemaAndRecord(record, tableSchema));
+      Stopwatch stopwatch = Stopwatch.createStarted();
+      try {
+        record = datumReader.read(record, decoder);
+      } finally {
+        decoderTotalLatencyMs.inc(stopwatch.elapsed(TimeUnit.MILLISECONDS));
+      }
+
+      stopwatch.reset();

Review comment:
       Good catch.







[GitHub] [beam] emkornfield commented on a change in pull request #12070: Add some metrics for BigQueryStorageStreamReader

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #12070:
URL: https://github.com/apache/beam/pull/12070#discussion_r444641099



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
##########
@@ -247,8 +267,19 @@ private synchronized boolean readNextRecord() throws IOException {
             fractionConsumedFromPreviousResponse);
       }
 
-      record = datumReader.read(record, decoder);
-      current = parseFn.apply(new SchemaAndRecord(record, tableSchema));
+      Stopwatch stopwatch = Stopwatch.createStarted();
+      try {
+        record = datumReader.read(record, decoder);
+      } finally {
+        decoderTotalLatencyMs.inc(stopwatch.elapsed(TimeUnit.MILLISECONDS));
+      }
+
+      stopwatch.reset();

Review comment:
       I think you might need to restart the stopwatch here: https://guava.dev/releases/19.0/api/docs/com/google/common/base/Stopwatch.html#reset()
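To illustrate the point about Guava's `Stopwatch.reset()`, here is a hypothetical `MiniStopwatch`. It mirrors only the documented behavior that matters here: `reset()` sets the elapsed time to zero and leaves the stopwatch stopped, and `start()` throws `IllegalStateException` if the stopwatch is already running. It takes an injectable nanosecond time source so the behavior can be checked deterministically. It is not Guava's implementation.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/**
 * Hypothetical minimal stopwatch mirroring Guava's documented reset() semantics:
 * reset() zeroes the elapsed time AND leaves the stopwatch stopped.
 */
final class MiniStopwatch {
  private final LongSupplier ticker; // nanosecond time source, e.g. System::nanoTime
  private boolean running;
  private long elapsedNanos;
  private long startTick;

  MiniStopwatch(LongSupplier ticker) {
    this.ticker = ticker;
  }

  MiniStopwatch start() {
    if (running) {
      throw new IllegalStateException("already running");
    }
    running = true;
    startTick = ticker.getAsLong();
    return this;
  }

  MiniStopwatch reset() {
    elapsedNanos = 0;
    running = false; // the pitfall: after reset() nothing is being measured
    return this;
  }

  long elapsed(TimeUnit unit) {
    long nanos = running ? elapsedNanos + ticker.getAsLong() - startTick : elapsedNanos;
    return unit.convert(nanos, TimeUnit.NANOSECONDS);
  }
}
```

Applied to the diff, the fix would be `stopwatch.reset().start();` in place of `stopwatch.reset();`, since Guava's `reset()` returns the same `Stopwatch`. Alternatively, a fresh `Stopwatch.createStarted()` could be used for the parse phase.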




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org