You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/12/14 20:45:25 UTC
[1/2] incubator-beam git commit: [BEAM-1033] Retry Bigquery Verifier when Query Fails
Repository: incubator-beam
Updated Branches:
refs/heads/master 4927cc1ab -> 0b0a1b797
[BEAM-1033] Retry Bigquery Verifier when Query Fails
Update Junit to 4.12
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b626f0e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b626f0e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b626f0e6
Branch: refs/heads/master
Commit: b626f0e627af85b2aa01213587b4130932030166
Parents: 4927cc1
Author: Mark Liu <ma...@markliu-macbookpro.roam.corp.google.com>
Authored: Wed Nov 30 22:20:12 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Dec 14 12:44:47 2016 -0800
----------------------------------------------------------------------
pom.xml | 2 +-
.../beam/sdk/testing/BigqueryMatcher.java | 48 ++++++++----
.../beam/sdk/testing/BigqueryMatcherTest.java | 82 ++++++++++----------
.../main/resources/archetype-resources/pom.xml | 2 +-
.../main/resources/archetype-resources/pom.xml | 2 +-
5 files changed, 76 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b626f0e6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4faa971..970547d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,7 +123,7 @@
<jackson.version>2.7.2</jackson.version>
<findbugs.version>3.0.1</findbugs.version>
<joda.version>2.4</joda.version>
- <junit.version>4.11</junit.version>
+ <junit.version>4.12</junit.version>
<mockito.version>1.9.5</mockito.version>
<netty.version>4.1.3.Final</netty.version>
<os-maven-plugin.version>1.4.0.Final</os-maven-plugin.version>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b626f0e6/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
index 9b8589a..8f752c0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
@@ -39,6 +39,7 @@ import com.google.common.collect.Lists;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
@@ -117,20 +118,23 @@ public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult>
response = queryWithRetries(
bigqueryClient, queryContent, Sleeper.DEFAULT, BACKOFF_FACTORY.backoff());
- } catch (Exception e) {
+ } catch (IOException | InterruptedException e) {
+ if (e instanceof InterruptedIOException) {
+ Thread.currentThread().interrupt();
+ }
throw new RuntimeException("Failed to fetch BigQuery data.", e);
}
- // validate BigQuery response
- if (response == null || response.getRows() == null || response.getRows().isEmpty()) {
+ if (!response.getJobComplete()) {
+ // query job not complete, verification failed
return false;
- }
-
- // compute checksum
- actualChecksum = generateHash(response.getRows());
- LOG.debug("Generated a SHA1 checksum based on queried data: {}", actualChecksum);
+ } else {
+ // compute checksum
+ actualChecksum = generateHash(response.getRows());
+ LOG.debug("Generated a SHA1 checksum based on queried data: {}", actualChecksum);
- return expectedChecksum.equals(actualChecksum);
+ return expectedChecksum.equals(actualChecksum);
+ }
}
@VisibleForTesting
@@ -144,23 +148,35 @@ public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult>
.build();
}
+ @Nonnull
@VisibleForTesting
QueryResponse queryWithRetries(Bigquery bigqueryClient, QueryRequest queryContent,
Sleeper sleeper, BackOff backOff)
throws IOException, InterruptedException {
IOException lastException = null;
do {
+ if (lastException != null) {
+ LOG.warn("Retrying query ({}) after exception", queryContent.getQuery(), lastException);
+ }
try {
- return bigqueryClient.jobs().query(projectId, queryContent).execute();
+ QueryResponse response = bigqueryClient.jobs().query(projectId, queryContent).execute();
+ if (response != null) {
+ return response;
+ } else {
+ lastException =
+ new IOException("Expected valid response from query job, but received null.");
+ }
} catch (IOException e) {
// ignore and retry
- LOG.warn("Ignore the error and retry the query.");
lastException = e;
}
} while(BackOffUtils.next(sleeper, backOff));
- throw new IOException(
+
+ throw new RuntimeException(
String.format(
- "Unable to get BigQuery response after retrying %d times", MAX_QUERY_RETRIES),
+ "Unable to get BigQuery response after retrying %d times using query (%s)",
+ MAX_QUERY_RETRIES,
+ queryContent.getQuery()),
lastException);
}
@@ -210,9 +226,9 @@ public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult>
@Override
public void describeMismatchSafely(PipelineResult pResult, Description description) {
String info;
- if (response == null || response.getRows() == null || response.getRows().isEmpty()) {
- // invalid query response
- info = String.format("Invalid BigQuery response: %s", Objects.toString(response));
+ if (!response.getJobComplete()) {
+ // query job not complete
+ info = String.format("The query job hasn't completed. Got response: %s", response);
} else {
// checksum mismatch
info = String.format("was (%s).%n"
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b626f0e6/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
index d0ae765..3b35856 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
@@ -18,14 +18,12 @@
package org.apache.beam.sdk.testing;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -90,52 +88,33 @@ public class BigqueryMatcherTest {
doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
when(mockQuery.execute()).thenReturn(createResponseContainingTestData());
+ thrown.expect(AssertionError.class);
+ thrown.expectMessage("Total number of rows are: 1");
+ thrown.expectMessage("abc");
try {
assertThat(mockResult, matcher);
- } catch (AssertionError expected) {
- assertThat(expected.getMessage(), containsString("Total number of rows are: 1"));
- assertThat(expected.getMessage(), containsString("abc"));
+ } finally {
verify(matcher).newBigqueryClient(eq(appName));
verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query)));
}
}
@Test
- public void testBigqueryMatcherFailsWhenResponseIsNull() throws IOException {
- testMatcherFailsSinceInvalidQueryResponse(null);
- }
-
- @Test
- public void testBigqueryMatcherFailsWhenNullRowsInResponse() throws IOException {
- testMatcherFailsSinceInvalidQueryResponse(new QueryResponse());
- }
-
- @Test
- public void testBigqueryMatcherFailsWhenEmptyRowsInResponse() throws IOException {
- QueryResponse response = new QueryResponse();
- response.setRows(Lists.<TableRow>newArrayList());
-
- testMatcherFailsSinceInvalidQueryResponse(response);
- }
-
- private void testMatcherFailsSinceInvalidQueryResponse(QueryResponse response)
- throws IOException {
+ public void testBigqueryMatcherFailsWhenQueryJobNotComplete() throws Exception {
BigqueryMatcher matcher = spy(
new BigqueryMatcher(appName, projectId, query, "some-checksum"));
doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
- when(mockQuery.execute()).thenReturn(response);
+ when(mockQuery.execute()).thenReturn(new QueryResponse().setJobComplete(false));
+ thrown.expect(AssertionError.class);
+ thrown.expectMessage("The query job hasn't completed.");
+ thrown.expectMessage("jobComplete=false");
try {
assertThat(mockResult, matcher);
- } catch (AssertionError expected) {
- assertThat(expected.getMessage(), containsString("Invalid BigQuery response:"));
+ } finally {
verify(matcher).newBigqueryClient(eq(appName));
verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query)));
- return;
}
- // Note that fail throws an AssertionError which is why it is placed out here
- // instead of inside the try-catch block.
- fail("AssertionError is expected.");
}
@Test
@@ -144,18 +123,38 @@ public class BigqueryMatcherTest {
new BigqueryMatcher(appName, projectId, query, "some-checksum"));
when(mockQuery.execute()).thenThrow(new IOException());
- thrown.expect(IOException.class);
+ thrown.expect(RuntimeException.class);
thrown.expectMessage("Unable to get BigQuery response after retrying");
+ try {
+ matcher.queryWithRetries(
+ mockBigqueryClient,
+ new QueryRequest(),
+ fastClock,
+ BigqueryMatcher.BACKOFF_FACTORY.backoff());
+ } finally {
+ verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES))
+ .query(eq(projectId), eq(new QueryRequest()));
+ }
+ }
- matcher.queryWithRetries(
- mockBigqueryClient,
- new QueryRequest(),
- fastClock,
- BigqueryMatcher.BACKOFF_FACTORY.backoff());
+ @Test
+ public void testQueryWithRetriesWhenQueryResponseNull() throws Exception {
+ BigqueryMatcher matcher = spy(
+ new BigqueryMatcher(appName, projectId, query, "some-checksum"));
+ when(mockQuery.execute()).thenReturn(null);
- verify(matcher).newBigqueryClient(eq(appName));
- verify(mockJobs, times(BigqueryMatcher.MAX_QUERY_RETRIES))
- .query(eq(projectId), eq(new QueryRequest().setQuery(query)));
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Unable to get BigQuery response after retrying");
+ try {
+ matcher.queryWithRetries(
+ mockBigqueryClient,
+ new QueryRequest(),
+ fastClock,
+ BigqueryMatcher.BACKOFF_FACTORY.backoff());
+ } finally {
+ verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES))
+ .query(eq(projectId), eq(new QueryRequest()));
+ }
}
private QueryResponse createResponseContainingTestData() {
@@ -169,6 +168,7 @@ public class BigqueryMatcherTest {
row.setF(Lists.newArrayList(field1, field2, field3));
QueryResponse response = new QueryResponse();
+ response.setJobComplete(true);
response.setRows(Lists.newArrayList(row));
response.setTotalRows(BigInteger.ONE);
return response;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b626f0e6/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
index 65068c1..615160c 100644
--- a/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
@@ -282,7 +282,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>4.11</version>
+ <version>4.12</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b626f0e6/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
index 9ec2223..271fc0c 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
@@ -328,7 +328,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>4.11</version>
+ <version>4.12</version>
</dependency>
<!-- The DirectRunner is needed for unit tests. -->
[2/2] incubator-beam git commit: This closes #1479
Posted by tg...@apache.org.
This closes #1479
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0b0a1b79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0b0a1b79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0b0a1b79
Branch: refs/heads/master
Commit: 0b0a1b79794598478033105e0582650fe17953ac
Parents: 4927cc1 b626f0e
Author: Thomas Groh <tg...@google.com>
Authored: Wed Dec 14 12:45:09 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Dec 14 12:45:09 2016 -0800
----------------------------------------------------------------------
pom.xml | 2 +-
.../beam/sdk/testing/BigqueryMatcher.java | 48 ++++++++----
.../beam/sdk/testing/BigqueryMatcherTest.java | 82 ++++++++++----------
.../main/resources/archetype-resources/pom.xml | 2 +-
.../main/resources/archetype-resources/pom.xml | 2 +-
5 files changed, 76 insertions(+), 60 deletions(-)
----------------------------------------------------------------------