You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:32 UTC

[24/55] [abbrv] beam git commit: Fix and improve query3 and query12

Fix and improve query3 and query12

query3: Use GlobalWindow to comply with the State/Timer APIs (issue #7). Use timer for personState expiration in GlobalWindow (issue #29). Add trigger to GlobalWindow

query12: Replace Count.perKey by Count.perElement (issue #34)


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7c28b492
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7c28b492
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7c28b492

Branch: refs/heads/master
Commit: 7c28b492aa17160d9a84914814e618716b7beb9f
Parents: bd93c8b
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Mon Apr 3 15:18:04 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:27 2017 +0200

----------------------------------------------------------------------
 .../nexmark/NexmarkConfiguration.java           |  19 +-
 .../integration/nexmark/NexmarkOptions.java     |   7 +
 .../integration/nexmark/queries/Query12.java    |  19 +-
 .../integration/nexmark/queries/Query3.java     | 263 +++++++++++--------
 .../integration/nexmark/queries/QueryTest.java  |   4 +
 5 files changed, 195 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7c28b492/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
index e2890ed..d6cd808 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
@@ -195,6 +195,13 @@ public class NexmarkConfiguration implements Serializable {
   public int fanout = 5;
 
   /**
+   * Maximum time to wait before clearing personState in query3
+   * (i.e. how long, in seconds of event time, to wait for that person's auctions).
+   */
+  @JsonProperty
+  public int maxAuctionsWaitingTime = 600;
+
+  /**
    * Length of occasional delay to impose on events (in seconds).
    */
   @JsonProperty
@@ -322,6 +329,9 @@ public class NexmarkConfiguration implements Serializable {
     if (options.getFanout() != null) {
       fanout = options.getFanout();
     }
+    if (options.getMaxAuctionsWaitingTime() != null) {
+      maxAuctionsWaitingTime = options.getMaxAuctionsWaitingTime();
+    }
     if (options.getOccasionalDelaySec() != null) {
       occasionalDelaySec = options.getOccasionalDelaySec();
     }
@@ -376,6 +386,7 @@ public class NexmarkConfiguration implements Serializable {
     result.diskBusyBytes = diskBusyBytes;
     result.auctionSkip = auctionSkip;
     result.fanout = fanout;
+    result.maxAuctionsWaitingTime = maxAuctionsWaitingTime;
     result.occasionalDelaySec = occasionalDelaySec;
     result.probDelayedEvent = probDelayedEvent;
     result.maxLogEvents = maxLogEvents;
@@ -479,6 +490,9 @@ public class NexmarkConfiguration implements Serializable {
     if (fanout != DEFAULT.fanout) {
       sb.append(String.format("; fanout:%d", fanout));
     }
+    if (maxAuctionsWaitingTime != DEFAULT.maxAuctionsWaitingTime) {
+      sb.append(String.format("; maxAuctionsWaitingTime:%d", maxAuctionsWaitingTime));
+    }
     if (occasionalDelaySec != DEFAULT.occasionalDelaySec) {
       sb.append(String.format("; occasionalDelaySec:%d", occasionalDelaySec));
     }
@@ -527,7 +541,7 @@ public class NexmarkConfiguration implements Serializable {
         ratePeriodSec, preloadSeconds, isRateLimited, useWallclockEventTime, avgPersonByteSize,
         avgAuctionByteSize, avgBidByteSize, hotAuctionRatio, hotSellersRatio, hotBiddersRatio,
         windowSizeSec, windowPeriodSec, watermarkHoldbackSec, numInFlightAuctions, numActivePeople,
-        coderStrategy, cpuDelayMs, diskBusyBytes, auctionSkip, fanout,
+        coderStrategy, cpuDelayMs, diskBusyBytes, auctionSkip, fanout, maxAuctionsWaitingTime,
         occasionalDelaySec, probDelayedEvent, maxLogEvents, usePubsubPublishTime,
         outOfOrderGroupSize);
   }
@@ -571,6 +585,9 @@ public class NexmarkConfiguration implements Serializable {
     if (fanout != other.fanout) {
       return false;
     }
+    if (maxAuctionsWaitingTime != other.maxAuctionsWaitingTime) {
+      return false;
+    }
     if (firstEventRate != other.firstEventRate) {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/7c28b492/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
index 1be974f..e39f0a4 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
@@ -309,6 +309,13 @@ public interface NexmarkOptions extends PubsubOptions {
 
   void setFanout(Integer fanout);
 
+  @Description("Maximum waiting time to clean personState in query3 "
+      + "(ie maximum waiting of the auctions related to person in state in seconds in event time).")
+  @Nullable
+  Integer getMaxAuctionsWaitingTime();
+
+  void setMaxAuctionsWaitingTime(Integer maxAuctionsWaitingTime);
+
   @Description("Length of occasional delay to impose on events (in seconds).")
   @Nullable
   Long getOccasionalDelaySec();

http://git-wip-us.apache.org/repos/asf/beam/blob/7c28b492/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
index c67401b..a5db504 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
@@ -49,16 +49,13 @@ public class Query12 extends NexmarkQuery {
   private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
     return events
         .apply(JUST_BIDS)
-        .apply(name + ".Rekey",
-          // TODO etienne: why not avoid this ParDo and do a Cont.perElement?
-            ParDo.of(new DoFn<Bid, KV<Long, Void>>() {
-                   @ProcessElement
-                   public void processElement(ProcessContext c) {
-                     Bid bid = c.element();
-                     c.output(KV.of(bid.bidder, (Void) null));
-                   }
-                 }))
-        .apply(Window.<KV<Long, Void>>into(new GlobalWindows())
+        .apply(ParDo.of(new DoFn<Bid, Long>() {
+          @ProcessElement
+          public void processElement(ProcessContext c){
+            c.output(c.element().bidder);
+          }
+        }))
+        .apply(Window.<Long>into(new GlobalWindows())
             .triggering(
                 Repeatedly.forever(
                     AfterProcessingTime.pastFirstElementInPane()
@@ -66,7 +63,7 @@ public class Query12 extends NexmarkQuery {
                                            Duration.standardSeconds(configuration.windowSizeSec))))
             .discardingFiredPanes()
             .withAllowedLateness(Duration.ZERO))
-        .apply(Count.<Long, Void>perKey())
+        .apply(Count.<Long>perElement())
         .apply(name + ".ToResult",
             ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() {
                    @ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/7c28b492/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
index 128c2b7..ba31e9f 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
@@ -39,14 +39,21 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
 import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.util.state.StateSpecs;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,31 +69,141 @@ import org.slf4j.LoggerFactory;
  * </pre>
  *
  * <p>We'll implement this query to allow 'new auction' events to come before the 'new person'
- * events for the auction seller. Those auctions will be stored until the matching person is
- * seen. Then all subsequent auctions for a person will use the stored person record.
+ * events for the auction seller. Those auctions will be stored until the matching person is seen.
+ * Then all subsequent auctions for a person will use the stored person record.
  *
  * <p>A real system would use an external system to maintain the id-to-person association.
  */
 public class Query3 extends NexmarkQuery {
+
   private static final Logger LOG = LoggerFactory.getLogger(Query3.class);
-//  private static final StateContext GLOBAL_NAMESPACE = StateContexts.global();
-  private static final StateSpec<Object, ValueState<List<Auction>>> AUCTION_LIST_CODED_TAG =
-      StateSpecs.value(ListCoder.of(Auction.CODER));
-  private static final StateSpec<Object, ValueState<Person>> PERSON_CODED_TAG =
-      StateSpecs.value(Person.CODER);
+  private final JoinDoFn joinDoFn;
+
+  public Query3(NexmarkConfiguration configuration) {
+    super(configuration, "Query3");
+    joinDoFn = new JoinDoFn(configuration.maxAuctionsWaitingTime);
+
+  }
+
+  @Override
+  @Nullable
+  public Aggregator<Long, Long> getFatalCount() {
+    return joinDoFn.fatalCounter;
+  }
+
+  private PCollection<NameCityStateId> applyTyped(PCollection<Event> events) {
+    int numEventsInPane = 30;
+
+    PCollection<Event> eventsWindowed =
+        events.apply(
+            Window.<Event>into(new GlobalWindows())
+                .triggering(Repeatedly.forever((AfterPane.elementCountAtLeast(numEventsInPane))))
+                .discardingFiredPanes()
+                .withAllowedLateness(Duration.ZERO));
+    PCollection<KV<Long, Auction>> auctionsBySellerId =
+        eventsWindowed
+            // Only want the new auction events.
+            .apply(JUST_NEW_AUCTIONS)
+
+            // We only want auctions in category 10.
+            .apply(
+                name + ".InCategory",
+                Filter.by(
+                    new SerializableFunction<Auction, Boolean>() {
+
+                      @Override
+                      public Boolean apply(Auction auction) {
+                        return auction.category == 10;
+                      }
+                    }))
+
+            // Key auctions by their seller id.
+            .apply("AuctionBySeller", AUCTION_BY_SELLER);
+
+    PCollection<KV<Long, Person>> personsById =
+        eventsWindowed
+            // Only want the new people events.
+            .apply(JUST_NEW_PERSONS)
+
+            // We only want people in OR, ID, CA.
+            .apply(
+                name + ".InState",
+                Filter.by(
+                    new SerializableFunction<Person, Boolean>() {
+
+                      @Override
+                      public Boolean apply(Person person) {
+                        return person.state.equals("OR")
+                            || person.state.equals("ID")
+                            || person.state.equals("CA");
+                      }
+                    }))
+
+            // Key people by their id.
+            .apply("PersonById", PERSON_BY_ID);
+
+    return
+    // Join auctions and people.
+    // concatenate KeyedPCollections
+    KeyedPCollectionTuple.of(AUCTION_TAG, auctionsBySellerId)
+        .and(PERSON_TAG, personsById)
+        // group auctions and persons by personId
+        .apply(CoGroupByKey.<Long>create())
+        .apply(name + ".Join", ParDo.of(joinDoFn))
+
+        // Project what we want.
+        .apply(
+            name + ".Project",
+            ParDo.of(
+                new DoFn<KV<Auction, Person>, NameCityStateId>() {
+
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    Auction auction = c.element().getKey();
+                    Person person = c.element().getValue();
+                    c.output(
+                        new NameCityStateId(person.name, person.city, person.state, auction.id));
+                  }
+                }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
 
   /**
-   * Join {@code auctions} and {@code people} by person id and emit their cross-product one pair
-   * at a time.
+   * Join {@code auctions} and {@code people} by person id and emit their cross-product one pair at
+   * a time.
    *
    * <p>We know a person may submit any number of auctions. Thus new person event must have the
    * person record stored in persistent state in order to match future auctions by that person.
    *
-   * <p>However we know that each auction is associated with at most one person, so only need
-   * to store auction records in persistent state until we have seen the corresponding person
-   * record. And of course may have already seen that record.
+   * <p>However we know that each auction is associated with at most one person, so only need to
+   * store auction records in persistent state until we have seen the corresponding person record.
+   * And of course may have already seen that record.
    */
   private static class JoinDoFn extends DoFn<KV<Long, CoGbkResult>, KV<Auction, Person>> {
+
+    private int maxAuctionsWaitingTime;
+    private static final String AUCTIONS = "auctions";
+    private static final String PERSON = "person";
+
+    @StateId(PERSON)
+    private static final StateSpec<Object, ValueState<Person>> personSpec =
+        StateSpecs.value(Person.CODER);
+
+    private static final String PERSON_STATE_EXPIRING = "personStateExpiring";
+
+    public final Aggregator<Long, Long> fatalCounter = createAggregator("fatal", Sum.ofLongs());
+
+    @StateId(AUCTIONS)
+    private final StateSpec<Object, ValueState<List<Auction>>> auctionsSpec =
+        StateSpecs.value(ListCoder.of(Auction.CODER));
+
+    @TimerId(PERSON_STATE_EXPIRING)
+    private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
     private final Aggregator<Long, Long> newAuctionCounter =
         createAggregator("newAuction", Sum.ofLongs());
     private final Aggregator<Long, Long> newPersonCounter =
@@ -97,20 +214,25 @@ public class Query3 extends NexmarkQuery {
         createAggregator("newOldOutput", Sum.ofLongs());
     private final Aggregator<Long, Long> oldNewOutputCounter =
         createAggregator("oldNewOutput", Sum.ofLongs());
-    public final Aggregator<Long, Long> fatalCounter = createAggregator("fatal", Sum.ofLongs());
+
+    private JoinDoFn(int maxAuctionsWaitingTime) {
+      this.maxAuctionsWaitingTime = maxAuctionsWaitingTime;
+    }
 
     @ProcessElement
-    public void processElement(ProcessContext c) throws IOException {
-      //TODO: This is using the internal state API. Rework to use the
-      //TODO Ismael this is broken for not access to state
+    public void processElement(
+        ProcessContext c,
+        @TimerId(PERSON_STATE_EXPIRING) Timer timer,
+        @StateId(PERSON) ValueState<Person> personState,
+        @StateId(AUCTIONS) ValueState<List<Auction>> auctionsState)
+        throws IOException {
       // We would *almost* implement this by  rewindowing into the global window and
       // running a combiner over the result. The combiner's accumulator would be the
       // state we use below. However, combiners cannot emit intermediate results, thus
       // we need to wait for the pending ReduceFn API.
-//      StateInternals<?> stateInternals = c.windowingInternals().stateInternals();
-//      ValueState<Person> personState = stateInternals.state(GLOBAL_NAMESPACE, PERSON_CODED_TAG);
-//      Person existingPerson = personState.read();
-      Person existingPerson = null;
+
+      Person existingPerson = personState.read();
+
       if (existingPerson != null) {
         // We've already seen the new person event for this person id.
         // We can join with any new auctions on-the-fly without needing any
@@ -123,8 +245,6 @@ public class Query3 extends NexmarkQuery {
         return;
       }
 
-//      ValueState<List<Auction>> auctionsState =
-//          stateInternals.state(GLOBAL_NAMESPACE, AUCTION_LIST_CODED_TAG);
       Person theNewPerson = null;
       for (Person newPerson : c.element().getValue().getAll(PERSON_TAG)) {
         if (theNewPerson == null) {
@@ -140,14 +260,14 @@ public class Query3 extends NexmarkQuery {
         }
         newPersonCounter.addValue(1L);
         // We've now seen the person for this person id so can flush any
-        // pending auctions for the same seller id.
-        List<Auction> pendingAuctions = null; //auctionsState.read();
+        // pending auctions for the same seller id (an auction is done by only one seller).
+        List<Auction> pendingAuctions = auctionsState.read();
         if (pendingAuctions != null) {
           for (Auction pendingAuction : pendingAuctions) {
             oldNewOutputCounter.addValue(1L);
             c.output(KV.of(pendingAuction, newPerson));
           }
-//          auctionsState.clear();
+          auctionsState.clear();
         }
         // Also deal with any new auctions.
         for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
@@ -156,8 +276,11 @@ public class Query3 extends NexmarkQuery {
           c.output(KV.of(newAuction, newPerson));
         }
         // Remember this person for any future auctions.
-
-//        personState.write(newPerson);
+        personState.write(newPerson);
+        //set a time out to clear this state
+        Instant firingTime = new Instant(newPerson.dateTime)
+                                  .plus(Duration.standardSeconds(maxAuctionsWaitingTime));
+        timer.set(firingTime);
       }
       if (theNewPerson != null) {
         return;
@@ -165,7 +288,7 @@ public class Query3 extends NexmarkQuery {
 
       // We'll need to remember the auctions until we see the corresponding
       // new person event.
-      List<Auction> pendingAuctions = null; //auctionsState.read();
+      List<Auction> pendingAuctions = auctionsState.read();
       if (pendingAuctions == null) {
         pendingAuctions = new ArrayList<>();
       }
@@ -173,84 +296,14 @@ public class Query3 extends NexmarkQuery {
         newAuctionCounter.addValue(1L);
         pendingAuctions.add(newAuction);
       }
-//      auctionsState.write(pendingAuctions);
+      auctionsState.write(pendingAuctions);
     }
+  @OnTimer(PERSON_STATE_EXPIRING)
+  public void onTimerCallback(
+      OnTimerContext context,
+      @StateId(PERSON) ValueState<Person> personState) {
+      personState.clear();
   }
 
-  private final JoinDoFn joinDoFn = new JoinDoFn();
-
-  public Query3(NexmarkConfiguration configuration) {
-    super(configuration, "Query3");
-  }
-
-  @Override
-  @Nullable
-  public Aggregator<Long, Long> getFatalCount() {
-    return joinDoFn.fatalCounter;
-  }
-
-  private PCollection<NameCityStateId> applyTyped(PCollection<Event> events) {
-    // Batch into incremental results windows.
-    events = events.apply(
-        Window.<Event>into(FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))));
-
-    PCollection<KV<Long, Auction>> auctionsBySellerId =
-        events
-            // Only want the new auction events.
-            .apply(JUST_NEW_AUCTIONS)
-
-            // We only want auctions in category 10.
-            .apply(name + ".InCategory", Filter.by(new SerializableFunction<Auction, Boolean>() {
-              @Override
-              public Boolean apply(Auction auction) {
-                return auction.category == 10;
-              }
-            }))
-
-            // Key auctions by their seller id.
-            .apply("AuctionBySeller", AUCTION_BY_SELLER);
-
-    PCollection<KV<Long, Person>> personsById =
-        events
-            // Only want the new people events.
-            .apply(JUST_NEW_PERSONS)
-
-            // We only want people in OR, ID, CA.
-            .apply(name + ".InState", Filter.by(new SerializableFunction<Person, Boolean>() {
-              @Override
-              public Boolean apply(Person person) {
-                return person.state.equals("OR") || person.state.equals("ID")
-                    || person.state.equals("CA");
-              }
-            }))
-
-            // Key people by their id.
-            .apply("PersonById", PERSON_BY_ID);
-
-    return
-      // Join auctions and people.
-        // concatenate KeyedPCollections
-      KeyedPCollectionTuple.of(AUCTION_TAG, auctionsBySellerId)
-            .and(PERSON_TAG, personsById)
-        // group auctions and persons by personId
-        .apply(CoGroupByKey.<Long>create())
-            .apply(name + ".Join", ParDo.of(joinDoFn))
-
-            // Project what we want.
-            .apply(name + ".Project",
-                ParDo.of(new DoFn<KV<Auction, Person>, NameCityStateId>() {
-                      @ProcessElement
-                      public void processElement(ProcessContext c) {
-                        Auction auction = c.element().getKey();
-                        Person person = c.element().getValue();
-                        c.output(new NameCityStateId(
-                            person.name, person.city, person.state, auction.id));
-                      }
-                    }));
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7c28b492/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
index 5cf4287..dca2887 100644
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
+++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
@@ -25,10 +25,13 @@ import org.apache.beam.integration.nexmark.model.KnownSize;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesStatefulParDo;
+import org.apache.beam.sdk.testing.UsesTimersInParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -95,6 +98,7 @@ public class QueryTest {
   }
 
   @Test
+  @Category({UsesStatefulParDo.class, UsesTimersInParDo.class})
   public void query7MatchesModel() {
     queryMatchesModel("Query7Test", new Query7(CONFIG), new Query7Model(CONFIG));
   }