You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/10/19 23:29:17 UTC

[2/3] incubator-beam git commit: Add BigQuery Verifier to WindowedWordCountIT

Add BigQuery Verifier to WindowedWordCountIT


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd46523d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd46523d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd46523d

Branch: refs/heads/master
Commit: dd46523dc2bca4aee11265a2fb065cc137920b1d
Parents: 8e225d7
Author: Mark Liu <ma...@markliu-macbookpro.roam.corp.google.com>
Authored: Thu Oct 6 14:34:55 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Oct 19 16:22:57 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/examples/WindowedWordCountIT.java   | 11 +++++++++++
 .../beam/examples/cookbook/BigQueryTornadoesIT.java |  2 +-
 .../apache/beam/sdk/testing/BigqueryMatcher.java    | 16 ++++++++++------
 .../beam/sdk/testing/BigqueryMatcherTest.java       |  2 +-
 4 files changed, 23 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd46523d/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index 379d1b0..6742654 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -19,8 +19,10 @@ package org.apache.beam.examples;
 
 import java.io.IOException;
 import org.apache.beam.examples.WindowedWordCount.Options;
+import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.testing.BigqueryMatcher;
 import org.apache.beam.sdk.testing.StreamingIT;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
@@ -40,6 +42,9 @@ public class WindowedWordCountIT {
    */
   public interface WindowedWordCountITOptions
       extends Options, TestPipelineOptions, StreamingOptions {
+    @Default.String("ff54f6f42b2afeb146206c1e8e915deaee0362b4")
+    String getChecksum();
+    void setChecksum(String value);
   }
 
   @Test
@@ -59,6 +64,12 @@ public class WindowedWordCountIT {
         TestPipeline.testingPipelineOptions().as(WindowedWordCountITOptions.class);
     options.setStreaming(isStreaming);
 
+    String query = String.format("SELECT word, SUM(count) FROM [%s:%s.%s] GROUP BY word",
+        options.getProject(), options.getBigQueryDataset(), options.getBigQueryTable());
+    options.setOnSuccessMatcher(
+        new BigqueryMatcher(
+            options.getAppName(), options.getProject(), query, options.getChecksum()));
+
     WindowedWordCount.main(TestPipeline.convertToArgs(options));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd46523d/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
index 7e15389..27a5a8f 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
@@ -39,7 +39,7 @@ public class BigQueryTornadoesIT {
    */
   public interface BigQueryTornadoesITOptions
       extends TestPipelineOptions, BigQueryTornadoes.Options, BigQueryOptions {
-    @Default.String("043e8e6ee32384df0cda4c241b8ab897f2ce0f2f")
+    @Default.String("1ab4c7ec460b94bbb3c3885b178bf0e6bed56e1f")
     String getChecksum();
     void setChecksum(String value);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd46523d/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
index 7646caa..95208ce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
@@ -40,8 +40,10 @@ import com.google.common.hash.Hashing;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.util.FluentBackoff;
@@ -182,14 +184,16 @@ public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult>
     return credential;
   }
 
-  private String generateHash(List<TableRow> rows) {
+  private String generateHash(@Nonnull List<TableRow> rows) {
     List<HashCode> rowHashes = Lists.newArrayList();
     for (TableRow row : rows) {
-      List<HashCode> cellHashes = Lists.newArrayList();
+      List<String> cellsInOneRow = Lists.newArrayList();
       for (TableCell cell : row.getF()) {
-        cellHashes.add(Hashing.sha1().hashString(cell.toString(), StandardCharsets.UTF_8));
+        cellsInOneRow.add(Objects.toString(cell.getV()));
       }
+      Collections.sort(cellsInOneRow); // Sort once per row; the row hash ignores cell order.
-      rowHashes.add(Hashing.combineUnordered(cellHashes));
+      rowHashes.add(
+          Hashing.sha1().hashString(cellsInOneRow.toString(), StandardCharsets.UTF_8));
     }
     return Hashing.combineUnordered(rowHashes).toString();
   }
@@ -222,13 +226,13 @@ public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult>
     StringBuilder samples = new StringBuilder();
     List<TableRow> rows = response.getRows();
     for (int i = 0; i < totalNumRows && i < rows.size(); i++) {
-      samples.append("\n\t\t");
+      samples.append(String.format("%n\t\t"));
       for (TableCell field : rows.get(i).getF()) {
         samples.append(String.format("%-10s", field.getV()));
       }
     }
     if (rows.size() > totalNumRows) {
-      samples.append("\n\t\t....");
+      samples.append(String.format("%n\t\t..."));
     }
     return samples.toString();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd46523d/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
index 1c427f8..d0ae765 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
@@ -74,7 +74,7 @@ public class BigqueryMatcherTest {
   public void testBigqueryMatcherThatSucceeds() throws Exception {
     BigqueryMatcher matcher = spy(
         new BigqueryMatcher(
-            appName, projectId, query, "8d1bbbf1f523f924b98c88b00c5811e041c2f855"));
+            appName, projectId, query, "9bb47f5c90d2a99cad526453dff5ed5ec74650dc"));
     doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
     when(mockQuery.execute()).thenReturn(createResponseContainingTestData());