You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:29 UTC

[21/55] [abbrv] beam git commit: Improve queries tests

Improve queries tests

Fix Runner categories in tests

Add streaming unit tests and corresponding labels
issue #37

Update numEvents: results are no longer linked to the number of events

issue #22


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7ef49dc3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7ef49dc3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7ef49dc3

Branch: refs/heads/master
Commit: 7ef49dc3706c3a2543284e17eb39782c783d30cf
Parents: 7c28b49
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Mon Apr 3 16:50:51 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:27 2017 +0200

----------------------------------------------------------------------
 .../nexmark/src/main/resources/log4j.properties |   2 +-
 .../integration/nexmark/queries/QueryTest.java  | 142 ++++++++++++++-----
 2 files changed, 110 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7ef49dc3/integration/java/nexmark/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/resources/log4j.properties b/integration/java/nexmark/src/main/resources/log4j.properties
index 30d0a9d..7dd57b5 100644
--- a/integration/java/nexmark/src/main/resources/log4j.properties
+++ b/integration/java/nexmark/src/main/resources/log4j.properties
@@ -27,7 +27,7 @@ log4j.logger.org.apache.beam.runners.direct=WARN
 log4j.logger.org.apache.beam.sdk=WARN
 
 # Nexmark specific
-log4j.logger.org.apache.beam.integration.nexmark=ALL
+log4j.logger.org.apache.beam.integration.nexmark=WARN
 
 # Settings to quiet third party logs that are too verbose
 log4j.logger.org.spark_project.jetty=WARN

http://git-wip-us.apache.org/repos/asf/beam/blob/7ef49dc3/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
index dca2887..284aa7e 100644
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
+++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
@@ -23,6 +23,7 @@ import org.apache.beam.integration.nexmark.NexmarkQueryModel;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.integration.nexmark.model.KnownSize;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.UsesStatefulParDo;
@@ -35,81 +36,156 @@ import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/**
- * Test the various NEXMark queries yield results coherent with their models.
- */
+/** Test the various NEXMark queries yield results coherent with their models. */
 @RunWith(JUnit4.class)
 public class QueryTest {
   private static final NexmarkConfiguration CONFIG = NexmarkConfiguration.DEFAULT.clone();
-  @Rule
-  public TestPipeline p = TestPipeline.create();
 
   static {
-    //careful, results of tests are linked to numEvents value
+    // careful, results of tests are linked to numEventGenerators because of timestamp generation
     CONFIG.numEventGenerators = 1;
-    CONFIG.numEvents = 100;
+    CONFIG.numEvents = 1000;
   }
 
+  @Rule public TestPipeline p = TestPipeline.create();
+
   /** Test {@code query} matches {@code model}. */
-  private void queryMatchesModel(String name, NexmarkQuery query, NexmarkQueryModel model) {
+  private void queryMatchesModel(
+      String name, NexmarkQuery query, NexmarkQueryModel model, boolean streamingMode) {
     NexmarkUtils.setupPipeline(NexmarkUtils.CoderStrategy.HAND, p);
-    PCollection<TimestampedValue<KnownSize>> results =
-        p.apply(name + ".ReadBounded", NexmarkUtils.batchEventsSource(CONFIG)).apply(query);
-    //TODO Ismael this should not be called explicitly
-    results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
+    PCollection<TimestampedValue<KnownSize>> results;
+    if (streamingMode) {
+      results =
+          p.apply(name + ".ReadUnBounded", NexmarkUtils.streamEventsSource(CONFIG)).apply(query);
+      //TODO Ismael this should not be called explicitly
+      results.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+    } else {
+      results = p.apply(name + ".ReadBounded", NexmarkUtils.batchEventsSource(CONFIG)).apply(query);
+      //TODO Ismael this should not be called explicitly
+      results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
+    }
     PAssert.that(results).satisfies(model.assertionFor());
     PipelineResult result = p.run();
     result.waitUntilFinish();
   }
 
   @Test
-  public void query0MatchesModel() {
-    queryMatchesModel("Query0Test", new Query0(CONFIG), new Query0Model(CONFIG));
+  @Category(NeedsRunner.class)
+  public void query0MatchesModelBatch() {
+    queryMatchesModel("Query0TestBatch", new Query0(CONFIG), new Query0Model(CONFIG), false);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query0MatchesModelStreaming() {
+    queryMatchesModel("Query0TestStreaming", new Query0(CONFIG), new Query0Model(CONFIG), true);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query1MatchesModelBatch() {
+    queryMatchesModel("Query1TestBatch", new Query1(CONFIG), new Query1Model(CONFIG), false);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query1MatchesModelStreaming() {
+    queryMatchesModel("Query1TestStreaming", new Query1(CONFIG), new Query1Model(CONFIG), true);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query2MatchesModelBatch() {
+    queryMatchesModel("Query2TestBatch", new Query2(CONFIG), new Query2Model(CONFIG), false);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query2MatchesModelStreaming() {
+    queryMatchesModel("Query2TestStreaming", new Query2(CONFIG), new Query2Model(CONFIG), true);
+  }
+
+  @Test
+  @Category({NeedsRunner.class, UsesStatefulParDo.class, UsesTimersInParDo.class})
+  public void query3MatchesModelBatch() {
+    queryMatchesModel("Query3TestBatch", new Query3(CONFIG), new Query3Model(CONFIG), false);
+  }
+
+  @Test
+  @Category({NeedsRunner.class, UsesStatefulParDo.class, UsesTimersInParDo.class})
+  public void query3MatchesModelStreaming() {
+    queryMatchesModel("Query3TestStreaming", new Query3(CONFIG), new Query3Model(CONFIG), true);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query4MatchesModelBatch() {
+    queryMatchesModel("Query4TestBatch", new Query4(CONFIG), new Query4Model(CONFIG), false);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query4MatchesModelStreaming() {
+    queryMatchesModel("Query4TestStreaming", new Query4(CONFIG), new Query4Model(CONFIG), true);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query5MatchesModelBatch() {
+    queryMatchesModel("Query5TestBatch", new Query5(CONFIG), new Query5Model(CONFIG), false);
   }
 
   @Test
-  public void query1MatchesModel() {
-    queryMatchesModel("Query1Test", new Query1(CONFIG), new Query1Model(CONFIG));
+  @Category(NeedsRunner.class)
+  public void query5MatchesModelStreaming() {
+    queryMatchesModel("Query5TestStreaming", new Query5(CONFIG), new Query5Model(CONFIG), true);
   }
 
   @Test
-  public void query2MatchesModel() {
-    queryMatchesModel("Query2Test", new Query2(CONFIG), new Query2Model(CONFIG));
+  @Category(NeedsRunner.class)
+  public void query6MatchesModelBatch() {
+    queryMatchesModel("Query6TestBatch", new Query6(CONFIG), new Query6Model(CONFIG), false);
   }
 
   @Test
-  public void query3MatchesModel() {
-    queryMatchesModel("Query3Test", new Query3(CONFIG), new Query3Model(CONFIG));
+  @Category(NeedsRunner.class)
+  public void query6MatchesModelStreaming() {
+    queryMatchesModel("Query6TestStreaming", new Query6(CONFIG), new Query6Model(CONFIG), true);
   }
 
   @Test
-  public void query4MatchesModel() {
-    queryMatchesModel("Query4Test", new Query4(CONFIG), new Query4Model(CONFIG));
+  @Category(NeedsRunner.class)
+  public void query7MatchesModelBatch() {
+    queryMatchesModel("Query7TestBatch", new Query7(CONFIG), new Query7Model(CONFIG), false);
   }
 
   @Test
-  public void query5MatchesModel() {
-    queryMatchesModel("Query5Test", new Query5(CONFIG), new Query5Model(CONFIG));
+  @Category(NeedsRunner.class)
+  public void query7MatchesModelStreaming() {
+    queryMatchesModel("Query7TestStreaming", new Query7(CONFIG), new Query7Model(CONFIG), true);
   }
 
   @Test
-  public void query6MatchesModel() {
-    queryMatchesModel("Query6Test", new Query6(CONFIG), new Query6Model(CONFIG));
+  @Category(NeedsRunner.class)
+  public void query8MatchesModelBatch() {
+    queryMatchesModel("Query8TestBatch", new Query8(CONFIG), new Query8Model(CONFIG), false);
   }
 
   @Test
-  @Category({UsesStatefulParDo.class, UsesTimersInParDo.class})
-  public void query7MatchesModel() {
-    queryMatchesModel("Query7Test", new Query7(CONFIG), new Query7Model(CONFIG));
+  @Category(NeedsRunner.class)
+  public void query8MatchesModelStreaming() {
+    queryMatchesModel("Query8TestStreaming", new Query8(CONFIG), new Query8Model(CONFIG), true);
   }
 
   @Test
-  public void query8MatchesModel() {
-    queryMatchesModel("Query8Test", new Query8(CONFIG), new Query8Model(CONFIG));
+  @Category(NeedsRunner.class)
+  public void query9MatchesModelBatch() {
+    queryMatchesModel("Query9TestBatch", new Query9(CONFIG), new Query9Model(CONFIG), false);
   }
 
   @Test
-  public void query9MatchesModel() {
-    queryMatchesModel("Query9Test", new Query9(CONFIG), new Query9Model(CONFIG));
+  @Category(NeedsRunner.class)
+  public void query9MatchesModelStreaming() {
+    queryMatchesModel("Query9TestStreaming", new Query9(CONFIG), new Query9Model(CONFIG), true);
   }
 }