Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/07/20 23:08:12 UTC

[GitHub] [beam] ihji commented on a change in pull request #15117: [BEAM-11994] Update BigQueryStorageStreamSource and BigQueryServicesImpl to capture API_REQUEST_COUNT metrics/errors for storage API reads

ihji commented on a change in pull request #15117:
URL: https://github.com/apache/beam/pull/15117#discussion_r673538941



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -1344,21 +1329,82 @@ private StorageClientImpl(BigQueryOptions options) throws IOException {
       this.client = BigQueryReadClient.create(settingsBuilder.build());
     }
 
+    // Since BigQueryReadClient client's methods are final they cannot be mocked with Mockito for
+    // testing
+    // So this wrapper method can be mocked in tests, instead.
+    ReadSession callCreateReadSession(CreateReadSessionRequest request) {
+      return client.createReadSession(request);
+    }
+
     @Override
     public ReadSession createReadSession(CreateReadSessionRequest request) {
-      return client.createReadSession(request);
+      TableReference tableReference =
+          BigQueryUtils.toTableReference(request.getReadSession().getTable());
+      ServiceCallMetric serviceCallMetric = BigQueryUtils.readCallMetric(tableReference);
+      try {
+        ReadSession session = callCreateReadSession(request);
+        if (serviceCallMetric != null) {
+          serviceCallMetric.call("ok");
+        }
+        return session;
+
+      } catch (ApiException e) {
+        if (serviceCallMetric != null
+            && e.getStatusCode() != null

Review comment:
       Is it necessary to null-check every call in this chain?
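
       To illustrate the question, here is a minimal sketch of collapsing a chain of null checks into a single expression with `Optional`. The `ApiError` and `StatusCode` classes are hypothetical stand-ins for the exception and status types in the diff, and the `"unknown"` fallback is an assumption, not something taken from the PR.

```java
import java.util.Optional;

final class StatusCodes {
  // Hypothetical stand-in for the status-code object returned by the exception.
  static final class StatusCode {
    private final String code;

    StatusCode(String code) {
      this.code = code;
    }

    String getCode() {
      return code;
    }
  }

  // Hypothetical stand-in for the API exception caught in the diff.
  static final class ApiError extends RuntimeException {
    private final StatusCode statusCode;

    ApiError(StatusCode statusCode) {
      this.statusCode = statusCode;
    }

    StatusCode getStatusCode() {
      return statusCode;
    }
  }

  // Returns the status string, or "unknown" (assumed fallback) if any link in the chain is null.
  static String canonicalStatus(ApiError e) {
    return Optional.ofNullable(e)
        .map(ApiError::getStatusCode)
        .map(StatusCode::getCode)
        .orElse("unknown");
  }
}
```

       Whether this reads better than explicit checks is a judgment call; if the underlying API guarantees non-null values, dropping the checks altogether may be simpler.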

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
##########
@@ -105,9 +107,18 @@
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     Table targetTable = getTargetTable(bqOptions);
 
-    ReadSession.Builder readSessionBuilder =
-        ReadSession.newBuilder()
-            .setTable(BigQueryHelpers.toTableResourceName(targetTable.getTableReference()));
+    String tableReferenceId = "";
+    if (targetTable != null) {
+      tableReferenceId = BigQueryHelpers.toTableResourceName(targetTable.getTableReference());
+    } else {
+      // If the table does not exist targetTable will be null.
+      // Construct the table id if we can generate it. For error recording/logging.
+      if (tableReferenceId != null) {

Review comment:
       Looks like `tableReferenceId != null` is always true here.
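
       The variable is initialized to `""` and is not reassigned on the `else` path before the check, so it cannot be null there. A minimal sketch of one possible fix follows, assuming the intent was to test a separately obtained, possibly null table id (for example the result of `getTargetTableId`). `TableIdResolver` and its parameter names are hypothetical.

```java
final class TableIdResolver {
  // resourceNameFromTable: the resource name derived from the table, or null if the table
  //     does not exist.
  // fallbackTableId: an id built without the table (assumed to be nullable).
  // Returns the first non-null candidate, or "" so that callers never see null.
  static String resolve(String resourceNameFromTable, String fallbackTableId) {
    if (resourceNameFromTable != null) {
      return resourceNameFromTable;
    }
    return fallbackTableId != null ? fallbackTableId : "";
  }
}
```

       Here the null check is applied to the value that can actually be null, rather than to the local that was just initialized.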

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
##########
@@ -94,6 +94,8 @@
    */
   protected abstract Table getTargetTable(BigQueryOptions options) throws Exception;
 
+  protected abstract String getTargetTableId(BigQueryOptions options) throws Exception;

Review comment:
       This might need a `@Nullable` annotation.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
##########
@@ -236,8 +236,12 @@ StreamAppendClient getStreamAppendClient(String streamName, Descriptor descripto
     /** Read rows in the context of a specific read stream. */
     BigQueryServerStream<ReadRowsResponse> readRows(ReadRowsRequest request);
 
+    BigQueryServerStream<ReadRowsResponse> readRows(ReadRowsRequest request, String fullTableId);

Review comment:
       It seems that only the `fullTableId` variants support metric capture. Could you make that explicit in the Javadoc to avoid confusion (or remove the non-`fullTableId` variants)?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
##########
@@ -909,4 +930,101 @@ private static Object convertAvroNumeric(Object value) {
           "Does not support converting avro format: " + value.getClass().getName());
     }
   }
+
+  /**
+   * @param fullTableId - Is one of the two forms commonly used to refer to bigquery tables in the
+   *     beam codebase:
+   *     <ul>
+   *       <li>projects/{project_id}/datasets/{dataset_id}/tables/{table_id}
+   *       <li>myproject:mydataset.mytable
+   *       <li>myproject.mydataset.mytable
+   *     </ul>
+   *
+   * @return a BigQueryTableIdentifier by parsing the fullTableId. If it cannot be parsed properly
+   *     null is returned.
+   */
+  public static @Nullable TableReference toTableReference(String fullTableId) {
+    // Try parsing the format:
+    // "projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"
+    Matcher m = TABLE_RESOURCE_PATTERN.matcher(fullTableId);
+    if (m.matches()) {
+      return new TableReference()
+          .setProjectId(m.group("PROJECT"))
+          .setDatasetId(m.group("DATASET"))
+          .setTableId(m.group("TABLE"));
+    }
+
+    // If that failed, try the format:
+    // "{project_id}:{dataset_id}.{table_id}" or
+    // "{project_id}.{dataset_id}.{table_id}"
+    m = SIMPLE_TABLE_PATTERN.matcher(fullTableId);
+    if (m.matches()) {
+      return new TableReference()
+          .setProjectId(m.group("PROJECT"))
+          .setDatasetId(m.group("DATASET"))
+          .setTableId(m.group("TABLE"));
+    }
+    return null;
+  }
+
+  /**
+   * @param tableReference - The table being read from. Can be a temporary BQ table used to read
+   *     from a SQL query.
+   * @return a ServiceCallMetric for recording statuses for all BQ API responses related to reading
+   *     elements directly from BigQuery in a process-wide metric. Such as: calls to readRows,
+   *     splitReadStream, createReadSession.
+   */
+  public static ServiceCallMetric readCallMetric(TableReference tableReference) {
+    if (tableReference != null) {
+      // TODO(ajamato): Add Ptransform label. Populate it as empty for now to prevent the
+      // SpecMonitoringInfoValidator from dropping the MonitoringInfo.
+      HashMap<String, String> baseLabels = new HashMap<String, String>();
+      baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+      baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigQuery");
+      baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "BigQueryBatchRead");
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.RESOURCE,
+          GcpResourceIdentifiers.bigQueryTable(
+              tableReference.getProjectId(),
+              tableReference.getDatasetId(),
+              tableReference.getTableId()));
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.BIGQUERY_PROJECT_ID, tableReference.getProjectId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.BIGQUERY_DATASET, tableReference.getDatasetId());
+      baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_TABLE, tableReference.getTableId());
+      return new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+    }
+    return null;
+  }
+
+  /**
+   * @param tableReference - The table being written to.
+   * @return a ServiceCallMetric for recording statuses for all BQ responses related to writing
+   *     elements directly to BigQuery in a process-wide metric. Such as: insertAll.
+   */
+  public static ServiceCallMetric writeCallMetric(TableReference tableReference) {

Review comment:
       `writeCallMetric` and `readCallMetric` look almost identical except for the `METHOD` label. Is it possible to refactor the common part into a separate method?
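
       A rough sketch of the refactoring being suggested, under stated assumptions: the label keys are plain strings standing in for the `MonitoringInfoConstants.Labels` constants, the result is a plain `Map` rather than a `ServiceCallMetric`, the `RESOURCE` label is omitted because its format is not shown in the diff, and the write-side method name `"BigQueryBatchWrite"` is assumed. `CallMetricLabels` is a hypothetical class name.

```java
import java.util.HashMap;
import java.util.Map;

final class CallMetricLabels {
  // Shared part: every label except METHOD is built the same way for reads and writes.
  static Map<String, String> baseLabels(
      String method, String projectId, String datasetId, String tableId) {
    Map<String, String> labels = new HashMap<>();
    labels.put("PTRANSFORM", ""); // left empty for now, as in the diff
    labels.put("SERVICE", "BigQuery");
    labels.put("METHOD", method);
    labels.put("BIGQUERY_PROJECT_ID", projectId);
    labels.put("BIGQUERY_DATASET", datasetId);
    labels.put("BIGQUERY_TABLE", tableId);
    return labels;
  }

  static Map<String, String> readLabels(String projectId, String datasetId, String tableId) {
    return baseLabels("BigQueryBatchRead", projectId, datasetId, tableId);
  }

  // "BigQueryBatchWrite" is an assumed value; the diff only shows the read-side name.
  static Map<String, String> writeLabels(String projectId, String datasetId, String tableId) {
    return baseLabels("BigQueryBatchWrite", projectId, datasetId, tableId);
  }
}
```

       With a shared helper like this, `readCallMetric` and `writeCallMetric` would each reduce to a null check plus one call that passes the method name.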



