Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/08/16 19:19:48 UTC

[GitHub] [beam] ajamato commented on a change in pull request #15294: [WIP] [BEAM-11986] Spanner write metric

ajamato commented on a change in pull request #15294:
URL: https://github.com/apache/beam/pull/15294#discussion_r688703484



##########
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
##########
@@ -368,6 +376,15 @@ def process(self, element, spanner_transaction):
       for row in transaction_read(**element.kwargs):
         yield row
 
+    table_id = self._spanner_configuration.table
+    query_name = self._spanner_configuration.query_name
+    if element.is_sql:
+      self._query_metric(query_name)
+    elif element.is_table:
+      self._table_metric(table_id)
+    else:
+      pass

Review comment:
       Remove the else block; it does nothing.

##########
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
##########
@@ -1068,6 +1139,28 @@ def __init__(self, spanner_configuration):
     self._spanner_configuration = spanner_configuration
     self._db_instance = None
     self.batches = Metrics.counter(self.__class__, 'SpannerBatches')
+    self.base_labels = {
+        monitoring_infos.SERVICE_LABEL: 'Spanner',
+        monitoring_infos.METHOD_LABEL: 'Write',
+        monitoring_infos.SPANNER_PROJECT_ID: spanner_configuration.project,
+        monitoring_infos.SPANNER_DATABASE_ID: spanner_configuration.database,
+    }
+
+  def table_write_service_call_metric(self, table_id):
+    database_id = self._spanner_configuration.database
+    project_id = self._spanner_configuration.project
+    resource = resource_identifiers.SpannerTable(
+        project_id, database_id, table_id)
+    labels = {
+        **self.base_labels,
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.SPANNER_TABLE_ID: table_id
+    }
+
+    service_call_metric = ServiceCallMetric(
+        request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN,
+        base_labels=labels)
+    service_call_metric.call('ok')

Review comment:
       This only records the 'ok' status.
   Please also catch errors from the API and record their statuses. Find the gRPC status code string or HTTP status code and pass that in.
   
   Please also run an integration test, tweaked so that it triggers an error, to confirm that the error status is recorded.
   Here is an example of recording error statuses:
   https://github.com/apache/beam/blob/c5b6a9a1289d4be12011fbfce1796136918db8a9/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L658
   
   But you may need to do something different to properly extract error codes after the API call is made.
   
   Also, it would be more consistent with the code base to use a helper function in the class itself, plumbing the necessary metadata through, rather than passing the function reference around.
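   
   A rough sketch of the pattern requested above: a helper method on the class builds the metric from its own metadata, records 'ok' on success, and records the error's status string on failure before re-raising. Everything named here (RecordingMetric, ApiError, WriteSketch, the label keys) is a hypothetical stand-in so the snippet runs on its own; it is not the Beam or Spanner API, and the real error type and the way to extract its status code may differ.

```python
class RecordingMetric:
    """Hypothetical stand-in for a service call metric; it only stores statuses."""

    def __init__(self, base_labels):
        self.base_labels = base_labels
        self.statuses = []

    def call(self, status):
        # Record one API call outcome, e.g. 'ok' or a status code string.
        self.statuses.append(str(status))


class ApiError(Exception):
    """Hypothetical API error carrying a gRPC-style status string."""

    def __init__(self, code, message=""):
        super().__init__(message)
        self.code = code


class WriteSketch:
    """Hypothetical writer whose helper method owns metric creation."""

    def __init__(self, project_id, database_id, metric_factory=RecordingMetric):
        self._base_labels = {"project": project_id, "database": database_id}
        self._metric_factory = metric_factory
        self.metrics = {}

    def _metric_for(self, table_id):
        # Build the per-table metric lazily from metadata held by the class,
        # so callers never pass a metric function reference around.
        if table_id not in self.metrics:
            labels = {**self._base_labels, "table": table_id}
            self.metrics[table_id] = self._metric_factory(labels)
        return self.metrics[table_id]

    def write(self, table_id, api_call):
        metric = self._metric_for(table_id)
        try:
            result = api_call()
        except ApiError as e:
            # Record the failure status, then let the error propagate.
            metric.call(e.code)
            raise
        metric.call("ok")
        return result
```

   With this shape, a test can pass in an api_call that raises and then check that the recorded statuses include the error code alongside 'ok'.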
   
   

##########
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
##########
@@ -284,6 +288,8 @@ class _BeamSpannerConfiguration(namedtuple("_BeamSpannerConfiguration",
                                            ["project",
                                             "instance",
                                             "database",
+                                            "table",

Review comment:
       Use table_id as the name here, to be consistent with the other "table_id" variable names.



