You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:10 UTC

[02/55] [abbrv] beam git commit: NexMark

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query2.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query2.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query2.java
new file mode 100644
index 0000000..cede2f3
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query2.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Query 2, 'Filtering'. Find bids with specific auction ids and show their bid price.
+ * In CQL syntax:
+ *
+ * <pre>
+ * SELECT Rstream(auction, price)
+ * FROM Bid [NOW]
+ * WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087;
+ * </pre>
+ *
+ * <p>As written that query will only yield a few hundred results over event streams of
+ * arbitrary size. To make it more interesting we instead choose bids for every
+ * {@code auctionSkip}'th auction.
+ */
+class Query2 extends NexmarkQuery {
+  public Query2(NexmarkConfiguration configuration) {
+    super(configuration, "Query2");
+  }
+
+  /**
+   * Builds the typed pipeline: bids only, restricted to every {@code auctionSkip}'th auction,
+   * projected to (auction id, price).
+   */
+  private PCollection<AuctionPrice> applyTyped(PCollection<Event> events) {
+    // Keep only the bid events from the mixed event stream.
+    PCollection<Bid> bids = events.apply(JUST_BIDS);
+
+    // Keep only the bids placed on the auctions we care about.
+    PCollection<Bid> selectedBids = bids.apply(
+        Filter.byPredicate(new SerializableFunction<Bid, Boolean>() {
+          @Override
+          public Boolean apply(Bid candidate) {
+            return candidate.auction % configuration.auctionSkip == 0;
+          }
+        }));
+
+    // Project each surviving bid down to its auction id and price.
+    return selectedBids.apply(
+        ParDo.named(name + ".Project").of(new DoFn<Bid, AuctionPrice>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            Bid chosen = c.element();
+            c.output(new AuctionPrice(chosen.auction, chosen.price));
+          }
+        }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    PCollection<AuctionPrice> results = applyTyped(events);
+    return NexmarkUtils.castToKnownSize(name, results);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query2Model.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query2Model.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query2Model.java
new file mode 100644
index 0000000..6ccfeeb
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query2Model.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * A direct (non-pipeline) implementation of {@link Query2}.
+ */
+public class Query2Model extends NexmarkQueryModel implements Serializable {
+  /**
+   * Simulator for query 2: emits an (auction, price) pair, stamped with the event's timestamp,
+   * for every bid on an auction whose id is a multiple of {@code auctionSkip}.
+   */
+  private class Simulator extends AbstractSimulator<Event, AuctionPrice> {
+    public Simulator(NexmarkConfiguration configuration) {
+      super(NexmarkUtils.standardEventIterator(configuration));
+    }
+
+    @Override
+    protected void run() {
+      TimestampedValue<Event> next = nextInput();
+      if (next == null) {
+        // Input exhausted: signal completion.
+        allDone();
+        return;
+      }
+      Bid bid = next.getValue().bid;
+      // Non-bid events and bids on skipped auctions produce no result.
+      if (bid != null && bid.auction % configuration.auctionSkip == 0) {
+        addResult(TimestampedValue.of(
+            new AuctionPrice(bid.auction, bid.price), next.getTimestamp()));
+      }
+    }
+  }
+
+  public Query2Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  @Override
+  public AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  /** Renders results for comparison via {@code toValueTimestampOrder}. */
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValueTimestampOrder(itr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query3.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query3.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query3.java
new file mode 100644
index 0000000..5b9b17b
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query3.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Sum.SumLongFn;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.joda.time.Duration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+/**
+ * Query 3, 'Local Item Suggestion'. Who is selling in OR, ID or CA in category 10, and for what
+ * auction ids? In CQL syntax:
+ *
+ * <pre>
+ * SELECT Istream(P.name, P.city, P.state, A.id)
+ * FROM Auction A [ROWS UNBOUNDED], Person P [ROWS UNBOUNDED]
+ * WHERE A.seller = P.id AND (P.state = `OR' OR P.state = `ID' OR P.state = `CA') AND A.category
+ * = 10;
+ * </pre>
+ *
+ * <p>We'll implement this query to allow 'new auction' events to come before the 'new person'
+ * events for the auction seller. Those auctions will be stored until the matching person is
+ * seen. Then all subsequent auctions for a person will use the stored person record.
+ *
+ * <p>A real system would use an external system to maintain the id-to-person association.
+ */
+class Query3 extends NexmarkQuery {
+  /**
+   * Namespace for the join state. It is the global namespace rather than a per-window one,
+   * presumably so the join state outlives the fixed windows applied in {@code applyTyped}
+   * (NOTE(review): confirm).
+   */
+  private static final StateNamespace GLOBAL_NAMESPACE = StateNamespaces.global();
+  /** State cell holding auctions still waiting for their seller's person record. */
+  private static final StateTag<Object, ValueState<List<Auction>>> AUCTION_LIST_CODED_TAG =
+      StateTags.value("left", ListCoder.of(Auction.CODER));
+  /** State cell holding the person record once it has been seen. */
+  private static final StateTag<Object, ValueState<Person>> PERSON_CODED_TAG =
+      StateTags.value("right", Person.CODER);
+
+  /**
+   * Join {@code auctions} and {@code people} by person id and emit their cross-product one pair
+   * at a time.
+   *
+   * <p>We know a person may submit any number of auctions. Thus a new person event must have the
+   * person record stored in persistent state in order to match future auctions by that person.
+   *
+   * <p>However we know that each auction is associated with at most one person, so we only need
+   * to store auction records in persistent state until we have seen the corresponding person
+   * record. And of course we may have already seen that record.
+   *
+   * <p>If a single input element carries more than one person record for an id, only the first
+   * is used; the others are reported via {@code NexmarkUtils.error} and counted in
+   * {@link #fatalCounter}.
+   */
+  private static class JoinDoFn extends DoFn<KV<Long, CoGbkResult>, KV<Auction, Person>> {
+    // Counts every auction received by this DoFn.
+    private final Aggregator<Long, Long> newAuctionCounter =
+        createAggregator("newAuction", new SumLongFn());
+    // Counts person records accepted as the first (stored) record for their id.
+    private final Aggregator<Long, Long> newPersonCounter =
+        createAggregator("newPerson", new SumLongFn());
+    // Outputs pairing a new auction with a person arriving in the same element.
+    private final Aggregator<Long, Long> newNewOutputCounter =
+        createAggregator("newNewOutput", new SumLongFn());
+    // Outputs pairing a new auction with a previously stored person.
+    private final Aggregator<Long, Long> newOldOutputCounter =
+        createAggregator("newOldOutput", new SumLongFn());
+    // Outputs pairing a previously stored (pending) auction with a newly arrived person.
+    private final Aggregator<Long, Long> oldNewOutputCounter =
+        createAggregator("oldNewOutput", new SumLongFn());
+    // Counts duplicate or conflicting person records for the same id within one element.
+    public final Aggregator<Long, Long> fatalCounter = createAggregator("fatal", new SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c) throws IOException {
+      // TODO: This is using the internal state API. Rework to use the user-facing state API.
+      // We could *almost* implement this by rewindowing into the global window and
+      // running a combiner over the result. The combiner's accumulator would be the
+      // state we use below. However, combiners cannot emit intermediate results, thus
+      // we need to wait for the pending ReduceFn API.
+      StateInternals<?> stateInternals = c.windowingInternals().stateInternals();
+      ValueState<Person> personState = stateInternals.state(GLOBAL_NAMESPACE, PERSON_CODED_TAG);
+      Person existingPerson = personState.read();
+      if (existingPerson != null) {
+        // We've already seen the new person event for this person id.
+        // We can join with any new auctions on-the-fly without needing any
+        // additional persistent state.
+        // Any person records in this element are ignored: the stored one wins.
+        for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
+          newAuctionCounter.addValue(1L);
+          newOldOutputCounter.addValue(1L);
+          c.output(KV.of(newAuction, existingPerson));
+        }
+        return;
+      }
+
+      ValueState<List<Auction>> auctionsState =
+          stateInternals.state(GLOBAL_NAMESPACE, AUCTION_LIST_CODED_TAG);
+      // The first person record seen in this element; any further records are errors.
+      Person theNewPerson = null;
+      for (Person newPerson : c.element().getValue().getAll(PERSON_TAG)) {
+        if (theNewPerson == null) {
+          theNewPerson = newPerson;
+        } else {
+          if (theNewPerson.equals(newPerson)) {
+            NexmarkUtils.error("**** duplicate person %s ****", theNewPerson);
+          } else {
+            NexmarkUtils.error("**** conflicting persons %s and %s ****", theNewPerson, newPerson);
+          }
+          fatalCounter.addValue(1L);
+          continue;
+        }
+        newPersonCounter.addValue(1L);
+        // We've now seen the person for this person id so can flush any
+        // pending auctions for the same seller id.
+        List<Auction> pendingAuctions = auctionsState.read();
+        if (pendingAuctions != null) {
+          for (Auction pendingAuction : pendingAuctions) {
+            oldNewOutputCounter.addValue(1L);
+            c.output(KV.of(pendingAuction, newPerson));
+          }
+          auctionsState.clear();
+        }
+        // Also deal with any new auctions.
+        for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
+          newAuctionCounter.addValue(1L);
+          newNewOutputCounter.addValue(1L);
+          c.output(KV.of(newAuction, newPerson));
+        }
+        // Remember this person for any future auctions.
+        personState.write(newPerson);
+      }
+      if (theNewPerson != null) {
+        // All auctions in this element were joined with the new person above.
+        return;
+      }
+
+      // We'll need to remember the auctions until we see the corresponding
+      // new person event.
+      List<Auction> pendingAuctions = auctionsState.read();
+      if (pendingAuctions == null) {
+        pendingAuctions = new ArrayList<>();
+      }
+      for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
+        newAuctionCounter.addValue(1L);
+        pendingAuctions.add(newAuction);
+      }
+      auctionsState.write(pendingAuctions);
+    }
+  }
+
+  // Single join instance, kept so getFatalCount() can expose its fatal counter.
+  private final JoinDoFn joinDoFn = new JoinDoFn();
+
+  public Query3(NexmarkConfiguration configuration) {
+    super(configuration, "Query3");
+  }
+
+  /** Returns the aggregator counting duplicate or conflicting person records in the join. */
+  @Override
+  @Nullable
+  public Aggregator<Long, Long> getFatalCount() {
+    return joinDoFn.fatalCounter;
+  }
+
+  /**
+   * Builds the typed pipeline: category-10 auctions keyed by seller id are joined with persons
+   * in OR, ID or CA keyed by person id, then projected to (name, city, state, auction id).
+   */
+  private PCollection<NameCityStateId> applyTyped(PCollection<Event> events) {
+    // Batch into incremental results windows.
+    events = events.apply(
+        Window.<Event>into(FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))));
+
+    PCollection<KV<Long, Auction>> auctionsBySellerId =
+        events
+            // Only want the new auction events.
+            .apply(JUST_NEW_AUCTIONS)
+
+            // We only want auctions in category 10.
+            .apply(Filter.byPredicate(new SerializableFunction<Auction, Boolean>() {
+              @Override
+              public Boolean apply(Auction auction) {
+                return auction.category == 10;
+              }
+            }).named(name + ".InCategory"))
+
+            // Key auctions by their seller id.
+            .apply(AUCTION_BY_SELLER);
+
+    PCollection<KV<Long, Person>> personsById =
+        events
+            // Only want the new people events.
+            .apply(JUST_NEW_PERSONS)
+
+            // We only want people in OR, ID, CA.
+            .apply(Filter.byPredicate(new SerializableFunction<Person, Boolean>() {
+              @Override
+              public Boolean apply(Person person) {
+                return person.state.equals("OR") || person.state.equals("ID")
+                    || person.state.equals("CA");
+              }
+            }).named(name + ".InState"))
+
+            // Key people by their id.
+            .apply(PERSON_BY_ID);
+
+    return
+        // Join auctions and people.
+        KeyedPCollectionTuple.of(AUCTION_TAG, auctionsBySellerId)
+            .and(PERSON_TAG, personsById)
+            .apply(CoGroupByKey.<Long>create())
+            .apply(ParDo.named(name + ".Join").of(joinDoFn))
+
+            // Project what we want.
+            .apply(
+                ParDo.named(name + ".Project")
+                    .of(new DoFn<KV<Auction, Person>, NameCityStateId>() {
+                      @Override
+                      public void processElement(ProcessContext c) {
+                        Auction auction = c.element().getKey();
+                        Person person = c.element().getValue();
+                        c.output(new NameCityStateId(
+                            person.name, person.city, person.state, auction.id));
+                      }
+                    }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query3Model.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query3Model.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query3Model.java
new file mode 100644
index 0000000..b865eda
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query3Model.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.values.TimestampedValue;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A direct (non-pipeline) implementation of {@link Query3}.
+ */
+public class Query3Model extends NexmarkQueryModel implements Serializable {
+  /**
+   * Simulator for query 3: joins category-10 auctions with their sellers from OR, ID or CA,
+   * holding back auctions whose seller has not yet been seen.
+   */
+  private class Simulator extends AbstractSimulator<Event, NameCityStateId> {
+    /** Category-10 auctions still waiting for their seller, keyed by seller id. */
+    private final Multimap<Long, Auction> newAuctions;
+
+    /** Persons from OR, ID or CA seen so far, keyed by person id. */
+    private final Map<Long, Person> newPersons;
+
+    public Simulator(NexmarkConfiguration configuration) {
+      super(NexmarkUtils.standardEventIterator(configuration));
+      newAuctions = ArrayListMultimap.create();
+      newPersons = new HashMap<>();
+    }
+
+    /** Emits one joined (person, auction) result stamped with {@code timestamp}. */
+    private void addResult(Auction auction, Person person, Instant timestamp) {
+      NameCityStateId joined =
+          new NameCityStateId(person.name, person.city, person.state, auction.id);
+      addResult(TimestampedValue.of(joined, timestamp));
+    }
+
+    /** Returns true when {@code person} lives in one of the states the query selects. */
+    private boolean isWantedState(Person person) {
+      return person.state.equals("OR") || person.state.equals("ID")
+          || person.state.equals("CA");
+    }
+
+    /** Joins a category-10 auction with its seller if already seen, otherwise parks it. */
+    private void handleAuction(Auction auction, Instant timestamp) {
+      if (auction.category != 10) {
+        return;
+      }
+      Person seller = newPersons.get(auction.seller);
+      if (seller == null) {
+        newAuctions.put(auction.seller, auction);
+      } else {
+        addResult(auction, seller, timestamp);
+      }
+    }
+
+    /** Releases any auctions parked for {@code person}, then remembers the person. */
+    private void handlePerson(Person person, Instant timestamp) {
+      if (!isWantedState(person)) {
+        return;
+      }
+      // removeAll hands back the parked auctions (in insertion order) and forgets them.
+      for (Auction parked : newAuctions.removeAll(person.id)) {
+        addResult(parked, person, timestamp);
+      }
+      newPersons.put(person.id, person);
+    }
+
+    @Override
+    protected void run() {
+      TimestampedValue<Event> next = nextInput();
+      if (next == null) {
+        allDone();
+        return;
+      }
+      Event event = next.getValue();
+      if (event.bid != null) {
+        // Bids play no part in this query.
+        return;
+      }
+      Instant timestamp = next.getTimestamp();
+      if (event.newAuction != null) {
+        handleAuction(event.newAuction, timestamp);
+      } else {
+        handlePerson(event.newPerson, timestamp);
+      }
+    }
+  }
+
+  public Query3Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  @Override
+  public AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  /** Renders results for comparison via {@code toValue}. */
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValue(itr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query4.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query4.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query4.java
new file mode 100644
index 0000000..bc695b7
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query4.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Mean;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.joda.time.Duration;
+
+/**
+ * Query 4, 'Average Price for a Category'. Select the average of the wining bid prices for all
+ * closed auctions in each category. In CQL syntax:
+ *
+ * <pre>
+ * SELECT Istream(AVG(Q.final))
+ * FROM Category C, (SELECT Rstream(MAX(B.price) AS final, A.category)
+ *                   FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
+ *                   WHERE A.id=B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME
+ *                   GROUP BY A.id, A.category) Q
+ * WHERE Q.category = C.id
+ * GROUP BY C.id;
+ * </pre>
+ *
+ * <p>For extra spiciness our implementation differs slightly from the above:
+ * <ul>
+ * <li>We select both the average winning price and the category.
+ * <li>We don't bother joining with a static category table, since it's contents are never used.
+ * <li>We only consider bids which are above the auction's reserve price.
+ * <li>We accept the highest-price, earliest valid bid as the winner.
+ * <li>We calculate the averages oven a sliding window of size {@code windowSizeSec} and
+ * period {@code windowPeriodSec}.
+ * </ul>
+ */
+class Query4 extends NexmarkQuery {
+  private final Monitor<AuctionBid> winningBidsMonitor;
+
+  public Query4(NexmarkConfiguration configuration) {
+    super(configuration, "Query4");
+    winningBidsMonitor = new Monitor<>(name + ".WinningBids", "winning");
+  }
+
+  private PCollection<CategoryPrice> applyTyped(PCollection<Event> events) {
+    PCollection<AuctionBid> winningBids =
+        events
+            // Find the winning bid for each closed auction.
+            .apply(new WinningBids(name + ".WinningBids", configuration));
+
+    // Monitor winning bids
+    winningBids = winningBids.apply(winningBidsMonitor.getTransform());
+
+    return winningBids
+        // Key the winning bid price by the auction category.
+        .apply(
+            ParDo.named(name + ".Rekey")
+                .of(new DoFn<AuctionBid, KV<Long, Long>>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    Auction auction = c.element().auction;
+                    Bid bid = c.element().bid;
+                    c.output(KV.of(auction.category, bid.price));
+                  }
+                }))
+
+        // Re-window so we can calculate a sliding average
+        .apply(Window.<KV<Long, Long>>into(
+            SlidingWindows.of(Duration.standardSeconds(configuration.windowSizeSec))
+                .every(Duration.standardSeconds(configuration.windowPeriodSec))))
+
+        // Find the average of the winning bids for each category.
+        // Make sure we share the work for each category between workers.
+        .apply(Mean.<Long, Long>perKey().withHotKeyFanout(configuration.fanout))
+
+        // For testing against Query4Model, capture which results are 'final'.
+        .apply(
+            ParDo.named(name + ".Project")
+                .of(new DoFn<KV<Long, Double>, CategoryPrice>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.output(new CategoryPrice(c.element().getKey(),
+                        Math.round(c.element().getValue()), c.pane().isLast()));
+                  }
+                }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query4Model.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query4Model.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query4Model.java
new file mode 100644
index 0000000..2410306
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query4Model.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A direct implementation of {@link Query4}.
+ */
+public class Query4Model extends NexmarkQueryModel implements Serializable {
+  /**
+   * Simulator for query 4.
+   */
+  private class Simulator extends AbstractSimulator<AuctionBid, CategoryPrice> {
+    /** The prices and categories for all winning bids in the last window size. */
+    private final List<TimestampedValue<CategoryPrice>> winningPricesByCategory;
+
+    /** Timestamp of last result (ms since epoch). */
+    private Instant lastTimestamp;
+
+    /** When oldest active window starts. */
+    private Instant windowStart;
+
+    /** The last seen result for each category. */
+    private final Map<Long, TimestampedValue<CategoryPrice>> lastSeenResults;
+
+    public Simulator(NexmarkConfiguration configuration) {
+      super(new WinningBidsSimulator(configuration).results());
+      winningPricesByCategory = new ArrayList<>();
+      lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      windowStart = NexmarkUtils.BEGINNING_OF_TIME;
+      lastSeenResults = new TreeMap<>();
+    }
+
+    /**
+     * Calculate the average bid price for each category for all winning bids
+     * which are strictly before {@code end}.
+     */
+    private void averages(Instant end) {
+      Map<Long, Long> counts = new TreeMap<>();
+      Map<Long, Long> totals = new TreeMap<>();
+      for (TimestampedValue<CategoryPrice> value : winningPricesByCategory) {
+        if (!value.getTimestamp().isBefore(end)) {
+          continue;
+        }
+        long category = value.getValue().category;
+        long price = value.getValue().price;
+        Long count = counts.get(category);
+        if (count == null) {
+          count = 1L;
+        } else {
+          count += 1;
+        }
+        counts.put(category, count);
+        Long total = totals.get(category);
+        if (total == null) {
+          total = price;
+        } else {
+          total += price;
+        }
+        totals.put(category, total);
+      }
+      for (long category : counts.keySet()) {
+        long count = counts.get(category);
+        long total = totals.get(category);
+        TimestampedValue<CategoryPrice> result = TimestampedValue.of(
+            new CategoryPrice(category, Math.round((double) total / count), true), lastTimestamp);
+        addIntermediateResult(result);
+        lastSeenResults.put(category, result);
+      }
+    }
+
+    /**
+     * Calculate averages for any windows which can now be retired. Also prune entries
+     * which can no longer contribute to any future window.
+     */
+    private void prune(Instant newWindowStart) {
+      while (!newWindowStart.equals(windowStart)) {
+        averages(windowStart.plus(Duration.standardSeconds(configuration.windowSizeSec)));
+        windowStart = windowStart.plus(Duration.standardSeconds(configuration.windowPeriodSec));
+        Iterator<TimestampedValue<CategoryPrice>> itr = winningPricesByCategory.iterator();
+        while (itr.hasNext()) {
+          if (itr.next().getTimestamp().isBefore(windowStart)) {
+            itr.remove();
+          }
+        }
+        if (winningPricesByCategory.isEmpty()) {
+          windowStart = newWindowStart;
+        }
+      }
+    }
+
+    /**
+     * Capture the winning bid.
+     */
+    private void captureWinningBid(Auction auction, Bid bid, Instant timestamp) {
+      winningPricesByCategory.add(
+          TimestampedValue.of(new CategoryPrice(auction.category, bid.price, false), timestamp));
+    }
+
+    @Override
+    protected void run() {
+      TimestampedValue<AuctionBid> timestampedWinningBid = nextInput();
+      if (timestampedWinningBid == null) {
+        prune(NexmarkUtils.END_OF_TIME);
+        for (TimestampedValue<CategoryPrice> result : lastSeenResults.values()) {
+          addResult(result);
+        }
+        allDone();
+        return;
+      }
+      lastTimestamp = timestampedWinningBid.getTimestamp();
+      Instant newWindowStart = windowStart(Duration.standardSeconds(configuration.windowSizeSec),
+          Duration.standardSeconds(configuration.windowPeriodSec), lastTimestamp);
+      prune(newWindowStart);
+      captureWinningBid(timestampedWinningBid.getValue().auction,
+          timestampedWinningBid.getValue().bid, lastTimestamp);
+    }
+  }
+
+  public Query4Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  @Override
+  public AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  @Override
+  protected Iterable<TimestampedValue<KnownSize>> relevantResults(
+      Iterable<TimestampedValue<KnownSize>> results) {
+    // Find the last (in processing time) reported average price for each category.
+    Map<Long, TimestampedValue<KnownSize>> finalAverages = new TreeMap<>();
+    for (TimestampedValue<KnownSize> obj : results) {
+      Assert.assertTrue("have CategoryPrice", obj.getValue() instanceof CategoryPrice);
+      CategoryPrice categoryPrice = (CategoryPrice) obj.getValue();
+      if (categoryPrice.isLast) {
+        finalAverages.put(
+            categoryPrice.category,
+            TimestampedValue.of((KnownSize) categoryPrice, obj.getTimestamp()));
+      }
+    }
+
+    return finalAverages.values();
+  }
+
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValue(itr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query5.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query5.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query5.java
new file mode 100644
index 0000000..91a4a28
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query5.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.joda.time.Duration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Query 5, 'Hot Items'. Which auctions have seen the most bids in the last hour (updated every
+ * minute). In CQL syntax:
+ *
+ * <pre>
+ * SELECT Rstream(auction)
+ * FROM (SELECT B1.auction, count(*) AS num
+ *       FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B1
+ *       GROUP BY B1.auction)
+ * WHERE num >= ALL (SELECT count(*)
+ *                   FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B2
+ *                   GROUP BY B2.auction);
+ * </pre>
+ *
+ * <p>To make things a bit more dynamic and easier to test we use much shorter windows, and
+ * we'll also preserve the bid counts.
+ */
+class Query5 extends NexmarkQuery {
+  public Query5(NexmarkConfiguration configuration) {
+    super(configuration, "Query5");
+  }
+
+  private PCollection<AuctionCount> applyTyped(PCollection<Event> events) {
+    return events
+        // Only want the bid events.
+        .apply(JUST_BIDS)
+        // Window the bids into sliding windows.
+        .apply(Window.<Bid>into(
+            SlidingWindows.of(Duration.standardSeconds(configuration.windowSizeSec))
+                .every(Duration.standardSeconds(configuration.windowPeriodSec))))
+        // Project just the auction id.
+        .apply(BID_TO_AUCTION)
+
+        // Count the number of bids per auction id.
+        .apply(Count.<Long>perElement())
+
+        // We'll want to keep all auctions with the maximal number of bids.
+        // Start by lifting each into a singleton list.
+        .apply(
+            ParDo.named(name + ".ToSingletons")
+                .of(new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.output(KV.of(Arrays.asList(c.element().getKey()), c.element().getValue()));
+                  }
+                }))
+
+        // Keep only the auction ids with the most bids.
+        .apply(
+            Combine
+                .globally(new Combine.BinaryCombineFn<KV<List<Long>, Long>>() {
+                  @Override
+                  public KV<List<Long>, Long> apply(
+                      KV<List<Long>, Long> left, KV<List<Long>, Long> right) {
+                    List<Long> leftBestAuctions = left.getKey();
+                    long leftCount = left.getValue();
+                    List<Long> rightBestAuctions = right.getKey();
+                    long rightCount = right.getValue();
+                    if (leftCount > rightCount) {
+                      return left;
+                    } else if (leftCount < rightCount) {
+                      return right;
+                    } else {
+                      List<Long> newBestAuctions = new ArrayList<>();
+                      newBestAuctions.addAll(leftBestAuctions);
+                      newBestAuctions.addAll(rightBestAuctions);
+                      return KV.of(newBestAuctions, leftCount);
+                    }
+                  }
+                })
+                .withoutDefaults()
+                .withFanout(configuration.fanout))
+
+        // Project into result.
+        .apply(
+            ParDo.named(name + ".Select")
+                .of(new DoFn<KV<List<Long>, Long>, AuctionCount>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    long count = c.element().getValue();
+                    for (long auction : c.element().getKey()) {
+                      c.output(new AuctionCount(auction, count));
+                    }
+                  }
+                }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query5Model.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query5Model.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query5Model.java
new file mode 100644
index 0000000..a7dd8f0
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query5Model.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A direct implementation of {@link Query5}.
+ */
+public class Query5Model extends NexmarkQueryModel implements Serializable {
+  /**
+   * Simulator for query 5.
+   */
+  private class Simulator extends AbstractSimulator<Event, AuctionCount> {
+    /**
+     * Time of bids still contributing to open windows, indexed by their auction id. Auctions whose
+     * bids have all been retired are removed, so the map only holds auctions with live bids.
+     */
+    private final Map<Long, List<Instant>> bids;
+
+    /** When oldest active window starts. */
+    private Instant windowStart;
+
+    public Simulator(NexmarkConfiguration configuration) {
+      super(NexmarkUtils.standardEventIterator(configuration));
+      bids = new TreeMap<>();
+      windowStart = NexmarkUtils.BEGINNING_OF_TIME;
+    }
+
+    /**
+     * Count bids per auction id for bids strictly before {@code end}. Add the auction ids with
+     * the maximum number of bids to results, each timestamped at {@code end}.
+     */
+    private void countBids(Instant end) {
+      Map<Long, Long> counts = new TreeMap<>();
+      long maxCount = 0L;
+      for (Map.Entry<Long, List<Instant>> entry : bids.entrySet()) {
+        long count = 0L;
+        long auction = entry.getKey();
+        for (Instant bid : entry.getValue()) {
+          if (bid.isBefore(end)) {
+            count++;
+          }
+        }
+        if (count > 0) {
+          counts.put(auction, count);
+          maxCount = Math.max(maxCount, count);
+        }
+      }
+      for (Map.Entry<Long, Long> entry : counts.entrySet()) {
+        long auction = entry.getKey();
+        long count = entry.getValue();
+        if (count == maxCount) {
+          AuctionCount result = new AuctionCount(auction, count);
+          addResult(TimestampedValue.of(result, end));
+        }
+      }
+    }
+
+    /**
+     * Retire bids which are strictly before {@code cutoff}, dropping any auction left with no
+     * bids. Return true if there are any bids remaining.
+     */
+    private boolean retireBids(Instant cutoff) {
+      boolean anyRemain = false;
+      Iterator<Map.Entry<Long, List<Instant>>> auctions = bids.entrySet().iterator();
+      while (auctions.hasNext()) {
+        Map.Entry<Long, List<Instant>> entry = auctions.next();
+        long auction = entry.getKey();
+        Iterator<Instant> itr = entry.getValue().iterator();
+        while (itr.hasNext()) {
+          Instant bid = itr.next();
+          if (bid.isBefore(cutoff)) {
+            NexmarkUtils.info("retire: %s for %s", bid, auction);
+            itr.remove();
+          } else {
+            anyRemain = true;
+          }
+        }
+        if (entry.getValue().isEmpty()) {
+          // Without this the map would keep an empty list for every auction ever bid on, and
+          // countBids would scan all of them on every window.
+          auctions.remove();
+        }
+      }
+      return anyRemain;
+    }
+
+    /**
+     * Retire active windows until we've reached {@code newWindowStart}, emitting the auctions
+     * with the most bids in each retired window.
+     */
+    private void retireWindows(Instant newWindowStart) {
+      Duration windowSize = Duration.standardSeconds(configuration.windowSizeSec);
+      Duration windowPeriod = Duration.standardSeconds(configuration.windowPeriodSec);
+      while (!newWindowStart.equals(windowStart)) {
+        NexmarkUtils.info("retiring window %s, aiming for %s", windowStart, newWindowStart);
+        // Count bids in the window [windowStart, windowStart + size): bids before windowStart
+        // were already retired, and countBids excludes bids at or after the window end.
+        countBids(windowStart.plus(windowSize));
+        // Advance the window.
+        windowStart = windowStart.plus(windowPeriod);
+        // Retire bids which will never contribute to a future window.
+        if (!retireBids(windowStart)) {
+          // Can fast forward to latest window since no more outstanding bids.
+          windowStart = newWindowStart;
+        }
+      }
+    }
+
+    /**
+     * Add bid to state.
+     */
+    private void captureBid(Bid bid, Instant timestamp) {
+      List<Instant> existing = bids.get(bid.auction);
+      if (existing == null) {
+        existing = new ArrayList<>();
+        bids.put(bid.auction, existing);
+      }
+      existing.add(timestamp);
+    }
+
+    @Override
+    public void run() {
+      TimestampedValue<Event> timestampedEvent = nextInput();
+      if (timestampedEvent == null) {
+        // Drain the remaining windows.
+        retireWindows(NexmarkUtils.END_OF_TIME);
+        allDone();
+        return;
+      }
+
+      Event event = timestampedEvent.getValue();
+      if (event.bid == null) {
+        // Ignore non-bid events.
+        return;
+      }
+      Instant timestamp = timestampedEvent.getTimestamp();
+      Instant newWindowStart = windowStart(Duration.standardSeconds(configuration.windowSizeSec),
+          Duration.standardSeconds(configuration.windowPeriodSec), timestamp);
+      // Capture results from any windows we can now retire.
+      retireWindows(newWindowStart);
+      // Capture current bid.
+      captureBid(event.bid, timestamp);
+    }
+  }
+
+  public Query5Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  @Override
+  public AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValue(itr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query6.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query6.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query6.java
new file mode 100644
index 0000000..49c0d68
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query6.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import com.google.common.collect.Lists;
+
+import org.joda.time.Duration;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Query 6, 'Average Selling Price by Seller'. Select the average selling price over the
+ * last 10 closed auctions by the same seller. In CQL syntax:
+ *
+ * <pre>
+ * SELECT Istream(AVG(Q.final), Q.seller)
+ * FROM (SELECT Rstream(MAX(B.price) AS final, A.seller)
+ *       FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
+ *       WHERE A.id=B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME
+ *       GROUP BY A.id, A.seller) [PARTITION BY A.seller ROWS 10] Q
+ * GROUP BY Q.seller;
+ * </pre>
+ *
+ * <p>We are a little more exact with selecting winning bids: see {@link WinningBids}.
+ */
+class Query6 extends NexmarkQuery {
+  /** How many of a seller's most recent winning bids contribute to the moving average. */
+  private static final int MAX_NUM_BIDS_TO_AVERAGE = 10;
+
+  /**
+   * Combiner which retains up to {@code maxNumBids} of the most recent winning bids, ordered by
+   * {@code Bid.ASCENDING_TIME_THEN_PRICE}, and outputs their mean price rounded to the nearest
+   * whole value (zero when no bids are held).
+   */
+  private static class MovingMeanSellingPrice extends Combine.CombineFn<Bid, List<Bid>, Long> {
+    private final int maxNumBids;
+
+    public MovingMeanSellingPrice(int maxNumBids) {
+      this.maxNumBids = maxNumBids;
+    }
+
+    @Override
+    public List<Bid> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<Bid> addInput(List<Bid> accumulator, Bid input) {
+      accumulator.add(input);
+      Collections.sort(accumulator, Bid.ASCENDING_TIME_THEN_PRICE);
+      // Sorted oldest-first, so the head is the bid to evict when over capacity.
+      if (accumulator.size() > maxNumBids) {
+        accumulator.remove(0);
+      }
+      return accumulator;
+    }
+
+    @Override
+    public List<Bid> mergeAccumulators(Iterable<List<Bid>> accumulators) {
+      List<Bid> merged = new ArrayList<>();
+      for (List<Bid> partial : accumulators) {
+        merged.addAll(partial);
+      }
+      Collections.sort(merged, Bid.ASCENDING_TIME_THEN_PRICE);
+      int excess = merged.size() - maxNumBids;
+      if (excess <= 0) {
+        return merged;
+      }
+      // Keep only the newest maxNumBids bids, i.e. the tail of the sorted list.
+      return Lists.newArrayList(merged.subList(excess, merged.size()));
+    }
+
+    @Override
+    public Long extractOutput(List<Bid> accumulator) {
+      int numBids = accumulator.size();
+      if (numBids == 0) {
+        return 0L;
+      }
+      long totalPrice = 0;
+      for (Bid bid : accumulator) {
+        totalPrice += bid.price;
+      }
+      return Math.round((double) totalPrice / numBids);
+    }
+  }
+
+  public Query6(NexmarkConfiguration configuration) {
+    super(configuration, "Query6");
+  }
+
+  /**
+   * Builds the typed pipeline: the winning bid of every closed auction is keyed by seller, and a
+   * moving average of each seller's most recent winning prices is emitted after every win.
+   */
+  private PCollection<SellerPrice> applyTyped(PCollection<Event> events) {
+    // Winning bid of each closed auction, keyed by the auction's seller id.
+    PCollection<KV<Long, Bid>> winningBidsBySeller = events
+        .apply(new WinningBids(name + ".WinningBids", configuration))
+        .apply(
+            ParDo.named(name + ".Rekey")
+                .of(new DoFn<AuctionBid, KV<Long, Bid>>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    AuctionBid winner = c.element();
+                    c.output(KV.of(winner.auction.seller, winner.bid));
+                  }
+                }));
+
+    return winningBidsBySeller
+        // Fire after every winning bid, accumulating all panes fired so far.
+        .apply(
+            Window.<KV<Long, Bid>>into(new GlobalWindows())
+                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
+                .accumulatingFiredPanes()
+                .withAllowedLateness(Duration.ZERO))
+
+        // Moving average of each seller's most recent winning bids.
+        .apply(Combine.<Long, Bid, Long>perKey(
+            new MovingMeanSellingPrice(MAX_NUM_BIDS_TO_AVERAGE)))
+
+        // Project into our datatype.
+        .apply(
+            ParDo.named(name + ".Select")
+                .of(new DoFn<KV<Long, Long>, SellerPrice>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    KV<Long, Long> sellerAverage = c.element();
+                    c.output(new SellerPrice(sellerAverage.getKey(), sellerAverage.getValue()));
+                  }
+                }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query6Model.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query6Model.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query6Model.java
new file mode 100644
index 0000000..639ec9f
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query6Model.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import org.joda.time.Instant;
+import org.junit.Assert;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A direct implementation of {@link Query6}.
+ */
+public class Query6Model extends NexmarkQueryModel implements Serializable {
+  /**
+   * Simulator for query 6.
+   */
+  private static class Simulator extends AbstractSimulator<AuctionBid, SellerPrice> {
+    /** How many of a seller's most recent winning bids are averaged, matching {@link Query6}. */
+    private static final int MAX_NUM_BIDS = 10;
+
+    /**
+     * Up to {@link #MAX_NUM_BIDS} most recent winning bids per seller, sorted oldest-first by
+     * {@code Bid.ASCENDING_TIME_THEN_PRICE}, indexed by seller id.
+     */
+    private final Map<Long, List<Bid>> winningBidsPerSeller;
+
+    private Instant lastTimestamp;
+
+    public Simulator(NexmarkConfiguration configuration) {
+      super(new WinningBidsSimulator(configuration).results());
+      winningBidsPerSeller = new TreeMap<>();
+      lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    /**
+     * Returns the mean price of {@code bids} rounded to the nearest whole value, using the same
+     * formula as {@code Query6.MovingMeanSellingPrice}. {@code bids} must be non-empty.
+     */
+    private static long averagePrice(List<Bid> bids) {
+      long total = 0;
+      for (Bid bid : bids) {
+        total += bid.price;
+      }
+      return Math.round((double) total / bids.size());
+    }
+
+    /**
+     * Record a winning bid for the auction's seller, keeping only the seller's most recent
+     * winning bids, and emit the seller's updated average price as an intermediate result.
+     */
+    private void captureWinningBid(Auction auction, Bid bid, Instant timestamp) {
+      NexmarkUtils.info("winning auction, bid: %s, %s", auction, bid);
+      List<Bid> bids = winningBidsPerSeller.get(auction.seller);
+      if (bids == null) {
+        bids = new ArrayList<>();
+        winningBidsPerSeller.put(auction.seller, bids);
+      }
+      bids.add(bid);
+      // Mirror Query6's combiner: order by bid time (then price) and evict the oldest bid once
+      // more than MAX_NUM_BIDS are held. Averaging over all wins would diverge from Query6.
+      Collections.sort(bids, Bid.ASCENDING_TIME_THEN_PRICE);
+      if (bids.size() > MAX_NUM_BIDS) {
+        bids.remove(0);
+      }
+      TimestampedValue<SellerPrice> intermediateResult = TimestampedValue.of(
+          new SellerPrice(auction.seller, averagePrice(bids)), timestamp);
+      addIntermediateResult(intermediateResult);
+    }
+
+
+    @Override
+    protected void run() {
+      TimestampedValue<AuctionBid> timestampedWinningBid = nextInput();
+      if (timestampedWinningBid == null) {
+        for (Map.Entry<Long, List<Bid>> entry : winningBidsPerSeller.entrySet()) {
+          long seller = entry.getKey();
+          addResult(TimestampedValue.of(
+              new SellerPrice(seller, averagePrice(entry.getValue())), lastTimestamp));
+        }
+        allDone();
+        return;
+      }
+
+      lastTimestamp = timestampedWinningBid.getTimestamp();
+      captureWinningBid(timestampedWinningBid.getValue().auction,
+          timestampedWinningBid.getValue().bid, lastTimestamp);
+    }
+  }
+
+  public Query6Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  @Override
+  public AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  @Override
+  protected Iterable<TimestampedValue<KnownSize>> relevantResults(
+      Iterable<TimestampedValue<KnownSize>> results) {
+    // Find the last (in processing time) reported average price for each seller.
+    Map<Long, TimestampedValue<KnownSize>> finalAverages = new TreeMap<>();
+    for (TimestampedValue<KnownSize> obj : results) {
+      Assert.assertTrue("have SellerPrice", obj.getValue() instanceof SellerPrice);
+      SellerPrice sellerPrice = (SellerPrice) obj.getValue();
+      finalAverages.put(
+          sellerPrice.seller, TimestampedValue.of((KnownSize) sellerPrice, obj.getTimestamp()));
+    }
+    return finalAverages.values();
+  }
+
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValue(itr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query7.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query7.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query7.java
new file mode 100644
index 0000000..1f63b35
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query7.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import org.joda.time.Duration;
+
+/**
+ * Query 7, 'Highest Bid'. Select the bids with the highest bid
+ * price in the last minute. In CQL syntax:
+ *
+ * <pre>
+ * SELECT Rstream(B.auction, B.price, B.bidder)
+ * FROM Bid [RANGE 1 MINUTE SLIDE 1 MINUTE] B
+ * WHERE B.price = (SELECT MAX(B1.price)
+ *                  FROM BID [RANGE 1 MINUTE SLIDE 1 MINUTE] B1);
+ * </pre>
+ *
+ * <p>We will use a shorter window to help make testing easier. We'll also implement this using
+ * a side input in order to exercise that functionality. (A combiner, as used in Query 5, is
+ * a more efficient approach.)
+ */
+class Query7 extends NexmarkQuery {
+  public Query7(NexmarkConfiguration configuration) {
+    super(configuration, "Query7");
+  }
+
+  /**
+   * Builds the typed pipeline: bids are assigned to fixed windows of
+   * {@code configuration.windowSizeSec} seconds, and every bid whose price equals the maximum
+   * price among the windowed bids is emitted.
+   */
+  private PCollection<Bid> applyTyped(PCollection<Event> events) {
+    // Window the bids into fixed (non-overlapping) windows.
+    PCollection<Bid> windowedBids = events.apply(JUST_BIDS).apply(
+        Window.<Bid>into(FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))));
+
+    // Find the largest price among the windowed bids.
+    // NOTE: It would be more efficient to write this query much as we did for Query5, using
+    // a binary combiner to accumulate the bids with maximal price. As written this query
+    // requires an additional scan per window, with the associated cost of snapshotted state and
+    // its I/O. We'll keep this implementation since it illustrates the use of side inputs.
+    final PCollectionView<Long> maxPriceView =
+        windowedBids
+            .apply(BID_TO_PRICE)
+            .apply(Max.longsGlobally().withFanout(configuration.fanout).asSingletonView());
+
+    return windowedBids
+        // Select all bids which have that maximum price (there may be more than one).
+        .apply(
+            ParDo.named(name + ".Select")
+                .withSideInputs(maxPriceView)
+                .of(new DoFn<Bid, Bid>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    long maxPrice = c.sideInput(maxPriceView);
+                    Bid bid = c.element();
+                    if (bid.price == maxPrice) {
+                      c.output(bid);
+                    }
+                  }
+                }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query7Model.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query7Model.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query7Model.java
new file mode 100644
index 0000000..e835133
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query7Model.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A direct implementation of {@link Query7}.
+ */
+public class Query7Model extends NexmarkQueryModel implements Serializable {
+  /**
+   * Simulator for query 7.
+   */
+  private class Simulator extends AbstractSimulator<Event, Bid> {
+    /** Bids with highest bid price seen in the current window; all share the same price. */
+    private final List<Bid> highestBids;
+
+    /** When current window started. */
+    private Instant windowStart;
+
+    /** Timestamp of the most recently seen bid. */
+    private Instant lastTimestamp;
+
+    public Simulator(NexmarkConfiguration configuration) {
+      super(NexmarkUtils.standardEventIterator(configuration));
+      highestBids = new ArrayList<>();
+      windowStart = NexmarkUtils.BEGINNING_OF_TIME;
+      lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    /**
+     * Transfer the currently winning bids into results, timestamped at {@code timestamp}, and
+     * retire them.
+     */
+    private void retireWindow(Instant timestamp) {
+      for (Bid bid : highestBids) {
+        addResult(TimestampedValue.of(bid, timestamp));
+      }
+      highestBids.clear();
+    }
+
+    /**
+     * Keep just the highest price bids. A bid tying the current highest price is kept alongside
+     * the existing ones, since {@link Query7} emits every bid with the window's maximum price.
+     */
+    private void captureBid(Bid bid) {
+      if (!highestBids.isEmpty()) {
+        // All retained bids share one price, so the first is representative.
+        long highestPrice = highestBids.get(0).price;
+        if (bid.price < highestPrice) {
+          return;
+        }
+        if (bid.price > highestPrice) {
+          for (Bid existingBid : highestBids) {
+            NexmarkUtils.info("smaller price: %s", existingBid);
+          }
+          highestBids.clear();
+        }
+      }
+      NexmarkUtils.info("larger price: %s", bid);
+      highestBids.add(bid);
+    }
+
+    @Override
+    protected void run() {
+      TimestampedValue<Event> timestampedEvent = nextInput();
+      if (timestampedEvent == null) {
+        // Capture all remaining bids in results.
+        retireWindow(lastTimestamp);
+        allDone();
+        return;
+      }
+
+      Event event = timestampedEvent.getValue();
+      if (event.bid == null) {
+        // Ignore non-bid events.
+        return;
+      }
+      lastTimestamp = timestampedEvent.getTimestamp();
+      // Size is passed as both size and period: Query7 uses fixed (non-overlapping) windows.
+      Instant newWindowStart = windowStart(Duration.standardSeconds(configuration.windowSizeSec),
+          Duration.standardSeconds(configuration.windowSizeSec), lastTimestamp);
+      if (!newWindowStart.equals(windowStart)) {
+        // Capture highest priced bids in current window and retire it.
+        retireWindow(lastTimestamp);
+        windowStart = newWindowStart;
+      }
+      // Keep only the highest bids.
+      captureBid(event.bid);
+    }
+  }
+
+  public Query7Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  @Override
+  public AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValueOrder(itr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query8.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query8.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query8.java
new file mode 100644
index 0000000..e58453b
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query8.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.joda.time.Duration;
+
+/**
+ * Query 8, 'Monitor New Users'. Select people who have entered the system and created auctions
+ * in the last 12 hours, updated every 12 hours. In CQL syntax:
+ *
+ * <pre>
+ * SELECT Rstream(P.id, P.name, A.reserve)
+ * FROM Person [RANGE 12 HOUR] P, Auction [RANGE 12 HOUR] A
+ * WHERE P.id = A.seller;
+ * </pre>
+ *
+ * <p>To make things a bit more dynamic and easier to test we use a much shorter fixed window of
+ * {@code configuration.windowSizeSec} seconds instead of 12 hours.
+ */
+class Query8 extends NexmarkQuery {
+  public Query8(NexmarkConfiguration configuration) {
+    super(configuration, "Query8");
+  }
+
+  /**
+   * Joins new persons with the new auctions they created in the same fixed window.
+   *
+   * @param events the input event stream, narrowed here by {@code JUST_NEW_PERSONS} and
+   *     {@code JUST_NEW_AUCTIONS}
+   * @return one {@link IdNameReserve} per (person, auction) pair where the person's id equals the
+   *     auction's seller id and both fall into the same window
+   */
+  private PCollection<IdNameReserve> applyTyped(PCollection<Event> events) {
+    // Both inputs use the same fixed windows so the CoGroupByKey below groups them per window.
+    Duration windowSize = Duration.standardSeconds(configuration.windowSizeSec);
+
+    // Window and key new people by their id.
+    PCollection<KV<Long, Person>> personsById =
+        events.apply(JUST_NEW_PERSONS)
+            .apply(Window.<Person>into(FixedWindows.of(windowSize))
+                    .named("Query8.WindowPersons"))
+            .apply(PERSON_BY_ID);
+
+    // Window and key new auctions by their seller's id, so they line up with person ids.
+    PCollection<KV<Long, Auction>> auctionsBySeller =
+        events.apply(JUST_NEW_AUCTIONS)
+            .apply(Window.<Auction>into(FixedWindows.of(windowSize))
+                    .named("Query8.WindowAuctions"))
+            .apply(AUCTION_BY_SELLER);
+
+    // Join people and auctions and project the person id, name and auction reserve price.
+    return KeyedPCollectionTuple.of(PERSON_TAG, personsById)
+        .and(AUCTION_TAG, auctionsBySeller)
+        .apply(CoGroupByKey.<Long>create())
+        .apply(
+            ParDo.named(name + ".Select")
+                .of(new DoFn<KV<Long, CoGbkResult>, IdNameReserve>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    Person person = c.element().getValue().getOnly(PERSON_TAG, null);
+                    if (person == null) {
+                      // Person was not created in last window period.
+                      return;
+                    }
+                    for (Auction auction : c.element().getValue().getAll(AUCTION_TAG)) {
+                      c.output(new IdNameReserve(person.id, person.name, auction.reserve));
+                    }
+                  }
+                }));
+  }
+
+  /** Runs the typed query and exposes its results as {@link KnownSize} elements. */
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query8Model.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query8Model.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query8Model.java
new file mode 100644
index 0000000..00f7355
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query8Model.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.values.TimestampedValue;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A direct implementation of {@link Query8}: it simulates the query over the standard event
+ * stream without a pipeline.
+ */
+public class Query8Model extends NexmarkQueryModel implements Serializable {
+  /**
+   * Simulator for query 8. Pairs each new person with the new auctions they sell, considering
+   * only events that fall into the same fixed window.
+   */
+  private class Simulator extends AbstractSimulator<Event, IdNameReserve> {
+    /** Persons created in the window being tracked, keyed by person id. */
+    private final Map<Long, Person> newPersons;
+
+    /**
+     * Auctions created in the window being tracked whose seller has not yet been seen as a new
+     * person in that window, keyed by seller id.
+     */
+    private final Multimap<Long, Auction> newAuctions;
+
+    /** Start of the fixed window currently being tracked. */
+    private Instant windowStart;
+
+    public Simulator(NexmarkConfiguration configuration) {
+      super(NexmarkUtils.standardEventIterator(configuration));
+      this.newPersons = new HashMap<>();
+      this.newAuctions = ArrayListMultimap.create();
+      this.windowStart = NexmarkUtils.BEGINNING_OF_TIME;
+    }
+
+    /** Reports (via NexmarkUtils.info) and forgets every person buffered for the old window. */
+    private void retirePersons() {
+      for (Person person : newPersons.values()) {
+        NexmarkUtils.info("retire: %s", person);
+      }
+      newPersons.clear();
+    }
+
+    /** Reports (via NexmarkUtils.info) and forgets every auction buffered for the old window. */
+    private void retireAuctions() {
+      for (Auction auction : newAuctions.values()) {
+        NexmarkUtils.info("retire: %s", auction);
+      }
+      newAuctions.clear();
+    }
+
+    /** Records the projected (person id, person name, auction reserve) join result. */
+    private void emitMatch(Person person, Auction auction, Instant timestamp) {
+      addResult(TimestampedValue.of(
+          new IdNameReserve(person.id, person.name, auction.reserve), timestamp));
+    }
+
+    /** Joins a new auction with its seller if already seen, otherwise buffers the auction. */
+    private void joinAuction(Auction auction, Instant timestamp) {
+      Person seller = newPersons.get(auction.seller);
+      if (seller == null) {
+        // Hold on to the auction in case its seller shows up later in this window.
+        newAuctions.put(auction.seller, auction);
+      } else {
+        emitMatch(seller, auction, timestamp);
+      }
+    }
+
+    /** Joins a new person with all buffered auctions they sell, then remembers the person. */
+    private void joinPerson(Person person, Instant timestamp) {
+      for (Auction auction : newAuctions.get(person.id)) {
+        emitMatch(person, auction, timestamp);
+      }
+      // Later auctions from this seller will find the person directly, so drop the buffer.
+      newAuctions.removeAll(person.id);
+      newPersons.put(person.id, person);
+    }
+
+    @Override
+    public void run() {
+      TimestampedValue<Event> next = nextInput();
+      if (next == null) {
+        allDone();
+        return;
+      }
+
+      Event event = next.getValue();
+      if (event.bid != null) {
+        // Bids play no part in this query; wait for the next event.
+        return;
+      }
+
+      Instant eventTime = next.getTimestamp();
+      Duration size = Duration.standardSeconds(configuration.windowSizeSec);
+      Instant eventWindowStart = windowStart(size, size, eventTime);
+      if (!eventWindowStart.equals(windowStart)) {
+        // The event opens a new window: discard everything buffered for the previous one.
+        retirePersons();
+        retireAuctions();
+        windowStart = eventWindowStart;
+      }
+
+      if (event.newAuction != null) {
+        joinAuction(event.newAuction, eventTime);
+      } else {
+        joinPerson(event.newPerson, eventTime);
+      }
+    }
+  }
+
+  public Query8Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  /** Creates a fresh simulator bound to this model's configuration. */
+  @Override
+  public AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  /** Converts simulator output using {@code toValue}. */
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValue(itr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query9.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query9.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query9.java
new file mode 100644
index 0000000..2c0a526
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query9.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Query "9", 'Winning bids': emits only the winning bid of each auction. This query is not part
+ * of the original NEXMark suite but is handy for testing. See {@link WinningBids} for details.
+ */
+class Query9 extends NexmarkQuery {
+  public Query9(NexmarkConfiguration configuration) {
+    super(configuration, "Query9");
+  }
+
+  /**
+   * Applies the {@link WinningBids} transform to the events and exposes its {@link AuctionBid}
+   * output as {@link KnownSize} elements.
+   */
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    PCollection<AuctionBid> winningBids = events.apply(new WinningBids(name, configuration));
+    return NexmarkUtils.castToKnownSize(name, winningBids);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query9Model.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query9Model.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query9Model.java
new file mode 100644
index 0000000..1fad648
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query9Model.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * A direct implementation of {@link Query9}. Simulation is delegated entirely to
+ * {@link WinningBidsSimulator}.
+ */
+public class Query9Model extends NexmarkQueryModel implements Serializable {
+  public Query9Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  /** Converts simulator output using {@code toValue}. */
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValue(itr);
+  }
+
+  /** Creates a fresh winning-bids simulator bound to this model's configuration. */
+  @Override
+  public AbstractSimulator<?, ?> simulator() {
+    return new WinningBidsSimulator(configuration);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/README.md
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/README.md b/integration/java/src/main/java/org/apache/beam/integration/nexmark/README.md
new file mode 100644
index 0000000..5e33327
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/README.md
@@ -0,0 +1,166 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# NEXMark integration suite
+
+This is a suite of pipelines inspired by the 'continuous data stream'
+queries in [NEXMark](http://datalab.cs.pdx.edu/niagaraST/NEXMark/).
+
+The queries are over a simple online auction system with tables of
+**Person**, **Auction** and **Bid** records.
+
+The queries are:
+
+* **Query1**: What are the bid values in euros?
+  Illustrates a simple map.
+* **Query2**: What are the auctions with particular auction numbers?
+  Illustrates a simple filter.
+* **Query3**: Who is selling in particular US states?
+  Illustrates an incremental join (using per-key state) and filter.
+* **Query4**: What is the average selling price for each auction
+  category?
+  Illustrates a complex join (using custom window functions) and
+  aggregation.
+* **Query5**: Which auctions have seen the most bids in the last period?
+  Illustrates sliding windows and combiners.
+* **Query6**: What is the average selling price per seller for their
+  last 10 closed auctions?
+  Shares the same 'winning bids' core as for **Query4**, and
+  illustrates a specialized combiner.
+* **Query7**: What are the highest bids per period?
+  Deliberately implemented using a side input to illustrate fanout.
+* **Query8**: Who has entered the system and created an auction in
+  the last period?
+  Illustrates a simple join.
+
+We have augmented the original queries with five more:
+
+* **Query0**: Pass-through.
+  Allows us to measure the monitoring overhead.
+* **Query9**: Winning-bids.
+  A common sub-query shared by **Query4** and **Query6**.
+* **Query10**: Log all events to GCS files.
+  Illustrates windows with large side effects on firing.
+* **Query11**: How many bids did a user make in each session they
+  were active?
+  Illustrates session windows.
+* **Query12**: How many bids does a user make within a fixed
+  processing time limit?
+  Illustrates working in processing time in the Global window, as
+  compared with event time in non-Global windows for all the other
+  queries.
+
+The queries can be executed using a 'Driver' for a given backend.
+Currently the supported drivers are:
+
+* **NexmarkInProcessDriver** for running locally on a single machine.
+* **NexmarkGoogleDriver** for running on the Google Cloud Dataflow
+  service. Requires a Google Cloud account.
+* **NexmarkFlinkDriver** for running on a Flink cluster. Requires the
+  cluster to be established and the Nexmark jar to be distributed to
+  each worker.
+
+Adding drivers for other backends should be straightforward.
+
+Test data is deterministically synthesized on demand. The test
+data may be synthesized in the same pipeline as the query itself,
+or may be published to Pubsub.
+
+The query results may be:
+
+* Published to Pubsub.
+* Written to text files as plain text.
+* Written to text files using an Avro encoding.
+* Sent to BigQuery.
+* Discarded.
+
+Options are provided for measuring progress, measuring overall
+pipeline performance, and comparing that performance against a known
+baseline. However, that machinery has only been implemented for
+the Google Cloud Dataflow driver.
+
+## Running on Google Cloud Dataflow
+
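+Example invocations for **Query10** on the Google Cloud Dataflow
+service follow. The first pipeline generates events and publishes them
+to a Pubsub topic (`--pubSubMode=PUBLISH_ONLY`); the second subscribes
+to that topic and runs the query (`--pubSubMode=SUBSCRIBE_ONLY`).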
+
+```
+java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar \
+  org.apache.beam.integration.nexmark.NexmarkGoogleDriver \
+  --project=<your project> \
+  --zone=<your zone> \
+  --workerMachineType=n1-highmem-8 \
+  --stagingLocation=<a gs path for staging> \
+  --streaming=true \
+  --sourceType=PUBSUB \
+  --pubSubMode=PUBLISH_ONLY \
+  --pubsubTopic=<an existing Pubsub topic> \
+  --resourceNameMode=VERBATIM \
+  --manageResources=false \
+  --monitorJobs=false \
+  --numEventGenerators=64 \
+  --numWorkers=16 \
+  --maxNumWorkers=16 \
+  --query=10 \
+  --firstEventRate=100000 \
+  --nextEventRate=100000 \
+  --ratePeriodSec=3600 \
+  --isRateLimited=true \
+  --avgPersonByteSize=500 \
+  --avgAuctionByteSize=500 \
+  --avgBidByteSize=500 \
+  --probDelayedEvent=0.000001 \
+  --occasionalDelaySec=3600 \
+  --numEvents=0 \
+  --useWallclockEventTime=true \
+  --usePubsubPublishTime=true \
+  --experiments=enable_custom_pubsub_sink
+```
+
+```
+java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar \
+  org.apache.beam.integration.nexmark.NexmarkGoogleDriver \
+  --project=<your project> \
+  --zone=<your zone> \
+  --workerMachineType=n1-highmem-8 \
+  --stagingLocation=<a gs path for staging> \
+  --streaming=true \
+  --sourceType=PUBSUB \
+  --pubSubMode=SUBSCRIBE_ONLY \
+  --pubsubSubscription=<an existing Pubsub subscription to above topic> \
+  --resourceNameMode=VERBATIM \
+  --manageResources=false \
+  --monitorJobs=false \
+  --numWorkers=64 \
+  --maxNumWorkers=64 \
+  --query=10 \
+  --usePubsubPublishTime=true \
+  --outputPath=<a gs path under which log files will be written> \
+  --windowSizeSec=600 \
+  --occasionalDelaySec=3600 \
+  --maxLogEvents=10000 \
+  --experiments=enable_custom_pubsub_source
+```
+
+## Running on Flink
+
+See [BEAM_ON_FLINK_ON_GCP](./BEAM_ON_FLINK_ON_GCP.md) for instructions
+on running a NEXMark pipeline using Flink hosted on a Google Cloud
+Platform (GCP) cluster.