You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:37 UTC

[29/55] [abbrv] beam git commit: Move WinningBids into the queries package

Move WinningBids into the queries package


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a39cb800
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a39cb800
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a39cb800

Branch: refs/heads/master
Commit: a39cb80009f569e1c8ba82ee9c67a7c5dbe3d16f
Parents: a6dbdfa
Author: Ismaël Mejía <ie...@apache.org>
Authored: Sun Apr 30 17:44:07 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:28 2017 +0200

----------------------------------------------------------------------
 .../integration/nexmark/AbstractSimulator.java  | 210 ----------
 .../beam/integration/nexmark/NexmarkQuery.java  | 267 -------------
 .../integration/nexmark/NexmarkQueryModel.java  | 122 ------
 .../beam/integration/nexmark/NexmarkRunner.java |   2 +
 .../beam/integration/nexmark/WinningBids.java   | 377 ------------------
 .../nexmark/WinningBidsSimulator.java           | 205 ----------
 .../integration/nexmark/model/AuctionBid.java   |   3 +-
 .../nexmark/queries/AbstractSimulator.java      | 211 +++++++++++
 .../nexmark/queries/NexmarkQuery.java           | 270 +++++++++++++
 .../nexmark/queries/NexmarkQueryModel.java      | 123 ++++++
 .../integration/nexmark/queries/Query0.java     |   1 -
 .../nexmark/queries/Query0Model.java            |   4 +-
 .../integration/nexmark/queries/Query1.java     |   1 -
 .../integration/nexmark/queries/Query10.java    |   1 -
 .../integration/nexmark/queries/Query11.java    |   1 -
 .../integration/nexmark/queries/Query12.java    |   1 -
 .../nexmark/queries/Query1Model.java            |   2 -
 .../integration/nexmark/queries/Query2.java     |   1 -
 .../nexmark/queries/Query2Model.java            |   2 -
 .../integration/nexmark/queries/Query3.java     |   1 -
 .../nexmark/queries/Query3Model.java            |   2 -
 .../integration/nexmark/queries/Query4.java     |   2 -
 .../nexmark/queries/Query4Model.java            |   3 -
 .../integration/nexmark/queries/Query5.java     |   1 -
 .../nexmark/queries/Query5Model.java            |   2 -
 .../integration/nexmark/queries/Query6.java     |   2 -
 .../nexmark/queries/Query6Model.java            |   3 -
 .../integration/nexmark/queries/Query7.java     |   1 -
 .../nexmark/queries/Query7Model.java            |   2 -
 .../integration/nexmark/queries/Query8.java     |   1 -
 .../nexmark/queries/Query8Model.java            |   2 -
 .../integration/nexmark/queries/Query9.java     |   2 -
 .../nexmark/queries/Query9Model.java            |   3 -
 .../nexmark/queries/WinningBids.java            | 379 +++++++++++++++++++
 .../nexmark/queries/WinningBidsSimulator.java   | 207 ++++++++++
 .../integration/nexmark/queries/QueryTest.java  |   2 -
 36 files changed, 1194 insertions(+), 1225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java
deleted file mode 100644
index b012842..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.integration.nexmark;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Abstract base class for simulator of a query.
- *
- * @param <InputT> Type of input elements.
- * @param <OutputT> Type of output elements.
- */
-public abstract class AbstractSimulator<InputT, OutputT> {
-  /** Window size for action bucket sampling. */
-  public static final Duration WINDOW_SIZE = Duration.standardMinutes(1);
-
-  /** Input event stream we should draw from. */
-  private final Iterator<TimestampedValue<InputT>> input;
-
-  /** Set to true when no more results. */
-  private boolean isDone;
-
-  /**
-   * Results which have not yet been returned by the {@link #results} iterator.
-   */
-  private final List<TimestampedValue<OutputT>> pendingResults;
-
-  /**
-   * Current window timestamp (ms since epoch).
-   */
-  private long currentWindow;
-
-  /**
-   * Number of (possibly intermediate) results for the current window.
-   */
-  private long currentCount;
-
-  /**
-   * Result counts per window which have not yet been returned by the {@link #resultsPerWindow}
-   * iterator.
-   */
-  private final List<Long> pendingCounts;
-
-  public AbstractSimulator(Iterator<TimestampedValue<InputT>> input) {
-    this.input = input;
-    isDone = false;
-    pendingResults = new ArrayList<>();
-    currentWindow = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
-    currentCount = 0;
-    pendingCounts = new ArrayList<>();
-  }
-
-  /** Called by implementors of {@link #run}: Fetch the next input element. */
-  @Nullable
-  protected TimestampedValue<InputT> nextInput() {
-    if (!input.hasNext()) {
-      return null;
-    }
-    TimestampedValue<InputT> timestampedInput = input.next();
-    NexmarkUtils.info("input: %s", timestampedInput);
-    return timestampedInput;
-  }
-
-  /**
-   * Called by implementors of {@link #run}:  Capture an intermediate result, for the purpose of
-   * recording the expected activity of the query over time.
-   */
-  protected void addIntermediateResult(TimestampedValue<OutputT> result) {
-    NexmarkUtils.info("intermediate result: %s", result);
-    updateCounts(result.getTimestamp());
-  }
-
-  /**
-   * Called by implementors of {@link #run}: Capture a final result, for the purpose of checking
-   * semantic correctness.
-   */
-  protected void addResult(TimestampedValue<OutputT> result) {
-    NexmarkUtils.info("result: %s", result);
-    pendingResults.add(result);
-    updateCounts(result.getTimestamp());
-  }
-
-  /**
-   * Update window and counts.
-   */
-  private void updateCounts(Instant timestamp) {
-    long window = timestamp.getMillis() - timestamp.getMillis() % WINDOW_SIZE.getMillis();
-    if (window > currentWindow) {
-      if (currentWindow > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) {
-        pendingCounts.add(currentCount);
-      }
-      currentCount = 0;
-      currentWindow = window;
-    }
-    currentCount++;
-  }
-
-  /** Called by implementors of {@link #run}: Record that no more results will be emitted. */
-  protected void allDone() {
-    isDone = true;
-  }
-
-  /**
-   * Overridden by derived classes to do the next increment of work. Each call should
-   * call one or more of {@link #nextInput}, {@link #addIntermediateResult}, {@link #addResult}
-   * or {@link #allDone}. It is ok for a single call to emit more than one result via
-   * {@link #addResult}. It is ok for a single call to run the entire simulation, though
-   * this will prevent the {@link #results} and {@link #resultsPerWindow} iterators to
-   * stall.
-   */
-  protected abstract void run();
-
-  /**
-   * Return iterator over all expected timestamped results. The underlying simulator state is
-   * changed. Only one of {@link #results} or {@link #resultsPerWindow} can be called.
-   */
-  public Iterator<TimestampedValue<OutputT>> results() {
-    return new Iterator<TimestampedValue<OutputT>>() {
-      @Override
-      public boolean hasNext() {
-        while (true) {
-          if (!pendingResults.isEmpty()) {
-            return true;
-          }
-          if (isDone) {
-            return false;
-          }
-          run();
-        }
-      }
-
-      @Override
-      public TimestampedValue<OutputT> next() {
-        TimestampedValue<OutputT> result = pendingResults.get(0);
-        pendingResults.remove(0);
-        return result;
-      }
-
-      @Override
-      public void remove() {
-        throw new UnsupportedOperationException();
-      }
-    };
-  }
-
-  /**
-   * Return an iterator over the number of results per {@link #WINDOW_SIZE} period. The underlying
-   * simulator state is changed.  Only one of {@link #results} or {@link #resultsPerWindow} can be
-   * called.
-   */
-  public Iterator<Long> resultsPerWindow() {
-    return new Iterator<Long>() {
-      @Override
-      public boolean hasNext() {
-        while (true) {
-          if (!pendingCounts.isEmpty()) {
-            return true;
-          }
-          if (isDone) {
-            if (currentCount > 0) {
-              pendingCounts.add(currentCount);
-              currentCount = 0;
-              currentWindow = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
-              return true;
-            } else {
-              return false;
-            }
-          }
-          run();
-        }
-      }
-
-      @Override
-      public Long next() {
-        Long result = pendingCounts.get(0);
-        pendingCounts.remove(0);
-        return result;
-      }
-
-      @Override
-      public void remove() {
-        throw new UnsupportedOperationException();
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
deleted file mode 100644
index ab1c305..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.integration.nexmark.model.Person;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Filter;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-
-/**
- * Base class for the eight 'NEXMark' queries. Supplies some fragments common to
- * multiple queries.
- */
-public abstract class NexmarkQuery
-    extends PTransform<PCollection<Event>, PCollection<TimestampedValue<KnownSize>>> {
-  protected static final TupleTag<Auction> AUCTION_TAG = new TupleTag<>("auctions");
-  protected static final TupleTag<Bid> BID_TAG = new TupleTag<>("bids");
-  protected static final TupleTag<Person> PERSON_TAG = new TupleTag<>("person");
-
-  /** Predicate to detect a new person event. */
-  protected static final SerializableFunction<Event, Boolean> IS_NEW_PERSON =
-      new SerializableFunction<Event, Boolean>() {
-        @Override
-        public Boolean apply(Event event) {
-          return event.newPerson != null;
-        }
-      };
-
-  /** DoFn to convert a new person event to a person. */
-  protected static final DoFn<Event, Person> AS_PERSON = new DoFn<Event, Person>() {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(c.element().newPerson);
-    }
-  };
-
-  /** Predicate to detect a new auction event. */
-  protected static final SerializableFunction<Event, Boolean> IS_NEW_AUCTION =
-      new SerializableFunction<Event, Boolean>() {
-        @Override
-        public Boolean apply(Event event) {
-          return event.newAuction != null;
-        }
-      };
-
-  /** DoFn to convert a new auction event to an auction. */
-  protected static final DoFn<Event, Auction> AS_AUCTION = new DoFn<Event, Auction>() {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(c.element().newAuction);
-    }
-  };
-
-  /** Predicate to detect a new bid event. */
-  protected static final SerializableFunction<Event, Boolean> IS_BID =
-      new SerializableFunction<Event, Boolean>() {
-        @Override
-        public Boolean apply(Event event) {
-          return event.bid != null;
-        }
-      };
-
-  /** DoFn to convert a bid event to a bid. */
-  protected static final DoFn<Event, Bid> AS_BID = new DoFn<Event, Bid>() {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(c.element().bid);
-    }
-  };
-
-  /** Transform to key each person by their id. */
-  protected static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID =
-      ParDo.of(new DoFn<Person, KV<Long, Person>>() {
-             @ProcessElement
-             public void processElement(ProcessContext c) {
-               c.output(KV.of(c.element().id, c.element()));
-             }
-           });
-
-  /** Transform to key each auction by its id. */
-  protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID =
-      ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
-             @ProcessElement
-             public void processElement(ProcessContext c) {
-               c.output(KV.of(c.element().id, c.element()));
-             }
-           });
-
-  /** Transform to key each auction by its seller id. */
-  protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_SELLER =
-      ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
-             @ProcessElement
-             public void processElement(ProcessContext c) {
-               c.output(KV.of(c.element().seller, c.element()));
-             }
-           });
-
-  /** Transform to key each bid by it's auction id. */
-  protected static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION =
-      ParDo.of(new DoFn<Bid, KV<Long, Bid>>() {
-             @ProcessElement
-             public void processElement(ProcessContext c) {
-               c.output(KV.of(c.element().auction, c.element()));
-             }
-           });
-
-  /** Transform to project the auction id from each bid. */
-  protected static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION =
-      ParDo.of(new DoFn<Bid, Long>() {
-             @ProcessElement
-             public void processElement(ProcessContext c) {
-               c.output(c.element().auction);
-             }
-           });
-
-  /** Transform to project the price from each bid. */
-  protected static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE =
-      ParDo.of(new DoFn<Bid, Long>() {
-             @ProcessElement
-             public void processElement(ProcessContext c) {
-               c.output(c.element().price);
-             }
-           });
-
-  /** Transform to emit each event with the timestamp embedded within it. */
-  public static final ParDo.SingleOutput<Event, Event> EVENT_TIMESTAMP_FROM_DATA =
-      ParDo.of(new DoFn<Event, Event>() {
-             @ProcessElement
-             public void processElement(ProcessContext c) {
-               Event e = c.element();
-               if (e.bid != null) {
-                 c.outputWithTimestamp(e, new Instant(e.bid.dateTime));
-               } else if (e.newPerson != null) {
-                 c.outputWithTimestamp(e, new Instant(e.newPerson.dateTime));
-               } else if (e.newAuction != null) {
-                 c.outputWithTimestamp(e, new Instant(e.newAuction.dateTime));
-               }
-             }
-           });
-
-  /**
-   * Transform to filter for just the new auction events.
-   */
-  protected static final PTransform<PCollection<Event>, PCollection<Auction>> JUST_NEW_AUCTIONS =
-      new PTransform<PCollection<Event>, PCollection<Auction>>("justNewAuctions") {
-        @Override
-        public PCollection<Auction> expand(PCollection<Event> input) {
-          return input.apply("IsNewAuction", Filter.by(IS_NEW_AUCTION))
-                      .apply("AsAuction", ParDo.of(AS_AUCTION));
-        }
-      };
-
-  /**
-   * Transform to filter for just the new person events.
-   */
-  protected static final PTransform<PCollection<Event>, PCollection<Person>> JUST_NEW_PERSONS =
-      new PTransform<PCollection<Event>, PCollection<Person>>("justNewPersons") {
-        @Override
-        public PCollection<Person> expand(PCollection<Event> input) {
-          return input.apply("IsNewPerson", Filter.by(IS_NEW_PERSON))
-                      .apply("AsPerson", ParDo.of(AS_PERSON));
-        }
-      };
-
-  /**
-   * Transform to filter for just the bid events.
-   */
-  protected static final PTransform<PCollection<Event>, PCollection<Bid>> JUST_BIDS =
-      new PTransform<PCollection<Event>, PCollection<Bid>>("justBids") {
-        @Override
-        public PCollection<Bid> expand(PCollection<Event> input) {
-          return input.apply("IsBid", Filter.by(IS_BID))
-                      .apply("AsBid", ParDo.of(AS_BID));
-        }
-      };
-
-  protected final NexmarkConfiguration configuration;
-  public final Monitor<Event> eventMonitor;
-  public final Monitor<KnownSize> resultMonitor;
-  public final Monitor<Event> endOfStreamMonitor;
-  protected final Counter fatalCounter;
-
-  protected NexmarkQuery(NexmarkConfiguration configuration, String name) {
-    super(name);
-    this.configuration = configuration;
-    if (configuration.debug) {
-      eventMonitor = new Monitor<>(name + ".Events", "event");
-      resultMonitor = new Monitor<>(name + ".Results", "result");
-      endOfStreamMonitor = new Monitor<>(name + ".EndOfStream", "end");
-      fatalCounter = Metrics.counter(name , "fatal");
-    } else {
-      eventMonitor = null;
-      resultMonitor = null;
-      endOfStreamMonitor = null;
-      fatalCounter = null;
-    }
-  }
-
-  /**
-   * Implement the actual query. All we know about the result is it has a known encoded size.
-   */
-  protected abstract PCollection<KnownSize> applyPrim(PCollection<Event> events);
-
-  @Override
-  public PCollection<TimestampedValue<KnownSize>> expand(PCollection<Event> events) {
-
-    if (configuration.debug) {
-      events =
-          events
-              // Monitor events as they go by.
-              .apply(name + ".Monitor", eventMonitor.getTransform())
-              // Count each type of event.
-              .apply(name + ".Snoop", NexmarkUtils.snoop(name));
-    }
-
-    if (configuration.cpuDelayMs > 0) {
-      // Slow down by pegging one core at 100%.
-      events = events.apply(name + ".CpuDelay",
-              NexmarkUtils.<Event>cpuDelay(name, configuration.cpuDelayMs));
-    }
-
-    if (configuration.diskBusyBytes > 0) {
-      // Slow down by forcing bytes to durable store.
-      events = events.apply(name + ".DiskBusy",
-              NexmarkUtils.<Event>diskBusy(name, configuration.diskBusyBytes));
-    }
-
-    // Run the query.
-    PCollection<KnownSize> queryResults = applyPrim(events);
-
-    if (configuration.debug) {
-      // Monitor results as they go by.
-      queryResults = queryResults.apply(name + ".Debug", resultMonitor.getTransform());
-    }
-
-    // Timestamp the query results.
-    return queryResults.apply(name + ".Stamp", NexmarkUtils.<KnownSize>stamp(name));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java
deleted file mode 100644
index b2b1826..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.TimestampedValue;
-
-
-import org.hamcrest.core.IsEqual;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Assert;
-
-/**
- * Base class for models of the eight NEXMark queries. Provides an assertion function which can be
- * applied against the actual query results to check their consistency with the model.
- */
-public abstract class NexmarkQueryModel implements Serializable {
-  protected final NexmarkConfiguration configuration;
-
-  public NexmarkQueryModel(NexmarkConfiguration configuration) {
-    this.configuration = configuration;
-  }
-
-  /**
-   * Return the start of the most recent window of {@code size} and {@code period} which ends
-   * strictly before {@code timestamp}.
-   */
-  public static Instant windowStart(Duration size, Duration period, Instant timestamp) {
-    long ts = timestamp.getMillis();
-    long p = period.getMillis();
-    long lim = ts - ts % p;
-    long s = size.getMillis();
-    return new Instant(lim - s);
-  }
-
-  /** Convert {@code itr} to strings capturing values, timestamps and order. */
-  protected static <T> List<String> toValueTimestampOrder(Iterator<TimestampedValue<T>> itr) {
-    List<String> strings = new ArrayList<>();
-    while (itr.hasNext()) {
-      strings.add(itr.next().toString());
-    }
-    return strings;
-  }
-
-  /** Convert {@code itr} to strings capturing values and order. */
-  protected static <T> List<String> toValueOrder(Iterator<TimestampedValue<T>> itr) {
-    List<String> strings = new ArrayList<>();
-    while (itr.hasNext()) {
-      strings.add(itr.next().getValue().toString());
-    }
-    return strings;
-  }
-
-  /** Convert {@code itr} to strings capturing values only. */
-  protected static <T> Set<String> toValue(Iterator<TimestampedValue<T>> itr) {
-    Set<String> strings = new HashSet<>();
-    while (itr.hasNext()) {
-      strings.add(itr.next().getValue().toString());
-    }
-    return strings;
-  }
-
-  /** Return simulator for query. */
-  protected abstract AbstractSimulator<?, ?> simulator();
-
-  /** Return sub-sequence of results which are significant for model. */
-  protected Iterable<TimestampedValue<KnownSize>> relevantResults(
-      Iterable<TimestampedValue<KnownSize>> results) {
-    return results;
-  }
-
-  /**
-   * Convert iterator of elements to collection of strings to use when testing coherence of model
-   * against actual query results.
-   */
-  protected abstract <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr);
-
-  /** Return assertion to use on results of pipeline for this query. */
-  public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> assertionFor() {
-    final Collection<String> expectedStrings = toCollection(simulator().results());
-    final String[] expectedStringsArray =
-      expectedStrings.toArray(new String[expectedStrings.size()]);
-
-    return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() {
-      @Override
-      public Void apply(Iterable<TimestampedValue<KnownSize>> actual) {
-      Collection<String> actualStrings = toCollection(relevantResults(actual).iterator());
-        Assert.assertThat("wrong pipeline output", actualStrings,
-          IsEqual.equalTo(expectedStrings));
-//compare without order
-//      Assert.assertThat("wrong pipeline output", actualStrings,
-//        IsIterableContainingInAnyOrder.containsInAnyOrder(expectedStringsArray));
-        return null;
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index ebfd196..a3c4d33 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -38,6 +38,8 @@ import org.apache.beam.integration.nexmark.model.Bid;
 import org.apache.beam.integration.nexmark.model.Event;
 import org.apache.beam.integration.nexmark.model.KnownSize;
 import org.apache.beam.integration.nexmark.model.Person;
+import org.apache.beam.integration.nexmark.queries.NexmarkQuery;
+import org.apache.beam.integration.nexmark.queries.NexmarkQueryModel;
 import org.apache.beam.integration.nexmark.queries.Query0;
 import org.apache.beam.integration.nexmark.queries.Query0Model;
 import org.apache.beam.integration.nexmark.queries.Query1;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
deleted file mode 100644
index 3815b9d..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.AuctionBid;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.sources.GeneratorConfig;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Instant;
-
-/**
- * A transform to find the winning bid for each closed auction. In pseudo CQL syntax:
- *
- * <pre>{@code
- * SELECT Rstream(A.*, B.auction, B.bidder, MAX(B.price), B.dateTime)
- * FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
- * WHERE A.id = B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME
- * GROUP BY A.id
- * }</pre>
- *
- * <p>We will also check that the winning bid is above the auction reserve. Note that
- * we ignore the auction opening bid value since it has no impact on which bid eventually wins,
- * if any.
- *
- * <p>Our implementation will use a custom windowing function in order to bring bids and
- * auctions together without requiring global state.
- */
-public class WinningBids extends PTransform<PCollection<Event>, PCollection<AuctionBid>> {
-  /** Windows for open auctions and bids. */
-  private static class AuctionOrBidWindow extends IntervalWindow implements Serializable {
-    /** Id of auction this window is for. */
-    public final long auction;
-
-    /**
-     * True if this window represents an actual auction, and thus has a start/end
-     * time matching that of the auction. False if this window represents a bid, and
-     * thus has an unbounded start/end time.
-     */
-    public final boolean isAuctionWindow;
-
-    /** For avro only. */
-    private AuctionOrBidWindow() {
-      super(TIMESTAMP_MIN_VALUE, TIMESTAMP_MAX_VALUE);
-      auction = 0;
-      isAuctionWindow = false;
-    }
-
-    private AuctionOrBidWindow(
-        Instant start, Instant end, long auctionId, boolean isAuctionWindow) {
-      super(start, end);
-      this.auction = auctionId;
-      this.isAuctionWindow = isAuctionWindow;
-    }
-
-    /** Return an auction window for {@code auction}. */
-    public static AuctionOrBidWindow forAuction(Instant timestamp, Auction auction) {
-      AuctionOrBidWindow result =
-          new AuctionOrBidWindow(timestamp, new Instant(auction.expires), auction.id, true);
-      return result;
-    }
-
-    /**
-     * Return a bid window for {@code bid}. It should later be merged into
-     * the corresponding auction window. However, it is possible this bid is for an already
-     * expired auction, or for an auction which the system has not yet seen. So we
-     * give the bid a bit of wiggle room in its interval.
-     */
-    public static AuctionOrBidWindow forBid(
-        long expectedAuctionDurationMs, Instant timestamp, Bid bid) {
-      // At this point we don't know which auctions are still valid, and the bid may
-      // be for an auction which won't start until some unknown time in the future
-      // (due to Generator.AUCTION_ID_LEAD in Generator.nextBid).
-      // A real system would atomically reconcile bids and auctions by a separate mechanism.
-      // If we give bids an unbounded window it is possible a bid for an auction which
-      // has already expired would cause the system watermark to stall, since that window
-      // would never be retired.
-      // Instead, we will just give the bid a finite window which expires at
-      // the upper bound of auctions assuming the auction starts at the same time as the bid,
-      // and assuming the system is running at its lowest event rate (as per interEventDelayUs).
-      AuctionOrBidWindow result = new AuctionOrBidWindow(
-          timestamp, timestamp.plus(expectedAuctionDurationMs * 2), bid.auction, false);
-      return result;
-    }
-
-    /** Is this an auction window? */
-    public boolean isAuctionWindow() {
-      return isAuctionWindow;
-    }
-
-    @Override
-    public String toString() {
-      return String.format("AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}",
-          start(), end(), auction, isAuctionWindow);
-    }
-  }
-
-  /**
-   * Encodes an {@link AuctionOrBidWindow} as an {@link IntervalWindow} and an auction id long.
-   */
-  private static class AuctionOrBidWindowCoder extends CustomCoder<AuctionOrBidWindow> {
-    private static final AuctionOrBidWindowCoder INSTANCE = new AuctionOrBidWindowCoder();
-    private static final Coder<IntervalWindow> SUPER_CODER = IntervalWindow.getCoder();
-    private static final Coder<Long> ID_CODER = VarLongCoder.of();
-    private static final Coder<Integer> INT_CODER = VarIntCoder.of();
-
-    @JsonCreator
-    public static AuctionOrBidWindowCoder of() {
-      return INSTANCE;
-    }
-
-    @Override
-    public void encode(AuctionOrBidWindow window, OutputStream outStream, Coder.Context context)
-        throws IOException, CoderException {
-      SUPER_CODER.encode(window, outStream, Coder.Context.NESTED);
-      ID_CODER.encode(window.auction, outStream, Coder.Context.NESTED);
-      INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream, Coder.Context.NESTED);
-    }
-
-    @Override
-    public AuctionOrBidWindow decode(InputStream inStream, Coder.Context context)
-        throws IOException, CoderException {
-      IntervalWindow superWindow = SUPER_CODER.decode(inStream, Coder.Context.NESTED);
-      long auction = ID_CODER.decode(inStream, Coder.Context.NESTED);
-      boolean isAuctionWindow =
-          INT_CODER.decode(inStream, Coder.Context.NESTED) == 0 ? false : true;
-      return new AuctionOrBidWindow(
-          superWindow.start(), superWindow.end(), auction, isAuctionWindow);
-    }
-
-    @Override public void verifyDeterministic() throws NonDeterministicException {}
-  }
-
-  /** Assign events to auction windows and merges them intelligently. */
-  private static class AuctionOrBidWindowFn extends WindowFn<Event, AuctionOrBidWindow> {
-    /** Expected duration of auctions in ms. */
-    private final long expectedAuctionDurationMs;
-
-    public AuctionOrBidWindowFn(long expectedAuctionDurationMs) {
-      this.expectedAuctionDurationMs = expectedAuctionDurationMs;
-    }
-
-    @Override
-    public Collection<AuctionOrBidWindow> assignWindows(AssignContext c) {
-      Event event = c.element();
-      if (event.newAuction != null) {
-        // Assign auctions to an auction window which expires at the auction's close.
-        return Arrays.asList(AuctionOrBidWindow.forAuction(c.timestamp(), event.newAuction));
-      } else if (event.bid != null) {
-        // Assign bids to a temporary bid window which will later be merged into the appropriate
-        // auction window.
-        return Arrays.asList(
-            AuctionOrBidWindow.forBid(expectedAuctionDurationMs, c.timestamp(), event.bid));
-      } else {
-        // Don't assign people to any window. They will thus be dropped.
-        return Arrays.asList();
-      }
-    }
-
-    @Override
-    public void mergeWindows(MergeContext c) throws Exception {
-      // Split and index the auction and bid windows by auction id.
-      Map<Long, AuctionOrBidWindow> idToTrueAuctionWindow = new TreeMap<>();
-      Map<Long, List<AuctionOrBidWindow>> idToBidAuctionWindows = new TreeMap<>();
-      for (AuctionOrBidWindow window : c.windows()) {
-        if (window.isAuctionWindow()) {
-          idToTrueAuctionWindow.put(window.auction, window);
-        } else {
-          List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(window.auction);
-          if (bidWindows == null) {
-            bidWindows = new ArrayList<>();
-            idToBidAuctionWindows.put(window.auction, bidWindows);
-          }
-          bidWindows.add(window);
-        }
-      }
-
-      // Merge all 'bid' windows into their corresponding 'auction' window, provided the
-      // auction has not expired.
-      for (long auction : idToTrueAuctionWindow.keySet()) {
-        AuctionOrBidWindow auctionWindow = idToTrueAuctionWindow.get(auction);
-        List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(auction);
-        if (bidWindows != null) {
-          List<AuctionOrBidWindow> toBeMerged = new ArrayList<>();
-          for (AuctionOrBidWindow bidWindow : bidWindows) {
-            if (bidWindow.start().isBefore(auctionWindow.end())) {
-              toBeMerged.add(bidWindow);
-            }
-            // else: This bid window will remain until its expire time, at which point it
-            // will expire without ever contributing to an output.
-          }
-          if (!toBeMerged.isEmpty()) {
-            toBeMerged.add(auctionWindow);
-            c.merge(toBeMerged, auctionWindow);
-          }
-        }
-      }
-    }
-
-    @Override
-    public boolean isCompatible(WindowFn<?, ?> other) {
-      return other instanceof AuctionOrBidWindowFn;
-    }
-
-    @Override
-    public Coder<AuctionOrBidWindow> windowCoder() {
-      return AuctionOrBidWindowCoder.of();
-    }
-
-    @Override
-    public WindowMappingFn<AuctionOrBidWindow> getDefaultWindowMappingFn() {
-      throw new UnsupportedOperationException("AuctionWindowFn not supported for side inputs");
-    }
-
-    /**
-     * Below we will GBK auctions and bids on their auction ids. Then we will reduce those
-     * per id to emit {@code (auction, winning bid)} pairs for auctions which have expired with at
-     * least one valid bid. We would like those output pairs to have a timestamp of the auction's
-     * expiry (since that's the earliest we know for sure we have the correct winner). We would
-     * also like to make that winning results are available to following stages at the auction's
-     * expiry.
-     *
-     * <p>Each result of the GBK will have a timestamp of the min of the result of this object's
-     * assignOutputTime over all records which end up in one of its iterables. Thus we get the
-     * desired behavior if we ignore each record's timestamp and always return the auction window's
-     * 'maxTimestamp', which will correspond to the auction's expiry.
-     *
-     * <p>In contrast, if this object's assignOutputTime were to return 'inputTimestamp'
-     * (the usual implementation), then each GBK record will take as its timestamp the minimum of
-     * the timestamps of all bids and auctions within it, which will always be the auction's
-     * timestamp. An auction which expires well into the future would thus hold up the watermark
-     * of the GBK results until that auction expired. That in turn would hold up all winning pairs.
-     */
-    @Override
-    public Instant getOutputTime(
-        Instant inputTimestamp, AuctionOrBidWindow window) {
-      return window.maxTimestamp();
-    }
-  }
-
-  private final AuctionOrBidWindowFn auctionOrBidWindowFn;
-
-  public WinningBids(String name, NexmarkConfiguration configuration) {
-    super(name);
-    // What's the expected auction time (when the system is running at the lowest event rate).
-    long[] interEventDelayUs = configuration.rateShape.interEventDelayUs(
-        configuration.firstEventRate, configuration.nextEventRate,
-        configuration.rateUnit, configuration.numEventGenerators);
-    long longestDelayUs = 0;
-    for (int i = 0; i < interEventDelayUs.length; i++) {
-      longestDelayUs = Math.max(longestDelayUs, interEventDelayUs[i]);
-    }
-    // Adjust for proportion of auction events amongst all events.
-    longestDelayUs =
-        (longestDelayUs * GeneratorConfig.PROPORTION_DENOMINATOR)
-        / GeneratorConfig.AUCTION_PROPORTION;
-    // Adjust for number of in-flight auctions.
-    longestDelayUs = longestDelayUs * configuration.numInFlightAuctions;
-    long expectedAuctionDurationMs = (longestDelayUs + 999) / 1000;
-    NexmarkUtils.console("Expected auction duration is %d ms", expectedAuctionDurationMs);
-    auctionOrBidWindowFn = new AuctionOrBidWindowFn(expectedAuctionDurationMs);
-  }
-
-  @Override
-  public PCollection<AuctionBid> expand(PCollection<Event> events) {
-    // Window auctions and bids into custom auction windows. New people events will be discarded.
-    // This will allow us to bring bids and auctions together irrespective of how long
-    // each auction is open for.
-    events = events.apply("Window", Window.into(auctionOrBidWindowFn));
-
-    // Key auctions by their id.
-    PCollection<KV<Long, Auction>> auctionsById =
-        events.apply(NexmarkQuery.JUST_NEW_AUCTIONS)
-              .apply("AuctionById:", NexmarkQuery.AUCTION_BY_ID);
-
-    // Key bids by their auction id.
-    PCollection<KV<Long, Bid>> bidsByAuctionId =
-        events.apply(NexmarkQuery.JUST_BIDS).apply("BidByAuction", NexmarkQuery.BID_BY_AUCTION);
-
-    // Find the highest price valid bid for each closed auction.
-    return
-      // Join auctions and bids.
-      KeyedPCollectionTuple.of(NexmarkQuery.AUCTION_TAG, auctionsById)
-        .and(NexmarkQuery.BID_TAG, bidsByAuctionId)
-        .apply(CoGroupByKey.<Long>create())
-        // Filter and select.
-        .apply(name + ".Join",
-          ParDo.of(new DoFn<KV<Long, CoGbkResult>, AuctionBid>() {
-            private final Counter noAuctionCounter = Metrics.counter(name, "noAuction");
-            private final Counter underReserveCounter = Metrics.counter(name, "underReserve");
-            private final Counter noValidBidsCounter = Metrics.counter(name, "noValidBids");
-
-            @ProcessElement
-            public void processElement(ProcessContext c) {
-              Auction auction =
-                  c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null);
-              if (auction == null) {
-                // We have bids without a matching auction. Give up.
-                noAuctionCounter.inc();
-                return;
-              }
-              // Find the current winning bid for auction.
-              // The earliest bid with the maximum price above the reserve wins.
-              Bid bestBid = null;
-              for (Bid bid : c.element().getValue().getAll(NexmarkQuery.BID_TAG)) {
-                // Bids too late for their auction will have been
-                // filtered out by the window merge function.
-                checkState(bid.dateTime < auction.expires);
-                if (bid.price < auction.reserve) {
-                  // Bid price is below auction reserve.
-                  underReserveCounter.inc();
-                  continue;
-                }
-
-                if (bestBid == null
-                    || Bid.PRICE_THEN_DESCENDING_TIME.compare(bid, bestBid) > 0) {
-                  bestBid = bid;
-                }
-              }
-              if (bestBid == null) {
-                // We don't have any valid bids for auction.
-                noValidBidsCounter.inc();
-                return;
-              }
-              c.output(new AuctionBid(auction, bestBid));
-            }
-          }
-        ));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
deleted file mode 100644
index e7f51b7..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import javax.annotation.Nullable;
-
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.AuctionBid;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-
-/**
- * A simulator of the {@code WinningBids} query.
- */
-public class WinningBidsSimulator extends AbstractSimulator<Event, AuctionBid> {
-  /** Auctions currently still open, indexed by auction id. */
-  private final Map<Long, Auction> openAuctions;
-
-  /** The ids of auctions known to be closed. */
-  private final Set<Long> closedAuctions;
-
-  /** Current best valid bids for open auctions, indexed by auction id. */
-  private final Map<Long, Bid> bestBids;
-
-  /** Bids for auctions we havn't seen yet. */
-  private final List<Bid> bidsWithoutAuctions;
-
-  /**
-   * Timestamp of last new auction or bid event (ms since epoch).
-   */
-  private long lastTimestamp;
-
-  public WinningBidsSimulator(NexmarkConfiguration configuration) {
-    super(NexmarkUtils.standardEventIterator(configuration));
-    openAuctions = new TreeMap<>();
-    closedAuctions = new TreeSet<>();
-    bestBids = new TreeMap<>();
-    bidsWithoutAuctions = new ArrayList<>();
-    lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
-  }
-
-  /**
-   * Try to account for {@code bid} in state. Return true if bid has now been
-   * accounted for by {@code bestBids}.
-   */
-  private boolean captureBestBid(Bid bid, boolean shouldLog) {
-    if (closedAuctions.contains(bid.auction)) {
-      // Ignore bids for known, closed auctions.
-      if (shouldLog) {
-        NexmarkUtils.info("closed auction: %s", bid);
-      }
-      return true;
-    }
-    Auction auction = openAuctions.get(bid.auction);
-    if (auction == null) {
-      // We don't have an auction for this bid yet, so can't determine if it is
-      // winning or not.
-      if (shouldLog) {
-        NexmarkUtils.info("pending auction: %s", bid);
-      }
-      return false;
-    }
-    if (bid.price < auction.reserve) {
-      // Bid price is too low.
-      if (shouldLog) {
-        NexmarkUtils.info("below reserve: %s", bid);
-      }
-      return true;
-    }
-    Bid existingBid = bestBids.get(bid.auction);
-    if (existingBid == null || Bid.PRICE_THEN_DESCENDING_TIME.compare(existingBid, bid) < 0) {
-      // We've found a (new) best bid for a known auction.
-      bestBids.put(bid.auction, bid);
-      if (shouldLog) {
-        NexmarkUtils.info("new winning bid: %s", bid);
-      }
-    } else {
-      if (shouldLog) {
-        NexmarkUtils.info("ignoring low bid: %s", bid);
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Try to match bids without auctions to auctions.
-   */
-  private void flushBidsWithoutAuctions() {
-    Iterator<Bid> itr = bidsWithoutAuctions.iterator();
-    while (itr.hasNext()) {
-      Bid bid = itr.next();
-      if (captureBestBid(bid, false)) {
-        NexmarkUtils.info("bid now accounted for: %s", bid);
-        itr.remove();
-      }
-    }
-  }
-
-  /**
-   * Return the next winning bid for an expired auction relative to {@code timestamp}.
-   * Return null if no more winning bids, in which case all expired auctions will
-   * have been removed from our state. Retire auctions in order of expire time.
-   */
-  @Nullable
-  private TimestampedValue<AuctionBid> nextWinningBid(long timestamp) {
-    Map<Long, List<Long>> toBeRetired = new TreeMap<>();
-    for (Map.Entry<Long, Auction> entry : openAuctions.entrySet()) {
-      if (entry.getValue().expires <= timestamp) {
-        List<Long> idsAtTime = toBeRetired.get(entry.getValue().expires);
-        if (idsAtTime == null) {
-          idsAtTime = new ArrayList<>();
-          toBeRetired.put(entry.getValue().expires, idsAtTime);
-        }
-        idsAtTime.add(entry.getKey());
-      }
-    }
-    for (Map.Entry<Long, List<Long>> entry : toBeRetired.entrySet()) {
-      for (long id : entry.getValue()) {
-        Auction auction = openAuctions.get(id);
-        NexmarkUtils.info("retiring auction: %s", auction);
-        openAuctions.remove(id);
-        Bid bestBid = bestBids.get(id);
-        if (bestBid != null) {
-          TimestampedValue<AuctionBid> result =
-              TimestampedValue.of(new AuctionBid(auction, bestBid), new Instant(auction.expires));
-          NexmarkUtils.info("winning: %s", result);
-          return result;
-        }
-      }
-    }
-    return null;
-  }
-
-  @Override
-  protected void run() {
-    if (lastTimestamp > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) {
-      // We may have finally seen the auction a bid was intended for.
-      flushBidsWithoutAuctions();
-      TimestampedValue<AuctionBid> result = nextWinningBid(lastTimestamp);
-      if (result != null) {
-        addResult(result);
-        return;
-      }
-    }
-
-    TimestampedValue<Event> timestampedEvent = nextInput();
-    if (timestampedEvent == null) {
-      // No more events. Flush any still open auctions.
-      TimestampedValue<AuctionBid> result =
-          nextWinningBid(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
-      if (result == null) {
-        // We are done.
-        allDone();
-        return;
-      }
-      addResult(result);
-      //TODO test fails because offset of some hundreds of ms beween expect and actual
-      return;
-    }
-
-    Event event = timestampedEvent.getValue();
-    if (event.newPerson != null) {
-      // Ignore new person events.
-      return;
-    }
-
-    lastTimestamp = timestampedEvent.getTimestamp().getMillis();
-    if (event.newAuction != null) {
-      // Add this new open auction to our state.
-      openAuctions.put(event.newAuction.id, event.newAuction);
-    } else {
-      if (!captureBestBid(event.bid, true)) {
-        // We don't know what to do with this bid yet.
-        NexmarkUtils.info("bid not yet accounted for: %s", event.bid);
-        bidsWithoutAuctions.add(event.bid);
-      }
-    }
-    // Keep looking for winning bids.
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
index 7f6b7c9..b1d9ec2 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
@@ -24,13 +24,12 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.WinningBids;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 
 /**
- * Result of {@link WinningBids} transform.
+ * Result of {@link org.apache.beam.integration.nexmark.queries.WinningBids} transform.
  */
 public class AuctionBid implements KnownSize, Serializable {
   public static final Coder<AuctionBid> CODER = new CustomCoder<AuctionBid>() {

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java
new file mode 100644
index 0000000..270b5c3
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark.queries;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import javax.annotation.Nullable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Abstract base class for simulator of a query.
+ *
+ * @param <InputT> Type of input elements.
+ * @param <OutputT> Type of output elements.
+ */
+public abstract class AbstractSimulator<InputT, OutputT> {
+  /** Window size for action bucket sampling. */
+  public static final Duration WINDOW_SIZE = Duration.standardMinutes(1);
+
+  /** Input event stream we should draw from. */
+  private final Iterator<TimestampedValue<InputT>> input;
+
+  /** Set to true when no more results. */
+  private boolean isDone;
+
+  /**
+   * Results which have not yet been returned by the {@link #results} iterator.
+   */
+  private final List<TimestampedValue<OutputT>> pendingResults;
+
+  /**
+   * Current window timestamp (ms since epoch).
+   */
+  private long currentWindow;
+
+  /**
+   * Number of (possibly intermediate) results for the current window.
+   */
+  private long currentCount;
+
+  /**
+   * Result counts per window which have not yet been returned by the {@link #resultsPerWindow}
+   * iterator.
+   */
+  private final List<Long> pendingCounts;
+
+  public AbstractSimulator(Iterator<TimestampedValue<InputT>> input) {
+    this.input = input;
+    isDone = false;
+    pendingResults = new ArrayList<>();
+    currentWindow = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+    currentCount = 0;
+    pendingCounts = new ArrayList<>();
+  }
+
+  /** Called by implementors of {@link #run}: Fetch the next input element. */
+  @Nullable
+  protected TimestampedValue<InputT> nextInput() {
+    if (!input.hasNext()) {
+      return null;
+    }
+    TimestampedValue<InputT> timestampedInput = input.next();
+    NexmarkUtils.info("input: %s", timestampedInput);
+    return timestampedInput;
+  }
+
+  /**
+   * Called by implementors of {@link #run}:  Capture an intermediate result, for the purpose of
+   * recording the expected activity of the query over time.
+   */
+  protected void addIntermediateResult(TimestampedValue<OutputT> result) {
+    NexmarkUtils.info("intermediate result: %s", result);
+    updateCounts(result.getTimestamp());
+  }
+
+  /**
+   * Called by implementors of {@link #run}: Capture a final result, for the purpose of checking
+   * semantic correctness.
+   */
+  protected void addResult(TimestampedValue<OutputT> result) {
+    NexmarkUtils.info("result: %s", result);
+    pendingResults.add(result);
+    updateCounts(result.getTimestamp());
+  }
+
+  /**
+   * Update window and counts.
+   */
+  private void updateCounts(Instant timestamp) {
+    long window = timestamp.getMillis() - timestamp.getMillis() % WINDOW_SIZE.getMillis();
+    if (window > currentWindow) {
+      if (currentWindow > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) {
+        pendingCounts.add(currentCount);
+      }
+      currentCount = 0;
+      currentWindow = window;
+    }
+    currentCount++;
+  }
+
+  /** Called by implementors of {@link #run}: Record that no more results will be emitted. */
+  protected void allDone() {
+    isDone = true;
+  }
+
+  /**
+   * Overridden by derived classes to do the next increment of work. Each call should
+   * call one or more of {@link #nextInput}, {@link #addIntermediateResult}, {@link #addResult}
+   * or {@link #allDone}. It is ok for a single call to emit more than one result via
+   * {@link #addResult}. It is ok for a single call to run the entire simulation, though
+   * this will prevent the {@link #results} and {@link #resultsPerWindow} iterators to
+   * stall.
+   */
+  protected abstract void run();
+
+  /**
+   * Return iterator over all expected timestamped results. The underlying simulator state is
+   * changed. Only one of {@link #results} or {@link #resultsPerWindow} can be called.
+   */
+  public Iterator<TimestampedValue<OutputT>> results() {
+    return new Iterator<TimestampedValue<OutputT>>() {
+      @Override
+      public boolean hasNext() {
+        while (true) {
+          if (!pendingResults.isEmpty()) {
+            return true;
+          }
+          if (isDone) {
+            return false;
+          }
+          run();
+        }
+      }
+
+      @Override
+      public TimestampedValue<OutputT> next() {
+        TimestampedValue<OutputT> result = pendingResults.get(0);
+        pendingResults.remove(0);
+        return result;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  /**
+   * Return an iterator over the number of results per {@link #WINDOW_SIZE} period. The underlying
+   * simulator state is changed.  Only one of {@link #results} or {@link #resultsPerWindow} can be
+   * called.
+   */
+  public Iterator<Long> resultsPerWindow() {
+    return new Iterator<Long>() {
+      @Override
+      public boolean hasNext() {
+        while (true) {
+          if (!pendingCounts.isEmpty()) {
+            return true;
+          }
+          if (isDone) {
+            if (currentCount > 0) {
+              pendingCounts.add(currentCount);
+              currentCount = 0;
+              currentWindow = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+              return true;
+            } else {
+              return false;
+            }
+          }
+          run();
+        }
+      }
+
+      @Override
+      public Long next() {
+        Long result = pendingCounts.get(0);
+        pendingCounts.remove(0);
+        return result;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
new file mode 100644
index 0000000..0796ce5
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import org.apache.beam.integration.nexmark.Monitor;
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.integration.nexmark.model.Person;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * Base class for the eight 'NEXMark' queries. Supplies some fragments common to
+ * multiple queries.
+ */
+public abstract class NexmarkQuery
+    extends PTransform<PCollection<Event>, PCollection<TimestampedValue<KnownSize>>> {
+  public static final TupleTag<Auction> AUCTION_TAG = new TupleTag<>("auctions");
+  public static final TupleTag<Bid> BID_TAG = new TupleTag<>("bids");
+  protected static final TupleTag<Person> PERSON_TAG = new TupleTag<>("person");
+
+  /** Predicate to detect a new person event. */
+  protected static final SerializableFunction<Event, Boolean> IS_NEW_PERSON =
+      new SerializableFunction<Event, Boolean>() {
+        @Override
+        public Boolean apply(Event event) {
+          return event.newPerson != null;
+        }
+      };
+
+  /** DoFn to convert a new person event to a person. */
+  protected static final DoFn<Event, Person> AS_PERSON = new DoFn<Event, Person>() {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element().newPerson);
+    }
+  };
+
+  /** Predicate to detect a new auction event. */
+  protected static final SerializableFunction<Event, Boolean> IS_NEW_AUCTION =
+      new SerializableFunction<Event, Boolean>() {
+        @Override
+        public Boolean apply(Event event) {
+          return event.newAuction != null;
+        }
+      };
+
+  /** DoFn to convert a new auction event to an auction. */
+  protected static final DoFn<Event, Auction> AS_AUCTION = new DoFn<Event, Auction>() {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element().newAuction);
+    }
+  };
+
+  /** Predicate to detect a new bid event. */
+  protected static final SerializableFunction<Event, Boolean> IS_BID =
+      new SerializableFunction<Event, Boolean>() {
+        @Override
+        public Boolean apply(Event event) {
+          return event.bid != null;
+        }
+      };
+
+  /** DoFn to convert a bid event to a bid. */
+  protected static final DoFn<Event, Bid> AS_BID = new DoFn<Event, Bid>() {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element().bid);
+    }
+  };
+
+  /** Transform to key each person by their id. */
+  protected static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID =
+      ParDo.of(new DoFn<Person, KV<Long, Person>>() {
+             @ProcessElement
+             public void processElement(ProcessContext c) {
+               c.output(KV.of(c.element().id, c.element()));
+             }
+           });
+
+  /** Transform to key each auction by its id. */
+  protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID =
+      ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
+             @ProcessElement
+             public void processElement(ProcessContext c) {
+               c.output(KV.of(c.element().id, c.element()));
+             }
+           });
+
+  /** Transform to key each auction by its seller id. */
+  protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_SELLER =
+      ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
+             @ProcessElement
+             public void processElement(ProcessContext c) {
+               c.output(KV.of(c.element().seller, c.element()));
+             }
+           });
+
+  /** Transform to key each bid by it's auction id. */
+  protected static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION =
+      ParDo.of(new DoFn<Bid, KV<Long, Bid>>() {
+             @ProcessElement
+             public void processElement(ProcessContext c) {
+               c.output(KV.of(c.element().auction, c.element()));
+             }
+           });
+
+  /** Transform to project the auction id from each bid. */
+  protected static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION =
+      ParDo.of(new DoFn<Bid, Long>() {
+             @ProcessElement
+             public void processElement(ProcessContext c) {
+               c.output(c.element().auction);
+             }
+           });
+
+  /** Transform to project the price from each bid. */
+  protected static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE =
+      ParDo.of(new DoFn<Bid, Long>() {
+             @ProcessElement
+             public void processElement(ProcessContext c) {
+               c.output(c.element().price);
+             }
+           });
+
+  /** Transform to emit each event with the timestamp embedded within it. */
+  public static final ParDo.SingleOutput<Event, Event> EVENT_TIMESTAMP_FROM_DATA =
+      ParDo.of(new DoFn<Event, Event>() {
+             @ProcessElement
+             public void processElement(ProcessContext c) {
+               Event e = c.element();
+               if (e.bid != null) {
+                 c.outputWithTimestamp(e, new Instant(e.bid.dateTime));
+               } else if (e.newPerson != null) {
+                 c.outputWithTimestamp(e, new Instant(e.newPerson.dateTime));
+               } else if (e.newAuction != null) {
+                 c.outputWithTimestamp(e, new Instant(e.newAuction.dateTime));
+               }
+             }
+           });
+
+  /**
+   * Transform to filter for just the new auction events.
+   */
+  public static final PTransform<PCollection<Event>, PCollection<Auction>> JUST_NEW_AUCTIONS =
+      new PTransform<PCollection<Event>, PCollection<Auction>>("justNewAuctions") {
+        @Override
+        public PCollection<Auction> expand(PCollection<Event> input) {
+          return input.apply("IsNewAuction", Filter.by(IS_NEW_AUCTION))
+                      .apply("AsAuction", ParDo.of(AS_AUCTION));
+        }
+      };
+
+  /**
+   * Transform to filter for just the new person events.
+   */
+  public static final PTransform<PCollection<Event>, PCollection<Person>> JUST_NEW_PERSONS =
+      new PTransform<PCollection<Event>, PCollection<Person>>("justNewPersons") {
+        @Override
+        public PCollection<Person> expand(PCollection<Event> input) {
+          return input.apply("IsNewPerson", Filter.by(IS_NEW_PERSON))
+                      .apply("AsPerson", ParDo.of(AS_PERSON));
+        }
+      };
+
+  /**
+   * Transform to filter for just the bid events.
+   */
+  public static final PTransform<PCollection<Event>, PCollection<Bid>> JUST_BIDS =
+      new PTransform<PCollection<Event>, PCollection<Bid>>("justBids") {
+        @Override
+        public PCollection<Bid> expand(PCollection<Event> input) {
+          return input.apply("IsBid", Filter.by(IS_BID))
+                      .apply("AsBid", ParDo.of(AS_BID));
+        }
+      };
+
+  protected final NexmarkConfiguration configuration;
+  public final Monitor<Event> eventMonitor;
+  public final Monitor<KnownSize> resultMonitor;
+  public final Monitor<Event> endOfStreamMonitor;
+  protected final Counter fatalCounter;
+
+  protected NexmarkQuery(NexmarkConfiguration configuration, String name) {
+    super(name);
+    this.configuration = configuration;
+    if (configuration.debug) {
+      eventMonitor = new Monitor<>(name + ".Events", "event");
+      resultMonitor = new Monitor<>(name + ".Results", "result");
+      endOfStreamMonitor = new Monitor<>(name + ".EndOfStream", "end");
+      fatalCounter = Metrics.counter(name , "fatal");
+    } else {
+      eventMonitor = null;
+      resultMonitor = null;
+      endOfStreamMonitor = null;
+      fatalCounter = null;
+    }
+  }
+
+  /**
+   * Implement the actual query. All we know about the result is it has a known encoded size.
+   */
+  protected abstract PCollection<KnownSize> applyPrim(PCollection<Event> events);
+
+  @Override
+  public PCollection<TimestampedValue<KnownSize>> expand(PCollection<Event> events) {
+
+    if (configuration.debug) {
+      events =
+          events
+              // Monitor events as they go by.
+              .apply(name + ".Monitor", eventMonitor.getTransform())
+              // Count each type of event.
+              .apply(name + ".Snoop", NexmarkUtils.snoop(name));
+    }
+
+    if (configuration.cpuDelayMs > 0) {
+      // Slow down by pegging one core at 100%.
+      events = events.apply(name + ".CpuDelay",
+              NexmarkUtils.<Event>cpuDelay(name, configuration.cpuDelayMs));
+    }
+
+    if (configuration.diskBusyBytes > 0) {
+      // Slow down by forcing bytes to durable store.
+      events = events.apply(name + ".DiskBusy",
+              NexmarkUtils.<Event>diskBusy(name, configuration.diskBusyBytes));
+    }
+
+    // Run the query.
+    PCollection<KnownSize> queryResults = applyPrim(events);
+
+    if (configuration.debug) {
+      // Monitor results as they go by.
+      queryResults = queryResults.apply(name + ".Debug", resultMonitor.getTransform());
+    }
+
+    // Timestamp the query results.
+    return queryResults.apply(name + ".Stamp", NexmarkUtils.<KnownSize>stamp(name));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java
new file mode 100644
index 0000000..1ad9099
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+
+import org.hamcrest.core.IsEqual;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
+
+/**
+ * Base class for models of the eight NEXMark queries. Provides an assertion function which can be
+ * applied against the actual query results to check their consistency with the model.
+ */
+public abstract class NexmarkQueryModel implements Serializable {
+  public final NexmarkConfiguration configuration;
+
+  public NexmarkQueryModel(NexmarkConfiguration configuration) {
+    this.configuration = configuration;
+  }
+
+  /**
+   * Return the start of the most recent window of {@code size} and {@code period} which ends
+   * strictly before {@code timestamp}.
+   */
+  public static Instant windowStart(Duration size, Duration period, Instant timestamp) {
+    long ts = timestamp.getMillis();
+    long p = period.getMillis();
+    long lim = ts - ts % p;
+    long s = size.getMillis();
+    return new Instant(lim - s);
+  }
+
+  /** Convert {@code itr} to strings capturing values, timestamps and order. */
+  protected static <T> List<String> toValueTimestampOrder(Iterator<TimestampedValue<T>> itr) {
+    List<String> strings = new ArrayList<>();
+    while (itr.hasNext()) {
+      strings.add(itr.next().toString());
+    }
+    return strings;
+  }
+
+  /** Convert {@code itr} to strings capturing values and order. */
+  protected static <T> List<String> toValueOrder(Iterator<TimestampedValue<T>> itr) {
+    List<String> strings = new ArrayList<>();
+    while (itr.hasNext()) {
+      strings.add(itr.next().getValue().toString());
+    }
+    return strings;
+  }
+
+  /** Convert {@code itr} to strings capturing values only. */
+  protected static <T> Set<String> toValue(Iterator<TimestampedValue<T>> itr) {
+    Set<String> strings = new HashSet<>();
+    while (itr.hasNext()) {
+      strings.add(itr.next().getValue().toString());
+    }
+    return strings;
+  }
+
+  /** Return simulator for query. */
+  public abstract AbstractSimulator<?, ?> simulator();
+
+  /** Return sub-sequence of results which are significant for model. */
+  protected Iterable<TimestampedValue<KnownSize>> relevantResults(
+      Iterable<TimestampedValue<KnownSize>> results) {
+    return results;
+  }
+
+  /**
+   * Convert iterator of elements to collection of strings to use when testing coherence of model
+   * against actual query results.
+   */
+  protected abstract <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr);
+
+  /** Return assertion to use on results of pipeline for this query. */
+  public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> assertionFor() {
+    final Collection<String> expectedStrings = toCollection(simulator().results());
+    final String[] expectedStringsArray =
+      expectedStrings.toArray(new String[expectedStrings.size()]);
+
+    return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() {
+      @Override
+      public Void apply(Iterable<TimestampedValue<KnownSize>> actual) {
+      Collection<String> actualStrings = toCollection(relevantResults(actual).iterator());
+        Assert.assertThat("wrong pipeline output", actualStrings,
+          IsEqual.equalTo(expectedStrings));
+//compare without order
+//      Assert.assertThat("wrong pipeline output", actualStrings,
+//        IsIterableContainingInAnyOrder.containsInAnyOrder(expectedStringsArray));
+        return null;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
index 84696c4..00a49a8 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
@@ -22,7 +22,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.integration.nexmark.model.Event;
 import org.apache.beam.integration.nexmark.model.KnownSize;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
index 991b1d4..6fb6613 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
@@ -20,9 +20,7 @@ package org.apache.beam.integration.nexmark.queries;
 import java.util.Collection;
 import java.util.Iterator;
 
-import org.apache.beam.integration.nexmark.AbstractSimulator;
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.integration.nexmark.model.Event;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -56,7 +54,7 @@ public class Query0Model extends NexmarkQueryModel {
   }
 
   @Override
-  protected AbstractSimulator<?, ?> simulator() {
+  public AbstractSimulator<?, ?> simulator() {
     return new Simulator(configuration);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java
index 0be77ce..8d90b70 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java
@@ -18,7 +18,6 @@
 package org.apache.beam.integration.nexmark.queries;
 
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.integration.nexmark.model.Bid;
 import org.apache.beam.integration.nexmark.model.Event;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
index d9b3557..c919691 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
@@ -28,7 +28,6 @@ import java.nio.channels.WritableByteChannel;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.integration.nexmark.model.Done;
 import org.apache.beam.integration.nexmark.model.Event;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
index a8a61ae..fd936a9 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
@@ -18,7 +18,6 @@
 package org.apache.beam.integration.nexmark.queries;
 
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.integration.nexmark.model.Bid;
 import org.apache.beam.integration.nexmark.model.BidsPerSession;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
index a5db504..20f45fb 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
@@ -18,7 +18,6 @@
 package org.apache.beam.integration.nexmark.queries;
 
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.integration.nexmark.model.Bid;
 import org.apache.beam.integration.nexmark.model.BidsPerSession;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
index 58037d3..0388687 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
@@ -21,9 +21,7 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Iterator;
 
-import org.apache.beam.integration.nexmark.AbstractSimulator;
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.integration.nexmark.model.Bid;
 import org.apache.beam.integration.nexmark.model.Event;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java
index 4c8f878..a365b97 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java
@@ -18,7 +18,6 @@
 package org.apache.beam.integration.nexmark.queries;
 
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.integration.nexmark.model.AuctionPrice;
 import org.apache.beam.integration.nexmark.model.Bid;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java
index f578e4c..e00992f 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java
@@ -21,9 +21,7 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Iterator;
 
-import org.apache.beam.integration.nexmark.AbstractSimulator;
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.integration.nexmark.model.AuctionPrice;
 import org.apache.beam.integration.nexmark.model.Bid;