Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/03 17:23:57 UTC

[GitHub] [beam] chamikaramj commented on a change in pull request #12403: [BEAM-10597] Propagate BigQuery streaming insert throttled time to Dataflow worker

chamikaramj commented on a change in pull request #12403:
URL: https://github.com/apache/beam/pull/12403#discussion_r464549909



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
##########
@@ -543,6 +547,16 @@ public Long extractThrottleTime() {
         totalThrottleTime += httpClientApiThrottlingTime.getCumulative();
       }
 
+      CounterCell bigqueryStreamingInsertThrottleTime =
+          container.tryGetCounter(
+              MetricName.named(
+                  BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE,
+                  BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAME));

Review comment:
       Can we use the same name as above ("cumulativeThrottlingSeconds"), move it to a constant, and do the milliseconds-to-seconds conversion when setting the counter?
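A minimal sketch of the suggestion, assuming a hypothetical `ThrottleTimeCounter` class (not a Beam API) that stands in for a metrics counter. The shared name lives in one constant, and milliseconds are converted to seconds at the point where the counter is set. Leftover milliseconds are carried forward so that short backoffs are not lost to truncation.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch only: ThrottleTimeCounter is not a Beam class. It stores
 * cumulative throttled time in whole seconds under one shared counter name.
 */
public class ThrottleTimeCounter {
  // Shared counter name from the review, defined once as a constant.
  public static final String THROTTLE_TIME_COUNTER_NAME = "cumulativeThrottlingSeconds";

  private long totalSeconds = 0;
  // Milliseconds not yet large enough to count as a full second.
  private long remainderMillis = 0;

  /** Records a backoff given in milliseconds, converting to seconds when setting. */
  public synchronized void addThrottledMillis(long millis) {
    if (millis < 0) {
      throw new IllegalArgumentException("millis must be non-negative");
    }
    long pending = remainderMillis + millis;
    totalSeconds += TimeUnit.MILLISECONDS.toSeconds(pending);
    remainderMillis = pending % 1000;
  }

  /** Returns the cumulative throttled time in whole seconds. */
  public synchronized long getCumulativeSeconds() {
    return totalSeconds;
  }
}
```

For example, backoffs of 1500 ms and 700 ms yield 2 seconds with 200 ms carried over; converting each backoff separately would report only 1 second.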

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -520,8 +526,11 @@ public int getSize() {
     private void translateKnownStepCounters(CounterUpdate stepCounterUpdate) {
       CounterStructuredName structuredName =
           stepCounterUpdate.getStructuredNameAndMetadata().getName();
-      if (THROTTLING_MSECS_METRIC_NAME.getNamespace().equals(structuredName.getOriginNamespace())
-          && THROTTLING_MSECS_METRIC_NAME.getName().equals(structuredName.getName())) {
+      if ((THROTTLING_MSECS_METRIC_NAME.getNamespace().equals(structuredName.getOriginNamespace())
+              && THROTTLING_MSECS_METRIC_NAME.getName().equals(structuredName.getName()))
+          || (BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE.equals(

Review comment:
       Is there a reason we need a unique name for BQ but not for GCS or Datastore?
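To illustrate the question, here is a sketch (an assumption, not the worker's actual code) of a table-driven version of the check in `translateKnownStepCounters`. If every IO reports under the shared counter name, supporting BigQuery becomes one more table entry rather than another `||` branch with its own name. The namespaces below are hypothetical placeholders, not the constants used in Beam.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch: recognizes throttling counters by (namespace, name) pairs.
 * The namespace strings are placeholders chosen for illustration only.
 */
public class ThrottlingCounterMatcher {
  static final String SHARED_NAME = "cumulativeThrottlingSeconds";

  // Each entry is a [namespace, name] pair; List equality compares element-wise.
  private static final Set<List<String>> KNOWN_THROTTLING_COUNTERS = new HashSet<>();

  static {
    for (String namespace :
        new String[] {"example.gcs", "example.datastore", "example.bigquery"}) {
      KNOWN_THROTTLING_COUNTERS.add(Arrays.asList(namespace, SHARED_NAME));
    }
  }

  /** Returns true if the counter update should be treated as throttling time. */
  public static boolean isThrottlingCounter(String originNamespace, String name) {
    return KNOWN_THROTTLING_COUNTERS.contains(Arrays.asList(originNamespace, name));
  }
}
```

Matching on the full pair, rather than on the name alone, keeps an unrelated user counter that happens to share the name from being counted as throttling time.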

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -867,6 +872,7 @@ public void deleteDataset(String projectId, String datasetId)
         }
         try {
           sleeper.sleep(nextBackoffMillis);
+          throttlingMilliSeconds.inc(nextBackoffMillis);

Review comment:
       This sleep is for failures. You probably need to increment the counter for backoff1, the backoff used for rate limit errors above, instead.
   
   cc: @ihji 
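A minimal sketch of the distinction the comment draws, assuming a hypothetical retry loop with two backoff paths. Only sleeps caused by rate-limit responses are added to the throttling total; sleeps after ordinary failures are not. `Outcome`, `Sleeper` and the fixed delays are illustrative assumptions and not the types in `BigQueryServicesImpl`, where the delays come from backoff objects such as `backoff1`.

```java
import java.util.function.Supplier;

/**
 * Hypothetical sketch: counts only rate-limit backoff as throttled time.
 * Fixed delays keep it short; real code would use exponential backoff.
 */
public class ThrottledRetry {
  public enum Outcome { OK, RATE_LIMITED, FAILED }

  /** Abstraction over sleeping so tests can avoid real delays. */
  public interface Sleeper {
    void sleep(long millis);
  }

  /** A sleeper that really sleeps, restoring the interrupt flag if interrupted. */
  public static final Sleeper REAL_SLEEPER = millis -> {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  };

  private long throttlingMillis = 0; // stands in for a metrics counter

  public long getThrottlingMillis() {
    return throttlingMillis;
  }

  /** Returns the attempt number that succeeded, or -1 if all attempts failed. */
  public int run(Supplier<Outcome> attempt, Sleeper sleeper, int maxAttempts,
      long rateLimitBackoffMillis, long failureBackoffMillis) {
    for (int i = 1; i <= maxAttempts; i++) {
      Outcome outcome = attempt.get();
      if (outcome == Outcome.OK) {
        return i;
      }
      if (i == maxAttempts) {
        break; // no sleep after the final attempt
      }
      if (outcome == Outcome.RATE_LIMITED) {
        sleeper.sleep(rateLimitBackoffMillis);
        // The service asked us to slow down: this is throttled time.
        throttlingMillis += rateLimitBackoffMillis;
      } else {
        // Backoff after an ordinary failure is not counted as throttling.
        sleeper.sleep(failureBackoffMillis);
      }
    }
    return -1;
  }
}
```

With outcomes RATE_LIMITED, FAILED, RATE_LIMITED, OK and delays of 100 ms and 1000 ms, the loop sleeps 1200 ms in total but reports only 200 ms of throttling.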




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org