You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/10/19 23:29:17 UTC
[2/3] incubator-beam git commit: Add BigQuery Verifier to WindowedWordCountIT
Add BigQuery Verifier to WindowedWordCountIT
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd46523d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd46523d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd46523d
Branch: refs/heads/master
Commit: dd46523dc2bca4aee11265a2fb065cc137920b1d
Parents: 8e225d7
Author: Mark Liu <ma...@markliu-macbookpro.roam.corp.google.com>
Authored: Thu Oct 6 14:34:55 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Oct 19 16:22:57 2016 -0700
----------------------------------------------------------------------
.../apache/beam/examples/WindowedWordCountIT.java | 11 +++++++++++
.../beam/examples/cookbook/BigQueryTornadoesIT.java | 2 +-
.../apache/beam/sdk/testing/BigqueryMatcher.java | 16 ++++++++++------
.../beam/sdk/testing/BigqueryMatcherTest.java | 2 +-
4 files changed, 23 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd46523d/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index 379d1b0..6742654 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -19,8 +19,10 @@ package org.apache.beam.examples;
import java.io.IOException;
import org.apache.beam.examples.WindowedWordCount.Options;
+import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.testing.BigqueryMatcher;
import org.apache.beam.sdk.testing.StreamingIT;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
@@ -40,6 +42,9 @@ public class WindowedWordCountIT {
*/
public interface WindowedWordCountITOptions
extends Options, TestPipelineOptions, StreamingOptions {
+ @Default.String("ff54f6f42b2afeb146206c1e8e915deaee0362b4")
+ String getChecksum();
+ void setChecksum(String value);
}
@Test
@@ -59,6 +64,12 @@ public class WindowedWordCountIT {
TestPipeline.testingPipelineOptions().as(WindowedWordCountITOptions.class);
options.setStreaming(isStreaming);
+ String query = String.format("SELECT word, SUM(count) FROM [%s:%s.%s] GROUP BY word",
+ options.getProject(), options.getBigQueryDataset(), options.getBigQueryTable());
+ options.setOnSuccessMatcher(
+ new BigqueryMatcher(
+ options.getAppName(), options.getProject(), query, options.getChecksum()));
+
WindowedWordCount.main(TestPipeline.convertToArgs(options));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd46523d/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
index 7e15389..27a5a8f 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
@@ -39,7 +39,7 @@ public class BigQueryTornadoesIT {
*/
public interface BigQueryTornadoesITOptions
extends TestPipelineOptions, BigQueryTornadoes.Options, BigQueryOptions {
- @Default.String("043e8e6ee32384df0cda4c241b8ab897f2ce0f2f")
+ @Default.String("1ab4c7ec460b94bbb3c3885b178bf0e6bed56e1f")
String getChecksum();
void setChecksum(String value);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd46523d/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
index 7646caa..95208ce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
@@ -40,8 +40,10 @@ import com.google.common.hash.Hashing;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.util.FluentBackoff;
@@ -182,14 +184,16 @@ public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult>
return credential;
}
- private String generateHash(List<TableRow> rows) {
+ private String generateHash(@Nonnull List<TableRow> rows) {
List<HashCode> rowHashes = Lists.newArrayList();
for (TableRow row : rows) {
- List<HashCode> cellHashes = Lists.newArrayList();
+ List<String> cellsInOneRow = Lists.newArrayList();
for (TableCell cell : row.getF()) {
- cellHashes.add(Hashing.sha1().hashString(cell.toString(), StandardCharsets.UTF_8));
+ cellsInOneRow.add(Objects.toString(cell.getV()));
+ Collections.sort(cellsInOneRow);
}
- rowHashes.add(Hashing.combineUnordered(cellHashes));
+ rowHashes.add(
+ Hashing.sha1().hashString(cellsInOneRow.toString(), StandardCharsets.UTF_8));
}
return Hashing.combineUnordered(rowHashes).toString();
}
@@ -222,13 +226,13 @@ public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult>
StringBuilder samples = new StringBuilder();
List<TableRow> rows = response.getRows();
for (int i = 0; i < totalNumRows && i < rows.size(); i++) {
- samples.append("\n\t\t");
+ samples.append(String.format("%n\t\t"));
for (TableCell field : rows.get(i).getF()) {
samples.append(String.format("%-10s", field.getV()));
}
}
if (rows.size() > totalNumRows) {
- samples.append("\n\t\t....");
+ samples.append(String.format("%n\t\t..."));
}
return samples.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd46523d/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
index 1c427f8..d0ae765 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
@@ -74,7 +74,7 @@ public class BigqueryMatcherTest {
public void testBigqueryMatcherThatSucceeds() throws Exception {
BigqueryMatcher matcher = spy(
new BigqueryMatcher(
- appName, projectId, query, "8d1bbbf1f523f924b98c88b00c5811e041c2f855"));
+ appName, projectId, query, "9bb47f5c90d2a99cad526453dff5ed5ec74650dc"));
doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
when(mockQuery.execute()).thenReturn(createResponseContainingTestData());