You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/10 23:08:19 UTC

[GitHub] [beam] robertwb commented on a change in pull request #12218: [BEAM-10444] Simpler PAssert for simpler cases.

robertwb commented on a change in pull request #12218:
URL: https://github.com/apache/beam/pull/12218#discussion_r453114999



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
##########
@@ -1171,6 +1174,41 @@ public GroupGlobally(AssertionWindows rewindowingStrategy) {
     public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input) {
       final int combinedKey = 42;
 
+      if (input.getWindowingStrategy().equals(WindowingStrategy.globalDefault())
+          && rewindowingStrategy instanceof IntoGlobalWindow) {
+        // If we don't have to worry about complicated triggering semantics we can generate
+        // a much simpler pipeline.  This is particularly useful for bootstrapping runners so that
+        // we can run subsets of the validates runner test suite requiring support of only the
+        // most basic primitives.
+
+        // In order to ensure we actually get an (empty) iterable rather than an empty PCollection
+        // when the input is an empty PCollection, we flatten with a dummy PCollection containing
+        // an empty iterable before grouping on a singleton key and concatenating.
+        PCollection<Iterable<ValueInSingleWindow<T>>> actual =
+            input.apply(Reify.windows()).apply(ParDo.of(new ToSingletonIterables<>()));
+        PCollection<Iterable<ValueInSingleWindow<T>>> dummy =
+            input
+                .getPipeline()
+                .apply(
+                    Create.<Iterable<ValueInSingleWindow<T>>>of(
+                            ImmutableList.of(ImmutableList.of()))
+                        .withCoder(actual.getCoder()));
+        return PCollectionList.of(dummy)
+            .and(actual)
+            .apply(Flatten.pCollections())
+            .apply(
+                // Default end-of-window trigger dissallowed for unbounded PCollections.

Review comment:
       Due to the if condition, the triggering (and windowing) is already the default, which is fine for bounded but not for unbounded. (I opted to leave it this way for bounded so that you don't have to know about the never.ever trigger to do bounded PAsserts.)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org