Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/09/15 16:23:40 UTC

[GitHub] [beam] ajamato commented on a change in pull request #12822: [BEAM-10880] Log error counts to debug BigQuery streaming insert requ…

ajamato commented on a change in pull request #12822:
URL: https://github.com/apache/beam/pull/12822#discussion_r488797757



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1210,20 +1236,32 @@ def process(self, element, *schema_side_inputs):
       return self._flush_all_batches()
 
   def finish_bundle(self):
-    if BigQueryWriteFn.LATENCY_LOGGING_LOCK.acquire(False):
+    if BigQueryWriteFn.STREAMING_API_LOGGING_LOCK.acquire(False):

Review comment:
      Can all of this be encapsulated entirely in a new class, or at least in a helper that can be called from finish_bundle?
   
   Ideally we can instrument this without adding so much to the original implementation.
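   A minimal sketch of the refactoring the reviewer suggests. It uses a hypothetical `StreamingApiLogger` helper, which is not part of Beam. The helper owns the lock, the last-reported timestamp, and the collected statistics, so `finish_bundle` shrinks to a single `maybe_log()` call. Assumptions: a plain list of latencies stands in for `LATENCY_LOGGING_HISTOGRAM`, `record_error` is a hypothetical hook for the error counts named in the PR title, and the log line format is illustrative. In the diff this state lives in class-level attributes shared by every `BigQueryWriteFn` in the process, so sharing one module-level instance of the helper would mirror that.

```python
import datetime
import logging
import threading
import time

_LOGGER = logging.getLogger(__name__)


class StreamingApiLogger(object):
  """Hypothetical helper owning all streaming insert statistics logging.

  BigQueryWriteFn would call record_latency()/record_error() while flushing
  batches and maybe_log() from finish_bundle().
  """
  def __init__(self, logging_frequency_msec, clock=time.time):
    self._logging_frequency_msec = logging_frequency_msec
    self._clock = clock  # Injectable so tests can control time.
    self._lock = threading.Lock()
    self._last_reported_millis = int(self._clock() * 1000)
    self._latencies_ms = []  # Stand-in for LATENCY_LOGGING_HISTOGRAM.
    self._error_counts = {}  # Hypothetical: error reason -> count.

  def record_latency(self, latency_ms):
    with self._lock:
      self._latencies_ms.append(latency_ms)

  def record_error(self, reason):
    with self._lock:
      self._error_counts[reason] = self._error_counts.get(reason, 0) + 1

  def maybe_log(self):
    """Logs and resets statistics if the logging interval has elapsed.

    Returns the logged lines, or None if nothing was logged.
    """
    # Non-blocking, like STREAMING_API_LOGGING_LOCK.acquire(False): if another
    # thread holds the lock, skip; the statistics are kept for a later call.
    if not self._lock.acquire(False):
      return None
    try:
      current_millis = int(self._clock() * 1000)
      if (current_millis -
          self._last_reported_millis) <= self._logging_frequency_msec:
        return None
      info = [
          '[Streaming Insert API Statistics since %s]' %
          datetime.datetime.fromtimestamp(self._last_reported_millis / 1000.0)
      ]
      if self._latencies_ms:
        info.append(
            'latency: count=%d, mean=%.1f ms, max=%d ms' % (
                len(self._latencies_ms),
                sum(self._latencies_ms) / float(len(self._latencies_ms)),
                max(self._latencies_ms)))
      for reason, count in sorted(self._error_counts.items()):
        info.append('errors (%s): %d' % (reason, count))
      _LOGGER.info('\n'.join(info))
      # Reset for the next reporting window.
      self._latencies_ms = []
      self._error_counts = {}
      self._last_reported_millis = current_millis
      return info
    finally:
      self._lock.release()
```

   The injectable `clock` lets a test advance time deterministically instead of sleeping. That makes both the "too soon to log" and the "log now" branches easy to cover.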

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1210,20 +1236,32 @@ def process(self, element, *schema_side_inputs):
       return self._flush_all_batches()
 
   def finish_bundle(self):
-    if BigQueryWriteFn.LATENCY_LOGGING_LOCK.acquire(False):
+    if BigQueryWriteFn.STREAMING_API_LOGGING_LOCK.acquire(False):
       try:
         current_millis = int(time.time() * 1000)
-        if (BigQueryWriteFn.LATENCY_LOGGING_HISTOGRAM.total_count() > 0 and
-            (current_millis -
-             BigQueryWriteFn.LATENCY_LOGGING_LAST_REPORTED_MILLIS) >
-            self._latency_logging_frequency_msec):
-          _LOGGER.info(
-              BigQueryWriteFn.LATENCY_LOGGING_HISTOGRAM.get_percentile_info(
-                  'streaming insert requests', 'ms'))
-          BigQueryWriteFn.LATENCY_LOGGING_HISTOGRAM.clear()
-          BigQueryWriteFn.LATENCY_LOGGING_LAST_REPORTED_MILLIS = current_millis
+        if ((current_millis -
+             BigQueryWriteFn.STREAMING_API_LOGGING_LAST_REPORTED_MILLIS) >
+            self._streaming_api_logging_frequency_msec):
+          streaming_api_info = [
+              '[Streaming Insert API Statistics since %s]' %
+              datetime.datetime.fromtimestamp(
+                  BigQueryWriteFn.STREAMING_API_LOGGING_LAST_REPORTED_MILLIS /
+                  1000.0)
+          ]
+          if BigQueryWriteFn.LATENCY_LOGGING_HISTOGRAM.total_count() > 0:
+            streaming_api_info.append(

Review comment:
      Please address the code coverage warnings:
   
   Codecov / codecov/patch: sdks/python/apache_beam/io/gcp/bigquery.py#L1252
   Added line #L1252 was not covered by tests
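   One way to cover a time-gated branch like the one flagged above, shown as a sketch. `streaming_api_info` is a hypothetical stand-in that mirrors the structure of the diff: it reads `time.time()` directly, builds the header line, and appends a latency line only when there are samples. The latency line text is illustrative. The tests patch `time.time` with `unittest.mock.patch` so that the logging interval has either elapsed or not, and they exercise both sides of the `if` that Codecov reports as uncovered.

```python
import datetime
import time
import unittest
from unittest import mock


def streaming_api_info(last_reported_millis, frequency_msec, latency_count):
  """Hypothetical stand-in mirroring the diff: returns lines to log, or None."""
  current_millis = int(time.time() * 1000)
  if current_millis - last_reported_millis <= frequency_msec:
    return None
  info = [
      '[Streaming Insert API Statistics since %s]' %
      datetime.datetime.fromtimestamp(last_reported_millis / 1000.0)
  ]
  if latency_count > 0:
    # The kind of branch Codecov flagged as not covered by tests.
    info.append('latency samples: %d' % latency_count)
  return info


class StreamingApiInfoTest(unittest.TestCase):

  @mock.patch('time.time', return_value=1600000100.0)
  def test_includes_latency_when_samples_exist(self, unused_mock_time):
    info = streaming_api_info(1600000000000, 60000, latency_count=3)
    self.assertEqual(len(info), 2)
    self.assertEqual(info[1], 'latency samples: 3')

  @mock.patch('time.time', return_value=1600000100.0)
  def test_header_only_when_no_samples(self, unused_mock_time):
    info = streaming_api_info(1600000000000, 60000, latency_count=0)
    self.assertEqual(len(info), 1)

  @mock.patch('time.time', return_value=1600000010.0)
  def test_returns_none_before_interval_elapses(self, unused_mock_time):
    self.assertIsNone(
        streaming_api_info(1600000000000, 60000, latency_count=3))
```

   Run it with `python -m pytest` or `python -m unittest`. Patching `time.time` avoids changing the production signature. Injecting a clock, as a helper class could do, is the cleaner alternative when the code is being refactored anyway.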




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org