You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/09 21:58:20 UTC
[2/4] incubator-beam git commit: Fix TriggerExampleTest
Fix TriggerExampleTest
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/77aa0938
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/77aa0938
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/77aa0938
Branch: refs/heads/master
Commit: 77aa0938f75f9f3d18a4fa79a5ffe6159167f4d5
Parents: 810ffeb
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jun 8 16:22:34 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 9 14:41:09 2016 -0700
----------------------------------------------------------------------
.../examples/cookbook/TriggerExampleTest.java | 61 +++++++++++++-------
1 file changed, 41 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77aa0938/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
index fe75d14..cddce7f 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
@@ -34,6 +34,8 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -44,7 +46,9 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
/**
* Unit Tests for {@link TriggerExample}.
@@ -70,21 +74,27 @@ public class TriggerExampleTest {
+ "0.001,74.8,1,9,3,0.0028,71,1,9,12,0.0099,97.4,1,9,13,0.0121,50.0,1,,,,,0,,,,,0"
+ ",,,,,0,,,,,0", new Instant(1)));
- private static final TableRow OUT_ROW_1 = new TableRow()
- .set("trigger_type", "default")
- .set("freeway", "5").set("total_flow", 30)
- .set("number_of_records", 1)
- .set("isFirst", true).set("isLast", true)
- .set("timing", "ON_TIME")
- .set("window", "[1970-01-01T00:01:00.000Z..1970-01-01T00:02:00.000Z)");
-
- private static final TableRow OUT_ROW_2 = new TableRow()
- .set("trigger_type", "default")
- .set("freeway", "110").set("total_flow", 90)
- .set("number_of_records", 2)
- .set("isFirst", true).set("isLast", true)
- .set("timing", "ON_TIME")
- .set("window", "[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)");
+ private static final TableRow OUT_ROW_1 =
+ new TableRow()
+ .set("trigger_type", "default")
+ .set("freeway", "5")
+ .set("total_flow", 30)
+ .set("number_of_records", 1)
+ .set("isFirst", true)
+ .set("isLast", true)
+ .set("timing", "ON_TIME")
+ .set("window", "[1970-01-01T00:01:00.000Z..1970-01-01T00:02:00.000Z)");
+
+ private static final TableRow OUT_ROW_2 =
+ new TableRow()
+ .set("trigger_type", "default")
+ .set("freeway", "110")
+ .set("total_flow", 90)
+ .set("number_of_records", 2)
+ .set("isFirst", true)
+ .set("isLast", true)
+ .set("timing", "ON_TIME")
+ .set("window", "[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)");
@Test
public void testExtractTotalFlow() throws Exception {
@@ -112,15 +122,26 @@ public class TriggerExampleTest {
.apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(new TotalFlow("default"));
- PCollection<TableRow> results = totalFlow.apply(ParDo.of(new FormatResults()));
+ PCollection<String> results = totalFlow.apply(ParDo.of(new FormatResults()));
-
- PAssert.that(results).containsInAnyOrder(OUT_ROW_1, OUT_ROW_2);
+ PAssert.that(results)
+ .containsInAnyOrder(canonicalFormat(OUT_ROW_1), canonicalFormat(OUT_ROW_2));
pipeline.run();
}
- static class FormatResults extends DoFn<TableRow, TableRow> {
+ // Sort the fields and toString() the values, since TableRow has a bit of a dynamically
+ // typed API and equals()/hashCode() are not appropriate for matching in tests
+ static String canonicalFormat(TableRow row) {
+ List<String> entries = Lists.newArrayListWithCapacity(row.size());
+ for (Map.Entry<String, Object> entry : row.entrySet()) {
+ entries.add(entry.getKey() + ":" + entry.getValue());
+ }
+ Collections.sort(entries);
+ return Joiner.on(",").join(entries);
+ }
+
+ static class FormatResults extends DoFn<TableRow, String> {
@Override
public void processElement(ProcessContext c) throws Exception {
TableRow element = c.element();
@@ -133,7 +154,7 @@ public class TriggerExampleTest {
.set("isLast", element.get("isLast"))
.set("timing", element.get("timing"))
.set("window", element.get("window"));
- c.output(row);
+ c.output(canonicalFormat(row));
}
}
}