You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/12/14 20:45:25 UTC

[1/2] incubator-beam git commit: [BEAM-1033] Retry BigQuery Verifier when Query Fails

Repository: incubator-beam
Updated Branches:
  refs/heads/master 4927cc1ab -> 0b0a1b797


[BEAM-1033] Retry BigQuery Verifier when Query Fails

Update JUnit from 4.11 to 4.12


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b626f0e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b626f0e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b626f0e6

Branch: refs/heads/master
Commit: b626f0e627af85b2aa01213587b4130932030166
Parents: 4927cc1
Author: Mark Liu <ma...@markliu-macbookpro.roam.corp.google.com>
Authored: Wed Nov 30 22:20:12 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Dec 14 12:44:47 2016 -0800

----------------------------------------------------------------------
 pom.xml                                         |  2 +-
 .../beam/sdk/testing/BigqueryMatcher.java       | 48 ++++++++----
 .../beam/sdk/testing/BigqueryMatcherTest.java   | 82 ++++++++++----------
 .../main/resources/archetype-resources/pom.xml  |  2 +-
 .../main/resources/archetype-resources/pom.xml  |  2 +-
 5 files changed, 76 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b626f0e6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4faa971..970547d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,7 +123,7 @@
     <jackson.version>2.7.2</jackson.version>
     <findbugs.version>3.0.1</findbugs.version>
     <joda.version>2.4</joda.version>
-    <junit.version>4.11</junit.version>
+    <junit.version>4.12</junit.version>
     <mockito.version>1.9.5</mockito.version>
     <netty.version>4.1.3.Final</netty.version>
     <os-maven-plugin.version>1.4.0.Final</os-maven-plugin.version>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b626f0e6/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
index 9b8589a..8f752c0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
@@ -39,6 +39,7 @@ import com.google.common.collect.Lists;
 import com.google.common.hash.HashCode;
 import com.google.common.hash.Hashing;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Collections;
@@ -117,20 +118,23 @@ public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult>
 
       response = queryWithRetries(
           bigqueryClient, queryContent, Sleeper.DEFAULT, BACKOFF_FACTORY.backoff());
-    } catch (Exception e) {
+    } catch (IOException | InterruptedException e) {
+      if (e instanceof InterruptedIOException) {
+        Thread.currentThread().interrupt();
+      }
       throw new RuntimeException("Failed to fetch BigQuery data.", e);
     }
 
-    // validate BigQuery response
-    if (response == null || response.getRows() == null || response.getRows().isEmpty()) {
+    if (!response.getJobComplete()) {
+      // query job not complete, verification failed
       return false;
-    }
-
-    // compute checksum
-    actualChecksum = generateHash(response.getRows());
-    LOG.debug("Generated a SHA1 checksum based on queried data: {}", actualChecksum);
+    } else {
+      // compute checksum
+      actualChecksum = generateHash(response.getRows());
+      LOG.debug("Generated a SHA1 checksum based on queried data: {}", actualChecksum);
 
-    return expectedChecksum.equals(actualChecksum);
+      return expectedChecksum.equals(actualChecksum);
+    }
   }
 
   @VisibleForTesting
@@ -144,23 +148,35 @@ public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult>
         .build();
   }
 
+  @Nonnull
   @VisibleForTesting
   QueryResponse queryWithRetries(Bigquery bigqueryClient, QueryRequest queryContent,
                                  Sleeper sleeper, BackOff backOff)
       throws IOException, InterruptedException {
     IOException lastException = null;
     do {
+      if (lastException != null) {
+        LOG.warn("Retrying query ({}) after exception", queryContent.getQuery(), lastException);
+      }
       try {
-        return bigqueryClient.jobs().query(projectId, queryContent).execute();
+        QueryResponse response = bigqueryClient.jobs().query(projectId, queryContent).execute();
+        if (response != null) {
+          return response;
+        } else {
+          lastException =
+              new IOException("Expected valid response from query job, but received null.");
+        }
       } catch (IOException e) {
         // ignore and retry
-        LOG.warn("Ignore the error and retry the query.");
         lastException = e;
       }
     } while(BackOffUtils.next(sleeper, backOff));
-    throw new IOException(
+
+    throw new RuntimeException(
         String.format(
-            "Unable to get BigQuery response after retrying %d times", MAX_QUERY_RETRIES),
+            "Unable to get BigQuery response after retrying %d times using query (%s)",
+            MAX_QUERY_RETRIES,
+            queryContent.getQuery()),
         lastException);
   }
 
@@ -210,9 +226,9 @@ public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult>
   @Override
   public void describeMismatchSafely(PipelineResult pResult, Description description) {
     String info;
-    if (response == null || response.getRows() == null || response.getRows().isEmpty()) {
-      // invalid query response
-      info = String.format("Invalid BigQuery response: %s", Objects.toString(response));
+    if (!response.getJobComplete()) {
+      // query job not complete
+      info = String.format("The query job hasn't completed. Got response: %s", response);
     } else {
       // checksum mismatch
       info = String.format("was (%s).%n"

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b626f0e6/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
index d0ae765..3b35856 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
@@ -18,14 +18,12 @@
 package org.apache.beam.sdk.testing;
 
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -90,52 +88,33 @@ public class BigqueryMatcherTest {
     doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
     when(mockQuery.execute()).thenReturn(createResponseContainingTestData());
 
+    thrown.expect(AssertionError.class);
+    thrown.expectMessage("Total number of rows are: 1");
+    thrown.expectMessage("abc");
     try {
       assertThat(mockResult, matcher);
-    } catch (AssertionError expected) {
-      assertThat(expected.getMessage(), containsString("Total number of rows are: 1"));
-      assertThat(expected.getMessage(), containsString("abc"));
+    } finally {
       verify(matcher).newBigqueryClient(eq(appName));
       verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query)));
     }
   }
 
   @Test
-  public void testBigqueryMatcherFailsWhenResponseIsNull() throws IOException {
-    testMatcherFailsSinceInvalidQueryResponse(null);
-  }
-
-  @Test
-  public void testBigqueryMatcherFailsWhenNullRowsInResponse() throws IOException {
-    testMatcherFailsSinceInvalidQueryResponse(new QueryResponse());
-  }
-
-  @Test
-  public void testBigqueryMatcherFailsWhenEmptyRowsInResponse() throws IOException {
-    QueryResponse response = new QueryResponse();
-    response.setRows(Lists.<TableRow>newArrayList());
-
-    testMatcherFailsSinceInvalidQueryResponse(response);
-  }
-
-  private void testMatcherFailsSinceInvalidQueryResponse(QueryResponse response)
-      throws IOException {
+  public void testBigqueryMatcherFailsWhenQueryJobNotComplete() throws Exception {
     BigqueryMatcher matcher = spy(
         new BigqueryMatcher(appName, projectId, query, "some-checksum"));
     doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
-    when(mockQuery.execute()).thenReturn(response);
+    when(mockQuery.execute()).thenReturn(new QueryResponse().setJobComplete(false));
 
+    thrown.expect(AssertionError.class);
+    thrown.expectMessage("The query job hasn't completed.");
+    thrown.expectMessage("jobComplete=false");
     try {
       assertThat(mockResult, matcher);
-    } catch (AssertionError expected) {
-      assertThat(expected.getMessage(), containsString("Invalid BigQuery response:"));
+    } finally {
       verify(matcher).newBigqueryClient(eq(appName));
       verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query)));
-      return;
     }
-    // Note that fail throws an AssertionError which is why it is placed out here
-    // instead of inside the try-catch block.
-    fail("AssertionError is expected.");
   }
 
   @Test
@@ -144,18 +123,38 @@ public class BigqueryMatcherTest {
         new BigqueryMatcher(appName, projectId, query, "some-checksum"));
     when(mockQuery.execute()).thenThrow(new IOException());
 
-    thrown.expect(IOException.class);
+    thrown.expect(RuntimeException.class);
     thrown.expectMessage("Unable to get BigQuery response after retrying");
+    try {
+      matcher.queryWithRetries(
+          mockBigqueryClient,
+          new QueryRequest(),
+          fastClock,
+          BigqueryMatcher.BACKOFF_FACTORY.backoff());
+    } finally {
+      verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES))
+          .query(eq(projectId), eq(new QueryRequest()));
+    }
+  }
 
-    matcher.queryWithRetries(
-        mockBigqueryClient,
-        new QueryRequest(),
-        fastClock,
-        BigqueryMatcher.BACKOFF_FACTORY.backoff());
+  @Test
+  public void testQueryWithRetriesWhenQueryResponseNull() throws Exception {
+    BigqueryMatcher matcher = spy(
+        new BigqueryMatcher(appName, projectId, query, "some-checksum"));
+    when(mockQuery.execute()).thenReturn(null);
 
-    verify(matcher).newBigqueryClient(eq(appName));
-    verify(mockJobs, times(BigqueryMatcher.MAX_QUERY_RETRIES))
-        .query(eq(projectId), eq(new QueryRequest().setQuery(query)));
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Unable to get BigQuery response after retrying");
+    try {
+      matcher.queryWithRetries(
+          mockBigqueryClient,
+          new QueryRequest(),
+          fastClock,
+          BigqueryMatcher.BACKOFF_FACTORY.backoff());
+    } finally {
+      verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES))
+          .query(eq(projectId), eq(new QueryRequest()));
+    }
   }
 
   private QueryResponse createResponseContainingTestData() {
@@ -169,6 +168,7 @@ public class BigqueryMatcherTest {
     row.setF(Lists.newArrayList(field1, field2, field3));
 
     QueryResponse response = new QueryResponse();
+    response.setJobComplete(true);
     response.setRows(Lists.newArrayList(row));
     response.setTotalRows(BigInteger.ONE);
     return response;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b626f0e6/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
index 65068c1..615160c 100644
--- a/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
@@ -282,7 +282,7 @@
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
-      <version>4.11</version>
+      <version>4.12</version>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b626f0e6/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
index 9ec2223..271fc0c 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
@@ -328,7 +328,7 @@
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
-      <version>4.11</version>
+      <version>4.12</version>
     </dependency>
 
     <!-- The DirectRunner is needed for unit tests. -->


[2/2] incubator-beam git commit: This closes #1479

Posted by tg...@apache.org.
This closes #1479


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0b0a1b79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0b0a1b79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0b0a1b79

Branch: refs/heads/master
Commit: 0b0a1b79794598478033105e0582650fe17953ac
Parents: 4927cc1 b626f0e
Author: Thomas Groh <tg...@google.com>
Authored: Wed Dec 14 12:45:09 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Dec 14 12:45:09 2016 -0800

----------------------------------------------------------------------
 pom.xml                                         |  2 +-
 .../beam/sdk/testing/BigqueryMatcher.java       | 48 ++++++++----
 .../beam/sdk/testing/BigqueryMatcherTest.java   | 82 ++++++++++----------
 .../main/resources/archetype-resources/pom.xml  |  2 +-
 .../main/resources/archetype-resources/pom.xml  |  2 +-
 5 files changed, 76 insertions(+), 60 deletions(-)
----------------------------------------------------------------------