You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:56 UTC

[48/55] [abbrv] beam git commit: Move module beam-integration-java-nexmark to beam-sdks-java-nexmark

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java
deleted file mode 100644
index 9c0fe6d..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import org.apache.beam.integration.nexmark.Monitor;
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.AuctionBid;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.CategoryPrice;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Mean;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-
-/**
- * Query 4, 'Average Price for a Category'. Select the average of the wining bid prices for all
- * closed auctions in each category. In CQL syntax:
- *
- * <pre>{@code
- * SELECT Istream(AVG(Q.final))
- * FROM Category C, (SELECT Rstream(MAX(B.price) AS final, A.category)
- *                   FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
- *                   WHERE A.id=B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME
- *                   GROUP BY A.id, A.category) Q
- * WHERE Q.category = C.id
- * GROUP BY C.id;
- * }</pre>
- *
- * <p>For extra spiciness our implementation differs slightly from the above:
- * <ul>
- * <li>We select both the average winning price and the category.
- * <li>We don't bother joining with a static category table, since it's contents are never used.
- * <li>We only consider bids which are above the auction's reserve price.
- * <li>We accept the highest-price, earliest valid bid as the winner.
- * <li>We calculate the averages oven a sliding window of size {@code windowSizeSec} and
- * period {@code windowPeriodSec}.
- * </ul>
- */
-public class Query4 extends NexmarkQuery {
-  private final Monitor<AuctionBid> winningBidsMonitor;
-
-  public Query4(NexmarkConfiguration configuration) {
-    super(configuration, "Query4");
-    winningBidsMonitor = new Monitor<>(name + ".WinningBids", "winning");
-  }
-
-  private PCollection<CategoryPrice> applyTyped(PCollection<Event> events) {
-    PCollection<AuctionBid> winningBids =
-        events
-            // Find the winning bid for each closed auction.
-            .apply(new WinningBids(name + ".WinningBids", configuration));
-
-    // Monitor winning bids
-    winningBids = winningBids.apply(name + ".WinningBidsMonitor",
-            winningBidsMonitor.getTransform());
-
-    return winningBids
-        // Key the winning bid price by the auction category.
-        .apply(name + ".Rekey",
-            ParDo.of(new DoFn<AuctionBid, KV<Long, Long>>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    Auction auction = c.element().auction;
-                    Bid bid = c.element().bid;
-                    c.output(KV.of(auction.category, bid.price));
-                  }
-                }))
-
-        // Re-window so we can calculate a sliding average
-        .apply(Window.<KV<Long, Long>>into(
-            SlidingWindows.of(Duration.standardSeconds(configuration.windowSizeSec))
-                .every(Duration.standardSeconds(configuration.windowPeriodSec))))
-
-        // Find the average of the winning bids for each category.
-        // Make sure we share the work for each category between workers.
-        .apply(Mean.<Long, Long>perKey().withHotKeyFanout(configuration.fanout))
-
-        // For testing against Query4Model, capture which results are 'final'.
-        .apply(name + ".Project",
-            ParDo.of(new DoFn<KV<Long, Double>, CategoryPrice>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    c.output(new CategoryPrice(c.element().getKey(),
-                        Math.round(c.element().getValue()), c.pane().isLast()));
-                  }
-                }));
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java
deleted file mode 100644
index 269e47a..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.AuctionBid;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.CategoryPrice;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Assert;
-
-/**
- * A direct implementation of {@link Query4}.
- */
-public class Query4Model extends NexmarkQueryModel implements Serializable {
-  /**
-   * Simulator for query 4.
-   */
-  private class Simulator extends AbstractSimulator<AuctionBid, CategoryPrice> {
-    /** The prices and categories for all winning bids in the last window size. */
-    private final List<TimestampedValue<CategoryPrice>> winningPricesByCategory;
-
-    /** Timestamp of last result (ms since epoch). */
-    private Instant lastTimestamp;
-
-    /** When oldest active window starts. */
-    private Instant windowStart;
-
-    /** The last seen result for each category. */
-    private final Map<Long, TimestampedValue<CategoryPrice>> lastSeenResults;
-
-    public Simulator(NexmarkConfiguration configuration) {
-      super(new WinningBidsSimulator(configuration).results());
-      winningPricesByCategory = new ArrayList<>();
-      lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      windowStart = NexmarkUtils.BEGINNING_OF_TIME;
-      lastSeenResults = new TreeMap<>();
-    }
-
-    /**
-     * Calculate the average bid price for each category for all winning bids
-     * which are strictly before {@code end}.
-     */
-    private void averages(Instant end) {
-      Map<Long, Long> counts = new TreeMap<>();
-      Map<Long, Long> totals = new TreeMap<>();
-      for (TimestampedValue<CategoryPrice> value : winningPricesByCategory) {
-        if (!value.getTimestamp().isBefore(end)) {
-          continue;
-        }
-        long category = value.getValue().category;
-        long price = value.getValue().price;
-        Long count = counts.get(category);
-        if (count == null) {
-          count = 1L;
-        } else {
-          count += 1;
-        }
-        counts.put(category, count);
-        Long total = totals.get(category);
-        if (total == null) {
-          total = price;
-        } else {
-          total += price;
-        }
-        totals.put(category, total);
-      }
-      for (Map.Entry<Long, Long> entry : counts.entrySet()) {
-        long category = entry.getKey();
-        long count = entry.getValue();
-        long total = totals.get(category);
-        TimestampedValue<CategoryPrice> result = TimestampedValue.of(
-            new CategoryPrice(category, Math.round((double) total / count), true), lastTimestamp);
-        addIntermediateResult(result);
-        lastSeenResults.put(category, result);
-      }
-    }
-
-    /**
-     * Calculate averages for any windows which can now be retired. Also prune entries
-     * which can no longer contribute to any future window.
-     */
-    private void prune(Instant newWindowStart) {
-      while (!newWindowStart.equals(windowStart)) {
-        averages(windowStart.plus(Duration.standardSeconds(configuration.windowSizeSec)));
-        windowStart = windowStart.plus(Duration.standardSeconds(configuration.windowPeriodSec));
-        Iterator<TimestampedValue<CategoryPrice>> itr = winningPricesByCategory.iterator();
-        while (itr.hasNext()) {
-          if (itr.next().getTimestamp().isBefore(windowStart)) {
-            itr.remove();
-          }
-        }
-        if (winningPricesByCategory.isEmpty()) {
-          windowStart = newWindowStart;
-        }
-      }
-    }
-
-    /**
-     * Capture the winning bid.
-     */
-    private void captureWinningBid(Auction auction, Bid bid, Instant timestamp) {
-      winningPricesByCategory.add(
-          TimestampedValue.of(new CategoryPrice(auction.category, bid.price, false), timestamp));
-    }
-
-    @Override
-    protected void run() {
-      TimestampedValue<AuctionBid> timestampedWinningBid = nextInput();
-      if (timestampedWinningBid == null) {
-        prune(NexmarkUtils.END_OF_TIME);
-        for (TimestampedValue<CategoryPrice> result : lastSeenResults.values()) {
-          addResult(result);
-        }
-        allDone();
-        return;
-      }
-      lastTimestamp = timestampedWinningBid.getTimestamp();
-      Instant newWindowStart = windowStart(Duration.standardSeconds(configuration.windowSizeSec),
-          Duration.standardSeconds(configuration.windowPeriodSec), lastTimestamp);
-      prune(newWindowStart);
-      captureWinningBid(timestampedWinningBid.getValue().auction,
-          timestampedWinningBid.getValue().bid, lastTimestamp);
-    }
-  }
-
-  public Query4Model(NexmarkConfiguration configuration) {
-    super(configuration);
-  }
-
-  @Override
-  public AbstractSimulator<?, ?> simulator() {
-    return new Simulator(configuration);
-  }
-
-  @Override
-  protected Iterable<TimestampedValue<KnownSize>> relevantResults(
-      Iterable<TimestampedValue<KnownSize>> results) {
-    // Find the last (in processing time) reported average price for each category.
-    Map<Long, TimestampedValue<KnownSize>> finalAverages = new TreeMap<>();
-    for (TimestampedValue<KnownSize> obj : results) {
-      Assert.assertTrue("have CategoryPrice", obj.getValue() instanceof CategoryPrice);
-      CategoryPrice categoryPrice = (CategoryPrice) obj.getValue();
-      if (categoryPrice.isLast) {
-        finalAverages.put(
-            categoryPrice.category,
-            TimestampedValue.of((KnownSize) categoryPrice, obj.getTimestamp()));
-      }
-    }
-
-    return finalAverages.values();
-  }
-
-  @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
-    return toValue(itr);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
deleted file mode 100644
index bdf3e5f..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.AuctionCount;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-
-/**
- * Query 5, 'Hot Items'. Which auctions have seen the most bids in the last hour (updated every
- * minute). In CQL syntax:
- *
- * <pre>{@code
- * SELECT Rstream(auction)
- * FROM (SELECT B1.auction, count(*) AS num
- *       FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B1
- *       GROUP BY B1.auction)
- * WHERE num >= ALL (SELECT count(*)
- *                   FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B2
- *                   GROUP BY B2.auction);
- * }</pre>
- *
- * <p>To make things a bit more dynamic and easier to test we use much shorter windows, and
- * we'll also preserve the bid counts.
- */
-public class Query5 extends NexmarkQuery {
-  public Query5(NexmarkConfiguration configuration) {
-    super(configuration, "Query5");
-  }
-
-  private PCollection<AuctionCount> applyTyped(PCollection<Event> events) {
-    return events
-        // Only want the bid events.
-        .apply(JUST_BIDS)
-        // Window the bids into sliding windows.
-        .apply(
-            Window.<Bid>into(
-                SlidingWindows.of(Duration.standardSeconds(configuration.windowSizeSec))
-                    .every(Duration.standardSeconds(configuration.windowPeriodSec))))
-        // Project just the auction id.
-        .apply("BidToAuction", BID_TO_AUCTION)
-
-        // Count the number of bids per auction id.
-        .apply(Count.<Long>perElement())
-
-        // We'll want to keep all auctions with the maximal number of bids.
-        // Start by lifting each into a singleton list.
-        // need to do so because bellow combine returns a list of auctions in the key in case of
-        // equal number of bids. Combine needs to have same input type and return type.
-        .apply(
-            name + ".ToSingletons",
-            ParDo.of(
-                new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    c.output(
-                        KV.of(
-                            Collections.singletonList(c.element().getKey()),
-                            c.element().getValue()));
-                  }
-                }))
-
-        // Keep only the auction ids with the most bids.
-        .apply(
-            Combine.globally(
-                    new Combine.BinaryCombineFn<KV<List<Long>, Long>>() {
-                      @Override
-                      public KV<List<Long>, Long> apply(
-                          KV<List<Long>, Long> left, KV<List<Long>, Long> right) {
-                        List<Long> leftBestAuctions = left.getKey();
-                        long leftCount = left.getValue();
-                        List<Long> rightBestAuctions = right.getKey();
-                        long rightCount = right.getValue();
-                        if (leftCount > rightCount) {
-                          return left;
-                        } else if (leftCount < rightCount) {
-                          return right;
-                        } else {
-                          List<Long> newBestAuctions = new ArrayList<>();
-                          newBestAuctions.addAll(leftBestAuctions);
-                          newBestAuctions.addAll(rightBestAuctions);
-                          return KV.of(newBestAuctions, leftCount);
-                        }
-                      }
-                    })
-                .withoutDefaults()
-                .withFanout(configuration.fanout))
-
-        // Project into result.
-        .apply(
-            name + ".Select",
-            ParDo.of(
-                new DoFn<KV<List<Long>, Long>, AuctionCount>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    long count = c.element().getValue();
-                    for (long auction : c.element().getKey()) {
-                      c.output(new AuctionCount(auction, count));
-                    }
-                  }
-                }));
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java
deleted file mode 100644
index 24d9a00..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.AuctionCount;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * A direct implementation of {@link Query5}.
- */
-public class Query5Model extends NexmarkQueryModel implements Serializable {
-  /**
-   * Simulator for query 5.
-   */
-  private class Simulator extends AbstractSimulator<Event, AuctionCount> {
-    /** Time of bids still contributing to open windows, indexed by their auction id. */
-    private final Map<Long, List<Instant>> bids;
-
-    /** When oldest active window starts. */
-    private Instant windowStart;
-
-    public Simulator(NexmarkConfiguration configuration) {
-      super(NexmarkUtils.standardEventIterator(configuration));
-      bids = new TreeMap<>();
-      windowStart = NexmarkUtils.BEGINNING_OF_TIME;
-    }
-
-    /**
-     * Count bids per auction id for bids strictly before {@code end}. Add the auction ids with
-     * the maximum number of bids to results.
-     */
-    private void countBids(Instant end) {
-      Map<Long, Long> counts = new TreeMap<>();
-      long maxCount = 0L;
-      for (Map.Entry<Long, List<Instant>> entry : bids.entrySet()) {
-        long count = 0L;
-        long auction = entry.getKey();
-        for (Instant bid : entry.getValue()) {
-          if (bid.isBefore(end)) {
-            count++;
-          }
-        }
-        if (count > 0) {
-          counts.put(auction, count);
-          maxCount = Math.max(maxCount, count);
-        }
-      }
-      for (Map.Entry<Long, Long> entry : counts.entrySet()) {
-        long auction = entry.getKey();
-        long count = entry.getValue();
-        if (count == maxCount) {
-          AuctionCount result = new AuctionCount(auction, count);
-          addResult(TimestampedValue.of(result, end));
-        }
-      }
-    }
-
-    /**
-     * Retire bids which are strictly before {@code cutoff}. Return true if there are any bids
-     * remaining.
-     */
-    private boolean retireBids(Instant cutoff) {
-      boolean anyRemain = false;
-      for (Map.Entry<Long, List<Instant>> entry : bids.entrySet()) {
-        long auction = entry.getKey();
-        Iterator<Instant> itr = entry.getValue().iterator();
-        while (itr.hasNext()) {
-          Instant bid = itr.next();
-          if (bid.isBefore(cutoff)) {
-            NexmarkUtils.info("retire: %s for %s", bid, auction);
-            itr.remove();
-          } else {
-            anyRemain = true;
-          }
-        }
-      }
-      return anyRemain;
-    }
-
-    /**
-     * Retire active windows until we've reached {@code newWindowStart}.
-     */
-    private void retireWindows(Instant newWindowStart) {
-      while (!newWindowStart.equals(windowStart)) {
-        NexmarkUtils.info("retiring window %s, aiming for %s", windowStart, newWindowStart);
-        // Count bids in the window (windowStart, windowStart + size].
-        countBids(windowStart.plus(Duration.standardSeconds(configuration.windowSizeSec)));
-        // Advance the window.
-        windowStart = windowStart.plus(Duration.standardSeconds(configuration.windowPeriodSec));
-        // Retire bids which will never contribute to a future window.
-        if (!retireBids(windowStart)) {
-          // Can fast forward to latest window since no more outstanding bids.
-          windowStart = newWindowStart;
-        }
-      }
-    }
-
-    /**
-     * Add bid to state.
-     */
-    private void captureBid(Bid bid, Instant timestamp) {
-      List<Instant> existing = bids.get(bid.auction);
-      if (existing == null) {
-        existing = new ArrayList<>();
-        bids.put(bid.auction, existing);
-      }
-      existing.add(timestamp);
-    }
-
-    @Override
-    public void run() {
-      TimestampedValue<Event> timestampedEvent = nextInput();
-      if (timestampedEvent == null) {
-        // Drain the remaining windows.
-        retireWindows(NexmarkUtils.END_OF_TIME);
-        allDone();
-        return;
-      }
-
-      Event event = timestampedEvent.getValue();
-      if (event.bid == null) {
-        // Ignore non-bid events.
-        return;
-      }
-      Instant timestamp = timestampedEvent.getTimestamp();
-      Instant newWindowStart = windowStart(Duration.standardSeconds(configuration.windowSizeSec),
-          Duration.standardSeconds(configuration.windowPeriodSec), timestamp);
-      // Capture results from any windows we can now retire.
-      retireWindows(newWindowStart);
-      // Capture current bid.
-      captureBid(event.bid, timestamp);
-    }
-  }
-
-  public Query5Model(NexmarkConfiguration configuration) {
-    super(configuration);
-  }
-
-  @Override
-  public AbstractSimulator<?, ?> simulator() {
-    return new Simulator(configuration);
-  }
-
-  @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
-    return toValue(itr);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java
deleted file mode 100644
index ea39ede..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.AuctionBid;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.integration.nexmark.model.SellerPrice;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-
-/**
- * Query 6, 'Average Selling Price by Seller'. Select the average selling price over the
- * last 10 closed auctions by the same seller. In CQL syntax:
- *
- * <pre>{@code
- * SELECT Istream(AVG(Q.final), Q.seller)
- * FROM (SELECT Rstream(MAX(B.price) AS final, A.seller)
- *       FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
- *       WHERE A.id=B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME
- *       GROUP BY A.id, A.seller) [PARTITION BY A.seller ROWS 10] Q
- * GROUP BY Q.seller;
- * }</pre>
- *
- * <p>We are a little more exact with selecting winning bids: see {@link WinningBids}.
- */
-public class Query6 extends NexmarkQuery {
-  /**
-   * Combiner to keep track of up to {@code maxNumBids} of the most recent wining bids and calculate
-   * their average selling price.
-   */
-  private static class MovingMeanSellingPrice extends Combine.CombineFn<Bid, List<Bid>, Long> {
-    private final int maxNumBids;
-
-    public MovingMeanSellingPrice(int maxNumBids) {
-      this.maxNumBids = maxNumBids;
-    }
-
-    @Override
-    public List<Bid> createAccumulator() {
-      return new ArrayList<>();
-    }
-
-    @Override
-    public List<Bid> addInput(List<Bid> accumulator, Bid input) {
-      accumulator.add(input);
-      Collections.sort(accumulator, Bid.ASCENDING_TIME_THEN_PRICE);
-      if (accumulator.size() > maxNumBids) {
-        accumulator.remove(0);
-      }
-      return accumulator;
-    }
-
-    @Override
-    public List<Bid> mergeAccumulators(Iterable<List<Bid>> accumulators) {
-      List<Bid> result = new ArrayList<>();
-      for (List<Bid> accumulator : accumulators) {
-        result.addAll(accumulator);
-      }
-      Collections.sort(result, Bid.ASCENDING_TIME_THEN_PRICE);
-      if (result.size() > maxNumBids) {
-        result = Lists.newArrayList(result.listIterator(result.size() - maxNumBids));
-      }
-      return result;
-    }
-
-    @Override
-    public Long extractOutput(List<Bid> accumulator) {
-      if (accumulator.isEmpty()) {
-        return 0L;
-      }
-      long sumOfPrice = 0;
-      for (Bid bid : accumulator) {
-        sumOfPrice += bid.price;
-      }
-      return Math.round((double) sumOfPrice / accumulator.size());
-    }
-  }
-
-  public Query6(NexmarkConfiguration configuration) {
-    super(configuration, "Query6");
-  }
-
-  private PCollection<SellerPrice> applyTyped(PCollection<Event> events) {
-    return events
-        // Find the winning bid for each closed auction.
-        .apply(new WinningBids(name + ".WinningBids", configuration))
-
-        // Key the winning bid by the seller id.
-        .apply(name + ".Rekey",
-            ParDo.of(new DoFn<AuctionBid, KV<Long, Bid>>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    Auction auction = c.element().auction;
-                    Bid bid = c.element().bid;
-                    c.output(KV.of(auction.seller, bid));
-                  }
-                }))
-
-        // Re-window to update on every wining bid.
-        .apply(
-            Window.<KV<Long, Bid>>into(new GlobalWindows())
-                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
-                .accumulatingFiredPanes()
-                .withAllowedLateness(Duration.ZERO))
-
-        // Find the average of last 10 winning bids for each seller.
-        .apply(Combine.<Long, Bid, Long>perKey(new MovingMeanSellingPrice(10)))
-
-        // Project into our datatype.
-        .apply(name + ".Select",
-            ParDo.of(new DoFn<KV<Long, Long>, SellerPrice>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    c.output(new SellerPrice(c.element().getKey(), c.element().getValue()));
-                  }
-                }));
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java
deleted file mode 100644
index 9cb8b3d..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.AuctionBid;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.integration.nexmark.model.SellerPrice;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-import org.junit.Assert;
-
-/**
- * A direct implementation of {@link Query6}.
- */
-public class Query6Model extends NexmarkQueryModel implements Serializable {
-  /**
-   * Simulator for query 6.
-   */
-  private static class Simulator extends AbstractSimulator<AuctionBid, SellerPrice> {
-    /** The cumulative count of winning bids, indexed by seller id. */
-    private final Map<Long, Long> numWinningBidsPerSeller;
-
-    /** The cumulative total of winning bid prices, indexed by seller id. */
-    private final Map<Long, Long> totalWinningBidPricesPerSeller;
-
-    private Instant lastTimestamp;
-
-    public Simulator(NexmarkConfiguration configuration) {
-      super(new WinningBidsSimulator(configuration).results());
-      numWinningBidsPerSeller = new TreeMap<>();
-      totalWinningBidPricesPerSeller = new TreeMap<>();
-      lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-    }
-
-    /**
-     * Update the per-seller running counts/sums.
-     */
-    private void captureWinningBid(Auction auction, Bid bid, Instant timestamp) {
-      NexmarkUtils.info("winning auction, bid: %s, %s", auction, bid);
-      Long count = numWinningBidsPerSeller.get(auction.seller);
-      if (count == null) {
-        count = 1L;
-      } else {
-        count += 1;
-      }
-      numWinningBidsPerSeller.put(auction.seller, count);
-      Long total = totalWinningBidPricesPerSeller.get(auction.seller);
-      if (total == null) {
-        total = bid.price;
-      } else {
-        total += bid.price;
-      }
-      totalWinningBidPricesPerSeller.put(auction.seller, total);
-      TimestampedValue<SellerPrice> intermediateResult = TimestampedValue.of(
-          new SellerPrice(auction.seller, Math.round((double) total / count)), timestamp);
-      addIntermediateResult(intermediateResult);
-    }
-
-
-    @Override
-    protected void run() {
-      TimestampedValue<AuctionBid> timestampedWinningBid = nextInput();
-      if (timestampedWinningBid == null) {
-        for (Map.Entry<Long, Long> entry : numWinningBidsPerSeller.entrySet()) {
-          long seller = entry.getKey();
-          long count = entry.getValue();
-          long total = totalWinningBidPricesPerSeller.get(seller);
-          addResult(TimestampedValue.of(
-              new SellerPrice(seller, Math.round((double) total / count)), lastTimestamp));
-        }
-        allDone();
-        return;
-      }
-
-      lastTimestamp = timestampedWinningBid.getTimestamp();
-      captureWinningBid(timestampedWinningBid.getValue().auction,
-          timestampedWinningBid.getValue().bid, lastTimestamp);
-    }
-  }
-
-  public Query6Model(NexmarkConfiguration configuration) {
-    super(configuration);
-  }
-
-  @Override
-  public AbstractSimulator<?, ?> simulator() {
-    return new Simulator(configuration);
-  }
-
-  @Override
-  protected Iterable<TimestampedValue<KnownSize>> relevantResults(
-      Iterable<TimestampedValue<KnownSize>> results) {
-    // Find the last (in processing time) reported average price for each seller.
-    Map<Long, TimestampedValue<KnownSize>> finalAverages = new TreeMap<>();
-    for (TimestampedValue<KnownSize> obj : results) {
-      Assert.assertTrue("have SellerPrice", obj.getValue() instanceof SellerPrice);
-      SellerPrice sellerPrice = (SellerPrice) obj.getValue();
-      finalAverages.put(
-          sellerPrice.seller, TimestampedValue.of((KnownSize) sellerPrice, obj.getTimestamp()));
-    }
-    return finalAverages.values();
-  }
-
-  @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
-    return toValue(itr);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
deleted file mode 100644
index 217d0d4..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-
-import org.joda.time.Duration;
-
-/**
- * Query 7, 'Highest Bid'. Select the bids with the highest bid
- * price in the last minute. In CQL syntax:
- *
- * <pre>
- * SELECT Rstream(B.auction, B.price, B.bidder)
- * FROM Bid [RANGE 1 MINUTE SLIDE 1 MINUTE] B
- * WHERE B.price = (SELECT MAX(B1.price)
- *                  FROM BID [RANGE 1 MINUTE SLIDE 1 MINUTE] B1);
- * </pre>
- *
- * <p>We will use a shorter window to help make testing easier. We'll also implement this using
- * a side-input in order to exercise that functionality. (A combiner, as used in Query 5, is
- * a more efficient approach.).
- */
-public class Query7 extends NexmarkQuery {
-  public Query7(NexmarkConfiguration configuration) {
-    super(configuration, "Query7");
-  }
-
-  private PCollection<Bid> applyTyped(PCollection<Event> events) {
-    // Window the bids.
-    PCollection<Bid> slidingBids = events.apply(JUST_BIDS).apply(
-        Window.<Bid>into(FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))));
-
-    // Find the largest price in all bids.
-    // NOTE: It would be more efficient to write this query much as we did for Query5, using
-    // a binary combiner to accumulate the bids with maximal price. As written this query
-    // requires an additional scan per window, with the associated cost of snapshotted state and
-    // its I/O. We'll keep this implementation since it illustrates the use of side inputs.
-    final PCollectionView<Long> maxPriceView =
-        slidingBids
-            .apply("BidToPrice", BID_TO_PRICE)
-            .apply(Max.longsGlobally().withFanout(configuration.fanout).asSingletonView());
-
-    return slidingBids
-        // Select all bids which have that maximum price (there may be more than one).
-        .apply(name + ".Select", ParDo
-          .of(new DoFn<Bid, Bid>() {
-                @ProcessElement
-                public void processElement(ProcessContext c) {
-                  long maxPrice = c.sideInput(maxPriceView);
-                  Bid bid = c.element();
-                  if (bid.price == maxPrice) {
-                    c.output(bid);
-                  }
-                }
-              })
-          .withSideInputs(maxPriceView));
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java
deleted file mode 100644
index 0ada5e8..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * A direct implementation of {@link Query7}.
- */
-public class Query7Model extends NexmarkQueryModel implements Serializable {
-  /**
-   * Simulator for query 7.
-   */
-  private class Simulator extends AbstractSimulator<Event, Bid> {
-    /** Bids with highest bid price seen in the current window. */
-    private final List<Bid> highestBids;
-
-    /** When current window started. */
-    private Instant windowStart;
-
-    private Instant lastTimestamp;
-
-    public Simulator(NexmarkConfiguration configuration) {
-      super(NexmarkUtils.standardEventIterator(configuration));
-      highestBids = new ArrayList<>();
-      windowStart = NexmarkUtils.BEGINNING_OF_TIME;
-      lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-    }
-
-    /**
-     * Transfer the currently winning bids into results and retire them.
-     */
-    private void retireWindow(Instant timestamp) {
-      for (Bid bid : highestBids) {
-        addResult(TimestampedValue.of(bid, timestamp));
-      }
-      highestBids.clear();
-    }
-
-    /**
-     * Keep just the highest price bid.
-     */
-    private void captureBid(Bid bid) {
-      Iterator<Bid> itr = highestBids.iterator();
-      boolean isWinning = true;
-      while (itr.hasNext()) {
-        Bid existingBid = itr.next();
-        if (existingBid.price > bid.price) {
-          isWinning = false;
-          break;
-        }
-        NexmarkUtils.info("smaller price: %s", existingBid);
-        itr.remove();
-      }
-      if (isWinning) {
-        NexmarkUtils.info("larger price: %s", bid);
-        highestBids.add(bid);
-      }
-    }
-
-    @Override
-    protected void run() {
-      TimestampedValue<Event> timestampedEvent = nextInput();
-      if (timestampedEvent == null) {
-        // Capture all remaining bids in results.
-        retireWindow(lastTimestamp);
-        allDone();
-        return;
-      }
-
-      Event event = timestampedEvent.getValue();
-      if (event.bid == null) {
-        // Ignore non-bid events.
-        return;
-      }
-      lastTimestamp = timestampedEvent.getTimestamp();
-      Instant newWindowStart = windowStart(Duration.standardSeconds(configuration.windowSizeSec),
-          Duration.standardSeconds(configuration.windowSizeSec), lastTimestamp);
-      if (!newWindowStart.equals(windowStart)) {
-        // Capture highest priced bids in current window and retire it.
-        retireWindow(lastTimestamp);
-        windowStart = newWindowStart;
-      }
-      // Keep only the highest bids.
-      captureBid(event.bid);
-    }
-  }
-
-  public Query7Model(NexmarkConfiguration configuration) {
-    super(configuration);
-  }
-
-  @Override
-  public AbstractSimulator<?, ?> simulator() {
-    return new Simulator(configuration);
-  }
-
-  @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
-    return toValueOrder(itr);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java
deleted file mode 100644
index 603841b..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.IdNameReserve;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.integration.nexmark.model.Person;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-
-/**
- * Query 8, 'Monitor New Users'. Select people who have entered the system and created auctions
- * in the last 12 hours, updated every 12 hours. In CQL syntax:
- *
- * <pre>
- * SELECT Rstream(P.id, P.name, A.reserve)
- * FROM Person [RANGE 12 HOUR] P, Auction [RANGE 12 HOUR] A
- * WHERE P.id = A.seller;
- * </pre>
- *
- * <p>To make things a bit more dynamic and easier to test we'll use a much shorter window.
- */
-public class Query8 extends NexmarkQuery {
-  public Query8(NexmarkConfiguration configuration) {
-    super(configuration, "Query8");
-  }
-
-  private PCollection<IdNameReserve> applyTyped(PCollection<Event> events) {
-    // Window and key new people by their id.
-    PCollection<KV<Long, Person>> personsById =
-        events
-          .apply(JUST_NEW_PERSONS)
-          .apply("Query8.WindowPersons",
-            Window.<Person>into(
-              FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))))
-            .apply("PersonById", PERSON_BY_ID);
-
-    // Window and key new auctions by their id.
-    PCollection<KV<Long, Auction>> auctionsBySeller =
-        events.apply(JUST_NEW_AUCTIONS)
-          .apply("Query8.WindowAuctions",
-            Window.<Auction>into(
-              FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))))
-            .apply("AuctionBySeller", AUCTION_BY_SELLER);
-
-    // Join people and auctions and project the person id, name and auction reserve price.
-    return KeyedPCollectionTuple.of(PERSON_TAG, personsById)
-        .and(AUCTION_TAG, auctionsBySeller)
-        .apply(CoGroupByKey.<Long>create())
-        .apply(name + ".Select",
-            ParDo.of(new DoFn<KV<Long, CoGbkResult>, IdNameReserve>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    Person person = c.element().getValue().getOnly(PERSON_TAG, null);
-                    if (person == null) {
-                      // Person was not created in last window period.
-                      return;
-                    }
-                    for (Auction auction : c.element().getValue().getAll(AUCTION_TAG)) {
-                      c.output(new IdNameReserve(person.id, person.name, auction.reserve));
-                    }
-                  }
-                }));
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java
deleted file mode 100644
index 8c76bc6..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.IdNameReserve;
-import org.apache.beam.integration.nexmark.model.Person;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * A direct implementation of {@link Query8}.
- */
-public class Query8Model extends NexmarkQueryModel implements Serializable {
-  /**
-   * Simulator for query 8.
-   */
-  private class Simulator extends AbstractSimulator<Event, IdNameReserve> {
-    /** New persons seen in the current window, indexed by id. */
-    private final Map<Long, Person> newPersons;
-
-    /** New auctions seen in the current window, indexed by seller id. */
-    private final Multimap<Long, Auction> newAuctions;
-
-    /** When did the current window start. */
-    private Instant windowStart;
-
-    public Simulator(NexmarkConfiguration configuration) {
-      super(NexmarkUtils.standardEventIterator(configuration));
-      newPersons = new HashMap<>();
-      newAuctions = ArrayListMultimap.create();
-      windowStart = NexmarkUtils.BEGINNING_OF_TIME;
-    }
-
-    /**
-     * Retire all persons added in last window.
-     */
-    private void retirePersons() {
-      for (Map.Entry<Long, Person> entry : newPersons.entrySet()) {
-        NexmarkUtils.info("retire: %s", entry.getValue());
-      }
-      newPersons.clear();
-    }
-
-    /**
-     * Retire all auctions added in last window.
-     */
-    private void retireAuctions() {
-      for (Map.Entry<Long, Auction> entry : newAuctions.entries()) {
-        NexmarkUtils.info("retire: %s", entry.getValue());
-      }
-      newAuctions.clear();
-    }
-
-    /**
-     * Capture new result.
-     */
-    private void addResult(Auction auction, Person person, Instant timestamp) {
-      addResult(TimestampedValue.of(
-          new IdNameReserve(person.id, person.name, auction.reserve), timestamp));
-    }
-
-    @Override
-    public void run() {
-      TimestampedValue<Event> timestampedEvent = nextInput();
-      if (timestampedEvent == null) {
-        allDone();
-        return;
-      }
-
-      Event event = timestampedEvent.getValue();
-      if (event.bid != null) {
-        // Ignore bid events.
-        // Keep looking for next events.
-        return;
-      }
-      Instant timestamp = timestampedEvent.getTimestamp();
-      Instant newWindowStart = windowStart(Duration.standardSeconds(configuration.windowSizeSec),
-          Duration.standardSeconds(configuration.windowSizeSec), timestamp);
-      if (!newWindowStart.equals(windowStart)) {
-        // Retire this window.
-        retirePersons();
-        retireAuctions();
-        windowStart = newWindowStart;
-      }
-
-      if (event.newAuction != null) {
-        // Join new auction with existing person, if any.
-        Person person = newPersons.get(event.newAuction.seller);
-        if (person != null) {
-          addResult(event.newAuction, person, timestamp);
-        } else {
-          // Remember auction for future new people.
-          newAuctions.put(event.newAuction.seller, event.newAuction);
-        }
-      } else { // event is not an auction, nor a bid, so it is a person
-        // Join new person with existing auctions.
-        for (Auction auction : newAuctions.get(event.newPerson.id)) {
-          addResult(auction, event.newPerson, timestamp);
-        }
-        // We'll never need these auctions again.
-        newAuctions.removeAll(event.newPerson.id);
-        // Remember person for future auctions.
-        newPersons.put(event.newPerson.id, event.newPerson);
-      }
-    }
-  }
-
-  public Query8Model(NexmarkConfiguration configuration) {
-    super(configuration);
-  }
-
-  @Override
-  public AbstractSimulator<?, ?> simulator() {
-    return new Simulator(configuration);
-  }
-
-  @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
-    return toValue(itr);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java
deleted file mode 100644
index 6dd189d..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.AuctionBid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Query "9", 'Winning bids'. Select just the winning bids. Not in original NEXMark suite, but
- * handy for testing. See {@link WinningBids} for the details.
- */
-public class Query9 extends NexmarkQuery {
-  public Query9(NexmarkConfiguration configuration) {
-    super(configuration, "Query9");
-  }
-
-  private PCollection<AuctionBid> applyTyped(PCollection<Event> events) {
-    return events.apply(new WinningBids(name, configuration));
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java
deleted file mode 100644
index d117e2d..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.sdk.values.TimestampedValue;
-
-/**
- * A direct implementation of {@link Query9}.
- */
-public class Query9Model extends NexmarkQueryModel implements Serializable {
-  public Query9Model(NexmarkConfiguration configuration) {
-    super(configuration);
-  }
-
-  @Override
-  public AbstractSimulator<?, ?> simulator() {
-    return new WinningBidsSimulator(configuration);
-  }
-
-  @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
-    return toValue(itr);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
deleted file mode 100644
index d4ca177..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.TreeMap;
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.AuctionBid;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.sources.GeneratorConfig;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Instant;
-
-/**
- * A transform to find the winning bid for each closed auction. In pseudo CQL syntax:
- *
- * <pre>{@code
- * SELECT Rstream(A.*, B.auction, B.bidder, MAX(B.price), B.dateTime)
- * FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
- * WHERE A.id = B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME
- * GROUP BY A.id
- * }</pre>
- *
- * <p>We will also check that the winning bid is above the auction reserve. Note that
- * we ignore the auction opening bid value since it has no impact on which bid eventually wins,
- * if any.
- *
- * <p>Our implementation will use a custom windowing function in order to bring bids and
- * auctions together without requiring global state.
- */
-public class WinningBids extends PTransform<PCollection<Event>, PCollection<AuctionBid>> {
-  /** Windows for open auctions and bids. */
-  private static class AuctionOrBidWindow extends IntervalWindow {
-    /** Id of auction this window is for. */
-    public final long auction;
-
-    /**
-     * True if this window represents an actual auction, and thus has a start/end
-     * time matching that of the auction. False if this window represents a bid, and
-     * thus has an unbounded start/end time.
-     */
-    public final boolean isAuctionWindow;
-
-    /** For avro only. */
-    private AuctionOrBidWindow() {
-      super(TIMESTAMP_MIN_VALUE, TIMESTAMP_MAX_VALUE);
-      auction = 0;
-      isAuctionWindow = false;
-    }
-
-    private AuctionOrBidWindow(
-        Instant start, Instant end, long auctionId, boolean isAuctionWindow) {
-      super(start, end);
-      this.auction = auctionId;
-      this.isAuctionWindow = isAuctionWindow;
-    }
-
-    /** Return an auction window for {@code auction}. */
-    public static AuctionOrBidWindow forAuction(Instant timestamp, Auction auction) {
-      return new AuctionOrBidWindow(timestamp, new Instant(auction.expires), auction.id, true);
-    }
-
-    /**
-     * Return a bid window for {@code bid}. It should later be merged into
-     * the corresponding auction window. However, it is possible this bid is for an already
-     * expired auction, or for an auction which the system has not yet seen. So we
-     * give the bid a bit of wiggle room in its interval.
-     */
-    public static AuctionOrBidWindow forBid(
-        long expectedAuctionDurationMs, Instant timestamp, Bid bid) {
-      // At this point we don't know which auctions are still valid, and the bid may
-      // be for an auction which won't start until some unknown time in the future
-      // (due to Generator.AUCTION_ID_LEAD in Generator.nextBid).
-      // A real system would atomically reconcile bids and auctions by a separate mechanism.
-      // If we give bids an unbounded window it is possible a bid for an auction which
-      // has already expired would cause the system watermark to stall, since that window
-      // would never be retired.
-      // Instead, we will just give the bid a finite window which expires at
-      // the upper bound of auctions assuming the auction starts at the same time as the bid,
-      // and assuming the system is running at its lowest event rate (as per interEventDelayUs).
-      return new AuctionOrBidWindow(
-          timestamp, timestamp.plus(expectedAuctionDurationMs * 2), bid.auction, false);
-    }
-
-    /** Is this an auction window? */
-    public boolean isAuctionWindow() {
-      return isAuctionWindow;
-    }
-
-    @Override
-    public String toString() {
-      return String.format("AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}",
-          start(), end(), auction, isAuctionWindow);
-    }
-
-    @Override public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      if (!super.equals(o)) {
-        return false;
-      }
-      AuctionOrBidWindow that = (AuctionOrBidWindow) o;
-      return (isAuctionWindow == that.isAuctionWindow) && (auction == that.auction);
-    }
-
-    @Override public int hashCode() {
-      return Objects.hash(isAuctionWindow, auction);
-    }
-  }
-
-  /**
-   * Encodes an {@link AuctionOrBidWindow} as an {@link IntervalWindow} and an auction id long.
-   */
-  private static class AuctionOrBidWindowCoder extends CustomCoder<AuctionOrBidWindow> {
-    private static final AuctionOrBidWindowCoder INSTANCE = new AuctionOrBidWindowCoder();
-    private static final Coder<IntervalWindow> SUPER_CODER = IntervalWindow.getCoder();
-    private static final Coder<Long> ID_CODER = VarLongCoder.of();
-    private static final Coder<Integer> INT_CODER = VarIntCoder.of();
-
-    @JsonCreator
-    public static AuctionOrBidWindowCoder of() {
-      return INSTANCE;
-    }
-
-    @Override
-    public void encode(AuctionOrBidWindow window, OutputStream outStream)
-        throws IOException, CoderException {
-      SUPER_CODER.encode(window, outStream);
-      ID_CODER.encode(window.auction, outStream);
-      INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream);
-    }
-
-    @Override
-    public AuctionOrBidWindow decode(InputStream inStream)
-        throws IOException, CoderException {
-      IntervalWindow superWindow = SUPER_CODER.decode(inStream);
-      long auction = ID_CODER.decode(inStream);
-      boolean isAuctionWindow = INT_CODER.decode(inStream) != 0;
-      return new AuctionOrBidWindow(
-          superWindow.start(), superWindow.end(), auction, isAuctionWindow);
-    }
-
-    @Override public void verifyDeterministic() throws NonDeterministicException {}
-  }
-
-  /** Assign events to auction windows and merges them intelligently. */
-  private static class AuctionOrBidWindowFn extends WindowFn<Event, AuctionOrBidWindow> {
-    /** Expected duration of auctions in ms. */
-    private final long expectedAuctionDurationMs;
-
-    public AuctionOrBidWindowFn(long expectedAuctionDurationMs) {
-      this.expectedAuctionDurationMs = expectedAuctionDurationMs;
-    }
-
-    @Override
-    public Collection<AuctionOrBidWindow> assignWindows(AssignContext c) {
-      Event event = c.element();
-      if (event.newAuction != null) {
-        // Assign auctions to an auction window which expires at the auction's close.
-        return Collections
-            .singletonList(AuctionOrBidWindow.forAuction(c.timestamp(), event.newAuction));
-      } else if (event.bid != null) {
-        // Assign bids to a temporary bid window which will later be merged into the appropriate
-        // auction window.
-        return Collections.singletonList(
-            AuctionOrBidWindow.forBid(expectedAuctionDurationMs, c.timestamp(), event.bid));
-      } else {
-        // Don't assign people to any window. They will thus be dropped.
-        return Collections.emptyList();
-      }
-    }
-
-    @Override
-    public void mergeWindows(MergeContext c) throws Exception {
-      // Split and index the auction and bid windows by auction id.
-      Map<Long, AuctionOrBidWindow> idToTrueAuctionWindow = new TreeMap<>();
-      Map<Long, List<AuctionOrBidWindow>> idToBidAuctionWindows = new TreeMap<>();
-      for (AuctionOrBidWindow window : c.windows()) {
-        if (window.isAuctionWindow()) {
-          idToTrueAuctionWindow.put(window.auction, window);
-        } else {
-          List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(window.auction);
-          if (bidWindows == null) {
-            bidWindows = new ArrayList<>();
-            idToBidAuctionWindows.put(window.auction, bidWindows);
-          }
-          bidWindows.add(window);
-        }
-      }
-
-      // Merge all 'bid' windows into their corresponding 'auction' window, provided the
-      // auction has not expired.
-      for (Map.Entry<Long, AuctionOrBidWindow> entry : idToTrueAuctionWindow.entrySet()) {
-        long auction = entry.getKey();
-        AuctionOrBidWindow auctionWindow = entry.getValue();
-        List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(auction);
-        if (bidWindows != null) {
-          List<AuctionOrBidWindow> toBeMerged = new ArrayList<>();
-          for (AuctionOrBidWindow bidWindow : bidWindows) {
-            if (bidWindow.start().isBefore(auctionWindow.end())) {
-              toBeMerged.add(bidWindow);
-            }
-            // else: This bid window will remain until its expire time, at which point it
-            // will expire without ever contributing to an output.
-          }
-          if (!toBeMerged.isEmpty()) {
-            toBeMerged.add(auctionWindow);
-            c.merge(toBeMerged, auctionWindow);
-          }
-        }
-      }
-    }
-
-    @Override
-    public boolean isCompatible(WindowFn<?, ?> other) {
-      return other instanceof AuctionOrBidWindowFn;
-    }
-
-    @Override
-    public Coder<AuctionOrBidWindow> windowCoder() {
-      return AuctionOrBidWindowCoder.of();
-    }
-
-    @Override
-    public WindowMappingFn<AuctionOrBidWindow> getDefaultWindowMappingFn() {
-      throw new UnsupportedOperationException("AuctionWindowFn not supported for side inputs");
-    }
-
-    /**
-     * Below we will GBK auctions and bids on their auction ids. Then we will reduce those
-     * per id to emit {@code (auction, winning bid)} pairs for auctions which have expired with at
-     * least one valid bid. We would like those output pairs to have a timestamp of the auction's
-     * expiry (since that's the earliest we know for sure we have the correct winner). We would
-     * also like to make that winning results are available to following stages at the auction's
-     * expiry.
-     *
-     * <p>Each result of the GBK will have a timestamp of the min of the result of this object's
-     * assignOutputTime over all records which end up in one of its iterables. Thus we get the
-     * desired behavior if we ignore each record's timestamp and always return the auction window's
-     * 'maxTimestamp', which will correspond to the auction's expiry.
-     *
-     * <p>In contrast, if this object's assignOutputTime were to return 'inputTimestamp'
-     * (the usual implementation), then each GBK record will take as its timestamp the minimum of
-     * the timestamps of all bids and auctions within it, which will always be the auction's
-     * timestamp. An auction which expires well into the future would thus hold up the watermark
-     * of the GBK results until that auction expired. That in turn would hold up all winning pairs.
-     */
-    @Override
-    public Instant getOutputTime(
-        Instant inputTimestamp, AuctionOrBidWindow window) {
-      return window.maxTimestamp();
-    }
-  }
-
-  private final AuctionOrBidWindowFn auctionOrBidWindowFn;
-
-  public WinningBids(String name, NexmarkConfiguration configuration) {
-    super(name);
-    // What's the expected auction time (when the system is running at the lowest event rate).
-    long[] interEventDelayUs = configuration.rateShape.interEventDelayUs(
-        configuration.firstEventRate, configuration.nextEventRate,
-        configuration.rateUnit, configuration.numEventGenerators);
-    long longestDelayUs = 0;
-    for (long interEventDelayU : interEventDelayUs) {
-      longestDelayUs = Math.max(longestDelayUs, interEventDelayU);
-    }
-    // Adjust for proportion of auction events amongst all events.
-    longestDelayUs =
-        (longestDelayUs * GeneratorConfig.PROPORTION_DENOMINATOR)
-        / GeneratorConfig.AUCTION_PROPORTION;
-    // Adjust for number of in-flight auctions.
-    longestDelayUs = longestDelayUs * configuration.numInFlightAuctions;
-    long expectedAuctionDurationMs = (longestDelayUs + 999) / 1000;
-    NexmarkUtils.console("Expected auction duration is %d ms", expectedAuctionDurationMs);
-    auctionOrBidWindowFn = new AuctionOrBidWindowFn(expectedAuctionDurationMs);
-  }
-
-  @Override
-  public PCollection<AuctionBid> expand(PCollection<Event> events) {
-    // Window auctions and bids into custom auction windows. New people events will be discarded.
-    // This will allow us to bring bids and auctions together irrespective of how long
-    // each auction is open for.
-    events = events.apply("Window", Window.into(auctionOrBidWindowFn));
-
-    // Key auctions by their id.
-    PCollection<KV<Long, Auction>> auctionsById =
-        events.apply(NexmarkQuery.JUST_NEW_AUCTIONS)
-              .apply("AuctionById:", NexmarkQuery.AUCTION_BY_ID);
-
-    // Key bids by their auction id.
-    PCollection<KV<Long, Bid>> bidsByAuctionId =
-        events.apply(NexmarkQuery.JUST_BIDS).apply("BidByAuction", NexmarkQuery.BID_BY_AUCTION);
-
-    // Find the highest price valid bid for each closed auction.
-    return
-      // Join auctions and bids.
-      KeyedPCollectionTuple.of(NexmarkQuery.AUCTION_TAG, auctionsById)
-        .and(NexmarkQuery.BID_TAG, bidsByAuctionId)
-        .apply(CoGroupByKey.<Long>create())
-        // Filter and select.
-        .apply(name + ".Join",
-          ParDo.of(new DoFn<KV<Long, CoGbkResult>, AuctionBid>() {
-            private final Counter noAuctionCounter = Metrics.counter(name, "noAuction");
-            private final Counter underReserveCounter = Metrics.counter(name, "underReserve");
-            private final Counter noValidBidsCounter = Metrics.counter(name, "noValidBids");
-
-            @ProcessElement
-            public void processElement(ProcessContext c) {
-              Auction auction =
-                  c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null);
-              if (auction == null) {
-                // We have bids without a matching auction. Give up.
-                noAuctionCounter.inc();
-                return;
-              }
-              // Find the current winning bid for auction.
-              // The earliest bid with the maximum price above the reserve wins.
-              Bid bestBid = null;
-              for (Bid bid : c.element().getValue().getAll(NexmarkQuery.BID_TAG)) {
-                // Bids too late for their auction will have been
-                // filtered out by the window merge function.
-                checkState(bid.dateTime < auction.expires);
-                if (bid.price < auction.reserve) {
-                  // Bid price is below auction reserve.
-                  underReserveCounter.inc();
-                  continue;
-                }
-
-                if (bestBid == null
-                    || Bid.PRICE_THEN_DESCENDING_TIME.compare(bid, bestBid) > 0) {
-                  bestBid = bid;
-                }
-              }
-              if (bestBid == null) {
-                // We don't have any valid bids for auction.
-                noValidBidsCounter.inc();
-                return;
-              }
-              c.output(new AuctionBid(auction, bestBid));
-            }
-          }
-        ));
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(auctionOrBidWindowFn);
-  }
-
-  @Override public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    WinningBids that = (WinningBids) o;
-    return auctionOrBidWindowFn.equals(that.auctionOrBidWindowFn);
-  }
-}