You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:29 UTC
[21/55] [abbrv] beam git commit: Improve queries tests
Improve queries tests
Fix Runner categories in tests
Add streaming unit tests and corresponding labels
issue #37
Update numEvents: results are no longer linked to the number of events
issue #22
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7ef49dc3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7ef49dc3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7ef49dc3
Branch: refs/heads/master
Commit: 7ef49dc3706c3a2543284e17eb39782c783d30cf
Parents: 7c28b49
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Mon Apr 3 16:50:51 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:27 2017 +0200
----------------------------------------------------------------------
.../nexmark/src/main/resources/log4j.properties | 2 +-
.../integration/nexmark/queries/QueryTest.java | 142 ++++++++++++++-----
2 files changed, 110 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7ef49dc3/integration/java/nexmark/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/resources/log4j.properties b/integration/java/nexmark/src/main/resources/log4j.properties
index 30d0a9d..7dd57b5 100644
--- a/integration/java/nexmark/src/main/resources/log4j.properties
+++ b/integration/java/nexmark/src/main/resources/log4j.properties
@@ -27,7 +27,7 @@ log4j.logger.org.apache.beam.runners.direct=WARN
log4j.logger.org.apache.beam.sdk=WARN
# Nexmark specific
-log4j.logger.org.apache.beam.integration.nexmark=ALL
+log4j.logger.org.apache.beam.integration.nexmark=WARN
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
http://git-wip-us.apache.org/repos/asf/beam/blob/7ef49dc3/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
index dca2887..284aa7e 100644
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
+++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
@@ -23,6 +23,7 @@ import org.apache.beam.integration.nexmark.NexmarkQueryModel;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.KnownSize;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesStatefulParDo;
@@ -35,81 +36,156 @@ import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/**
- * Test the various NEXMark queries yield results coherent with their models.
- */
+/** Test the various NEXMark queries yield results coherent with their models. */
@RunWith(JUnit4.class)
public class QueryTest {
private static final NexmarkConfiguration CONFIG = NexmarkConfiguration.DEFAULT.clone();
- @Rule
- public TestPipeline p = TestPipeline.create();
static {
- //careful, results of tests are linked to numEvents value
+ // careful, results of tests are linked to numEventGenerators because of timestamp generation
CONFIG.numEventGenerators = 1;
- CONFIG.numEvents = 100;
+ CONFIG.numEvents = 1000;
}
+ @Rule public TestPipeline p = TestPipeline.create();
+
/** Test {@code query} matches {@code model}. */
- private void queryMatchesModel(String name, NexmarkQuery query, NexmarkQueryModel model) {
+ private void queryMatchesModel(
+ String name, NexmarkQuery query, NexmarkQueryModel model, boolean streamingMode) {
NexmarkUtils.setupPipeline(NexmarkUtils.CoderStrategy.HAND, p);
- PCollection<TimestampedValue<KnownSize>> results =
- p.apply(name + ".ReadBounded", NexmarkUtils.batchEventsSource(CONFIG)).apply(query);
- //TODO Ismael this should not be called explicitly
- results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
+ PCollection<TimestampedValue<KnownSize>> results;
+ if (streamingMode) {
+ results =
+ p.apply(name + ".ReadUnBounded", NexmarkUtils.streamEventsSource(CONFIG)).apply(query);
+ //TODO Ismael this should not be called explicitly
+ results.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+ } else {
+ results = p.apply(name + ".ReadBounded", NexmarkUtils.batchEventsSource(CONFIG)).apply(query);
+ //TODO Ismael this should not be called explicitly
+ results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
+ }
PAssert.that(results).satisfies(model.assertionFor());
PipelineResult result = p.run();
result.waitUntilFinish();
}
@Test
- public void query0MatchesModel() {
- queryMatchesModel("Query0Test", new Query0(CONFIG), new Query0Model(CONFIG));
+ @Category(NeedsRunner.class)
+ public void query0MatchesModelBatch() {
+ queryMatchesModel("Query0TestBatch", new Query0(CONFIG), new Query0Model(CONFIG), false);
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void query0MatchesModelStreaming() {
+ queryMatchesModel("Query0TestStreaming", new Query0(CONFIG), new Query0Model(CONFIG), true);
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void query1MatchesModelBatch() {
+ queryMatchesModel("Query1TestBatch", new Query1(CONFIG), new Query1Model(CONFIG), false);
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void query1MatchesModelStreaming() {
+ queryMatchesModel("Query1TestStreaming", new Query1(CONFIG), new Query1Model(CONFIG), true);
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void query2MatchesModelBatch() {
+ queryMatchesModel("Query2TestBatch", new Query2(CONFIG), new Query2Model(CONFIG), false);
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void query2MatchesModelStreaming() {
+ queryMatchesModel("Query2TestStreaming", new Query2(CONFIG), new Query2Model(CONFIG), true);
+ }
+
+ @Test
+ @Category({NeedsRunner.class, UsesStatefulParDo.class, UsesTimersInParDo.class})
+ public void query3MatchesModelBatch() {
+ queryMatchesModel("Query3TestBatch", new Query3(CONFIG), new Query3Model(CONFIG), false);
+ }
+
+ @Test
+ @Category({NeedsRunner.class, UsesStatefulParDo.class, UsesTimersInParDo.class})
+ public void query3MatchesModelStreaming() {
+ queryMatchesModel("Query3TestStreaming", new Query3(CONFIG), new Query3Model(CONFIG), true);
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void query4MatchesModelBatch() {
+ queryMatchesModel("Query4TestBatch", new Query4(CONFIG), new Query4Model(CONFIG), false);
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void query4MatchesModelStreaming() {
+ queryMatchesModel("Query4TestStreaming", new Query4(CONFIG), new Query4Model(CONFIG), true);
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void query5MatchesModelBatch() {
+ queryMatchesModel("Query5TestBatch", new Query5(CONFIG), new Query5Model(CONFIG), false);
}
@Test
- public void query1MatchesModel() {
- queryMatchesModel("Query1Test", new Query1(CONFIG), new Query1Model(CONFIG));
+ @Category(NeedsRunner.class)
+ public void query5MatchesModelStreaming() {
+ queryMatchesModel("Query5TestStreaming", new Query5(CONFIG), new Query5Model(CONFIG), true);
}
@Test
- public void query2MatchesModel() {
- queryMatchesModel("Query2Test", new Query2(CONFIG), new Query2Model(CONFIG));
+ @Category(NeedsRunner.class)
+ public void query6MatchesModelBatch() {
+ queryMatchesModel("Query6TestBatch", new Query6(CONFIG), new Query6Model(CONFIG), false);
}
@Test
- public void query3MatchesModel() {
- queryMatchesModel("Query3Test", new Query3(CONFIG), new Query3Model(CONFIG));
+ @Category(NeedsRunner.class)
+ public void query6MatchesModelStreaming() {
+ queryMatchesModel("Query6TestStreaming", new Query6(CONFIG), new Query6Model(CONFIG), true);
}
@Test
- public void query4MatchesModel() {
- queryMatchesModel("Query4Test", new Query4(CONFIG), new Query4Model(CONFIG));
+ @Category(NeedsRunner.class)
+ public void query7MatchesModelBatch() {
+ queryMatchesModel("Query7TestBatch", new Query7(CONFIG), new Query7Model(CONFIG), false);
}
@Test
- public void query5MatchesModel() {
- queryMatchesModel("Query5Test", new Query5(CONFIG), new Query5Model(CONFIG));
+ @Category(NeedsRunner.class)
+ public void query7MatchesModelStreaming() {
+ queryMatchesModel("Query7TestStreaming", new Query7(CONFIG), new Query7Model(CONFIG), true);
}
@Test
- public void query6MatchesModel() {
- queryMatchesModel("Query6Test", new Query6(CONFIG), new Query6Model(CONFIG));
+ @Category(NeedsRunner.class)
+ public void query8MatchesModelBatch() {
+ queryMatchesModel("Query8TestBatch", new Query8(CONFIG), new Query8Model(CONFIG), false);
}
@Test
- @Category({UsesStatefulParDo.class, UsesTimersInParDo.class})
- public void query7MatchesModel() {
- queryMatchesModel("Query7Test", new Query7(CONFIG), new Query7Model(CONFIG));
+ @Category(NeedsRunner.class)
+ public void query8MatchesModelStreaming() {
+ queryMatchesModel("Query8TestStreaming", new Query8(CONFIG), new Query8Model(CONFIG), true);
}
@Test
- public void query8MatchesModel() {
- queryMatchesModel("Query8Test", new Query8(CONFIG), new Query8Model(CONFIG));
+ @Category(NeedsRunner.class)
+ public void query9MatchesModelBatch() {
+ queryMatchesModel("Query9TestBatch", new Query9(CONFIG), new Query9Model(CONFIG), false);
}
@Test
- public void query9MatchesModel() {
- queryMatchesModel("Query9Test", new Query9(CONFIG), new Query9Model(CONFIG));
+ @Category(NeedsRunner.class)
+ public void query9MatchesModelStreaming() {
+ queryMatchesModel("Query9TestStreaming", new Query9(CONFIG), new Query9Model(CONFIG), true);
}
}