You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/09 21:58:20 UTC

[2/4] incubator-beam git commit: Fix TriggerExampleTest

Fix TriggerExampleTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/77aa0938
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/77aa0938
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/77aa0938

Branch: refs/heads/master
Commit: 77aa0938f75f9f3d18a4fa79a5ffe6159167f4d5
Parents: 810ffeb
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jun 8 16:22:34 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 9 14:41:09 2016 -0700

----------------------------------------------------------------------
 .../examples/cookbook/TriggerExampleTest.java   | 61 +++++++++++++-------
 1 file changed, 41 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77aa0938/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
index fe75d14..cddce7f 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
@@ -34,6 +34,8 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 
 import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -44,7 +46,9 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Unit Tests for {@link TriggerExample}.
@@ -70,21 +74,27 @@ public class TriggerExampleTest {
           + "0.001,74.8,1,9,3,0.0028,71,1,9,12,0.0099,97.4,1,9,13,0.0121,50.0,1,,,,,0,,,,,0"
           + ",,,,,0,,,,,0", new Instant(1)));
 
-  private static final TableRow OUT_ROW_1 = new TableRow()
-      .set("trigger_type", "default")
-      .set("freeway", "5").set("total_flow", 30)
-      .set("number_of_records", 1)
-      .set("isFirst", true).set("isLast", true)
-      .set("timing", "ON_TIME")
-      .set("window", "[1970-01-01T00:01:00.000Z..1970-01-01T00:02:00.000Z)");
-
-  private static final TableRow OUT_ROW_2 = new TableRow()
-      .set("trigger_type", "default")
-      .set("freeway", "110").set("total_flow", 90)
-      .set("number_of_records", 2)
-      .set("isFirst", true).set("isLast", true)
-      .set("timing", "ON_TIME")
-      .set("window", "[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)");
+  private static final TableRow OUT_ROW_1 =
+      new TableRow()
+          .set("trigger_type", "default")
+          .set("freeway", "5")
+          .set("total_flow", 30)
+          .set("number_of_records", 1)
+          .set("isFirst", true)
+          .set("isLast", true)
+          .set("timing", "ON_TIME")
+          .set("window", "[1970-01-01T00:01:00.000Z..1970-01-01T00:02:00.000Z)");
+
+  private static final TableRow OUT_ROW_2 =
+      new TableRow()
+          .set("trigger_type", "default")
+          .set("freeway", "110")
+          .set("total_flow", 90)
+          .set("number_of_records", 2)
+          .set("isFirst", true)
+          .set("isLast", true)
+          .set("timing", "ON_TIME")
+          .set("window", "[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)");
 
   @Test
   public void testExtractTotalFlow() throws Exception {
@@ -112,15 +122,26 @@ public class TriggerExampleTest {
         .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(1))))
         .apply(new TotalFlow("default"));
 
-    PCollection<TableRow> results =  totalFlow.apply(ParDo.of(new FormatResults()));
+    PCollection<String> results =  totalFlow.apply(ParDo.of(new FormatResults()));
 
-
-    PAssert.that(results).containsInAnyOrder(OUT_ROW_1, OUT_ROW_2);
+    PAssert.that(results)
+        .containsInAnyOrder(canonicalFormat(OUT_ROW_1), canonicalFormat(OUT_ROW_2));
     pipeline.run();
 
   }
 
-  static class FormatResults extends DoFn<TableRow, TableRow> {
+  // Renders a TableRow as sorted, comma-joined "key:value" strings. TableRow has a dynamically
+  // typed API, so its equals()/hashCode() are not appropriate for matching in tests.
+  static String canonicalFormat(TableRow row) {
+    List<String> entries = Lists.newArrayListWithCapacity(row.size());
+    for (Map.Entry<String, Object> entry : row.entrySet()) {
+      entries.add(entry.getKey() + ":" + entry.getValue());
+    }
+    Collections.sort(entries);
+    return Joiner.on(",").join(entries);
+  }
+
+  static class FormatResults extends DoFn<TableRow, String> {
     @Override
     public void processElement(ProcessContext c) throws Exception {
       TableRow element = c.element();
@@ -133,7 +154,7 @@ public class TriggerExampleTest {
           .set("isLast", element.get("isLast"))
           .set("timing", element.get("timing"))
           .set("window", element.get("window"));
-      c.output(row);
+      c.output(canonicalFormat(row));
     }
   }
 }