You are viewing a plain-text version of this content; the canonical link to it is not preserved in this copy.
Posted to commits@beam.apache.org by ch...@apache.org on 2019/03/09 00:46:24 UTC
[beam] branch master updated: Retry query when the getRows() return null with MAX_RETRY
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 94cf2ea Retry query when the getRows() return null with MAX_RETRY
new cab987a Merge pull request #8007: Retry query when the getRows() return null with MAX_RETRY
94cf2ea is described below
commit 94cf2eae9d05fc8c487aee1e2bc1989cd58b72d4
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Wed Mar 6 16:26:45 2019 -0800
Retry query when the getRows() return null with MAX_RETRY
---
.../sdk/io/gcp/bigquery/BigQueryToTableIT.java | 43 +++++++++++++++++-----
1 file changed, 34 insertions(+), 9 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java
index 8d6dcff1..437bbf9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java
@@ -19,6 +19,9 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static org.junit.Assert.assertEquals;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.model.QueryResponse;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableCell;
@@ -27,6 +30,7 @@ import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.security.SecureRandom;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -46,9 +50,12 @@ import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.util.BackOffAdapter;
+import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -85,6 +92,7 @@ public class BigQueryToTableIT {
ImmutableMap.of("bytes", "abc=", "date", "2000-01-01", "time", "00:00:00"),
ImmutableMap.of("bytes", "dec=", "date", "3000-12-31", "time", "23:59:59.990000"),
ImmutableMap.of("bytes", "xyw=", "date", "2011-01-01", "time", "23:59:59.999999"));
+ private static final int MAX_RETRY = 5;
private void runBigQueryToTablePipeline(BigQueryToTableOptions options) {
Pipeline p = Pipeline.create(options);
@@ -142,23 +150,38 @@ public class BigQueryToTableIT {
return options;
}
+ private List<TableRow> getTableRowsFromQuery(String query, int maxRetry) throws Exception {
+ FluentBackoff backoffFactory =
+ FluentBackoff.DEFAULT
+ .withMaxRetries(maxRetry)
+ .withInitialBackoff(Duration.standardSeconds(1L));
+ Sleeper sleeper = Sleeper.DEFAULT;
+ BackOff backoff = BackOffAdapter.toGcpBackOff(backoffFactory.backoff());
+ do {
+ LOG.info("Starting querying {}", query);
+ QueryResponse response = BQ_CLIENT.queryWithRetries(query, project);
+ if (response.getRows() != null) {
+ LOG.info("Got table content with query {}", query);
+ return response.getRows();
+ }
+ } while (BackOffUtils.next(sleeper, backoff));
+ LOG.info("Got empty table for query {} with retry {}", query, maxRetry);
+ return Collections.emptyList();
+ }
+
private void verifyLegacyQueryRes(String outputTable) throws Exception {
- LOG.info("Starting verifyLegacyQueryRes in outputTable {}", outputTable);
List<String> legacyQueryExpectedRes = ImmutableList.of("apple", "orange");
- QueryResponse response =
- BQ_CLIENT.queryWithRetries(String.format("SELECT fruit from [%s];", outputTable), project);
- LOG.info("Finished to query result table {}", outputTable);
+ List<TableRow> tableRows =
+ getTableRowsFromQuery(String.format("SELECT fruit from [%s];", outputTable), MAX_RETRY);
List<String> tableResult =
- response.getRows().stream()
+ tableRows.stream()
.flatMap(row -> row.getF().stream().map(cell -> cell.getV().toString()))
.sorted()
.collect(Collectors.toList());
-
assertEquals(legacyQueryExpectedRes, tableResult);
}
private void verifyNewTypesQueryRes(String outputTable) throws Exception {
- LOG.info("Starting verifyNewTypesQueryRes with outputTable {}", outputTable);
List<String> newTypeQueryExpectedRes =
ImmutableList.of(
"abc=,2000-01-01,00:00:00",
@@ -167,9 +190,11 @@ public class BigQueryToTableIT {
QueryResponse response =
BQ_CLIENT.queryWithRetries(
String.format("SELECT bytes, date, time FROM [%s];", outputTable), project);
- LOG.info("Finished to query result table {}", outputTable);
+ List<TableRow> tableRows =
+ getTableRowsFromQuery(
+ String.format("SELECT bytes, date, time FROM [%s];", outputTable), MAX_RETRY);
List<String> tableResult =
- response.getRows().stream()
+ tableRows.stream()
.map(
row -> {
String res = "";