You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2019/03/09 00:46:24 UTC

[beam] branch master updated: Retry query when the getRows() return null with MAX_RETRY

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 94cf2ea  Retry query when the getRows() return null with MAX_RETRY
     new cab987a  Merge pull request #8007: Retry query when the getRows() return null with MAX_RETRY
94cf2ea is described below

commit 94cf2eae9d05fc8c487aee1e2bc1989cd58b72d4
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Wed Mar 6 16:26:45 2019 -0800

    Retry query when the getRows() return null with MAX_RETRY
---
 .../sdk/io/gcp/bigquery/BigQueryToTableIT.java     | 43 +++++++++++++++++-----
 1 file changed, 34 insertions(+), 9 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java
index 8d6dcff1..437bbf9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java
@@ -19,6 +19,9 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static org.junit.Assert.assertEquals;
 
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
 import com.google.api.services.bigquery.model.QueryResponse;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableCell;
@@ -27,6 +30,7 @@ import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import java.security.SecureRandom;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -46,9 +50,12 @@ import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.util.BackOffAdapter;
+import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.joda.time.Duration;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -85,6 +92,7 @@ public class BigQueryToTableIT {
           ImmutableMap.of("bytes", "abc=", "date", "2000-01-01", "time", "00:00:00"),
           ImmutableMap.of("bytes", "dec=", "date", "3000-12-31", "time", "23:59:59.990000"),
           ImmutableMap.of("bytes", "xyw=", "date", "2011-01-01", "time", "23:59:59.999999"));
+  private static final int MAX_RETRY = 5;
 
   private void runBigQueryToTablePipeline(BigQueryToTableOptions options) {
     Pipeline p = Pipeline.create(options);
@@ -142,23 +150,38 @@ public class BigQueryToTableIT {
     return options;
   }
 
+  private List<TableRow> getTableRowsFromQuery(String query, int maxRetry) throws Exception {
+    FluentBackoff backoffFactory =
+        FluentBackoff.DEFAULT
+            .withMaxRetries(maxRetry)
+            .withInitialBackoff(Duration.standardSeconds(1L));
+    Sleeper sleeper = Sleeper.DEFAULT;
+    BackOff backoff = BackOffAdapter.toGcpBackOff(backoffFactory.backoff());
+    do {
+      LOG.info("Starting querying {}", query);
+      QueryResponse response = BQ_CLIENT.queryWithRetries(query, project);
+      if (response.getRows() != null) {
+        LOG.info("Got table content with query {}", query);
+        return response.getRows();
+      }
+    } while (BackOffUtils.next(sleeper, backoff));
+    LOG.info("Got empty table for query {} with retry {}", query, maxRetry);
+    return Collections.emptyList();
+  }
+
   private void verifyLegacyQueryRes(String outputTable) throws Exception {
-    LOG.info("Starting verifyLegacyQueryRes in outputTable {}", outputTable);
     List<String> legacyQueryExpectedRes = ImmutableList.of("apple", "orange");
-    QueryResponse response =
-        BQ_CLIENT.queryWithRetries(String.format("SELECT fruit from [%s];", outputTable), project);
-    LOG.info("Finished to query result table {}", outputTable);
+    List<TableRow> tableRows =
+        getTableRowsFromQuery(String.format("SELECT fruit from [%s];", outputTable), MAX_RETRY);
     List<String> tableResult =
-        response.getRows().stream()
+        tableRows.stream()
             .flatMap(row -> row.getF().stream().map(cell -> cell.getV().toString()))
             .sorted()
             .collect(Collectors.toList());
-
     assertEquals(legacyQueryExpectedRes, tableResult);
   }
 
   private void verifyNewTypesQueryRes(String outputTable) throws Exception {
-    LOG.info("Starting verifyNewTypesQueryRes with outputTable {}", outputTable);
     List<String> newTypeQueryExpectedRes =
         ImmutableList.of(
             "abc=,2000-01-01,00:00:00",
@@ -167,9 +190,11 @@ public class BigQueryToTableIT {
     QueryResponse response =
         BQ_CLIENT.queryWithRetries(
             String.format("SELECT bytes, date, time FROM [%s];", outputTable), project);
-    LOG.info("Finished to query result table {}", outputTable);
+    List<TableRow> tableRows =
+        getTableRowsFromQuery(
+            String.format("SELECT bytes, date, time FROM [%s];", outputTable), MAX_RETRY);
     List<String> tableResult =
-        response.getRows().stream()
+        tableRows.stream()
             .map(
                 row -> {
                   String res = "";