You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:13 UTC

[05/55] [abbrv] beam git commit: NexMark

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Generator.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Generator.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Generator.java
new file mode 100644
index 0000000..98f4f00
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Generator.java
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.values.TimestampedValue;
+import com.google.common.base.Preconditions;
+
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A generator for synthetic events. We try to make the data vaguely reasonable. We also ensure
+ * most primary key/foreign key relations are correct. Eg: a {@link Bid} event will usually have
+ * valid auction and bidder ids which can be joined to already-generated Auction and Person events.
+ *
+ * <p>To help with testing, we generate timestamps relative to a given {@code baseTime}. Each new
+ * event is given a timestamp advanced from the previous timestamp by {@code interEventDelayUs}
+ * (in microseconds). The event stream is thus fully deterministic and does not depend on
+ * wallclock time.
+ *
+ * <p>The nested {@link Checkpoint} class implements {@link UnboundedSource.CheckpointMark}
+ * so that we can resume generating events from a saved snapshot.
+ */
+public class Generator implements Iterator<TimestampedValue<Event>>, Serializable {
+  /**
+   * Keep the number of categories small so the example queries will find results even with
+   * a small batch of events.
+   */
+  private static final int NUM_CATEGORIES = 5;
+
+  /** Smallest random string size. */
+  private static final int MIN_STRING_LENGTH = 3;
+
+  /**
+   * Keep the number of states small so that the example queries will find results even with
+   * a small batch of events.
+   */
+  private static final List<String> US_STATES = Arrays.asList(("AZ,CA,ID,OR,WA,WY").split(","));
+
+  private static final List<String> US_CITIES =
+      Arrays.asList(
+          ("Phoenix,Los Angeles,San Francisco,Boise,Portland,Bend,Redmond,Seattle,Kent,Cheyenne")
+              .split(","));
+
+  private static final List<String> FIRST_NAMES =
+      Arrays.asList(("Peter,Paul,Luke,John,Saul,Vicky,Kate,Julie,Sarah,Deiter,Walter").split(","));
+
+  private static final List<String> LAST_NAMES =
+      Arrays.asList(("Shultz,Abrams,Spencer,White,Bartels,Walton,Smith,Jones,Noris").split(","));
+
+  /**
+   * Number of yet-to-be-created people and auction ids allowed.
+   */
+  private static final int PERSON_ID_LEAD = 10;
+  private static final int AUCTION_ID_LEAD = 10;
+
+  /**
+   * The fraction of people/auctions which may be 'hot' sellers/bidders/auctions is 1
+   * over these values.
+   */
+  private static final int HOT_AUCTION_RATIO = 100;
+  private static final int HOT_SELLER_RATIO = 100;
+  private static final int HOT_BIDDER_RATIO = 100;
+
+  /**
+   * Just enough state to be able to restore a generator back to where it was checkpointed.
+   */
+  public static class Checkpoint implements UnboundedSource.CheckpointMark {
+    private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+    /** Coder for this class. */
+    public static final Coder<Checkpoint> CODER_INSTANCE =
+        new AtomicCoder<Checkpoint>() {
+          @Override
+          public void encode(
+              Checkpoint value,
+              OutputStream outStream,
+              Coder.Context context)
+              throws CoderException, IOException {
+            LONG_CODER.encode(value.numEvents, outStream, Context.NESTED);
+            LONG_CODER.encode(value.wallclockBaseTime, outStream, Context.NESTED);
+          }
+
+          @Override
+          public Checkpoint decode(
+              InputStream inStream, Coder.Context context)
+              throws CoderException, IOException {
+            long numEvents = LONG_CODER.decode(inStream, Context.NESTED);
+            long wallclockBaseTime = LONG_CODER.decode(inStream, Context.NESTED);
+            return new Checkpoint(numEvents, wallclockBaseTime);
+          }
+        };
+
+    private long numEvents;
+    private long wallclockBaseTime;
+
+    private Checkpoint(long numEvents, long wallclockBaseTime) {
+      this.numEvents = numEvents;
+      this.wallclockBaseTime = wallclockBaseTime;
+    }
+
+    public Generator toGenerator(GeneratorConfig config) {
+      return new Generator(config, numEvents, wallclockBaseTime);
+    }
+
+    @Override
+    public void finalizeCheckpoint() throws IOException {
+      // Nothing to finalize.
+    }
+
+    @Override
+    public String toString() {
+      return String.format("Generator.Checkpoint{numEvents:%d;wallclockBaseTime:%d}",
+          numEvents, wallclockBaseTime);
+    }
+  }
+
+  /**
+   * The next event and its various timestamps. Ordered by increasing wallclock timestamp, then
+   * (arbitrary but stable) event hash order.
+   */
+  public static class NextEvent implements Comparable<NextEvent> {
+    /** When, in wallclock time, should this event be emitted? */
+    public final long wallclockTimestamp;
+
+    /** When, in event time, should this event be considered to have occurred? */
+    public final long eventTimestamp;
+
+    /** The event itself. */
+    public final Event event;
+
+    /** The minimum of this and all future event timestamps. */
+    public final long watermark;
+
+    public NextEvent(long wallclockTimestamp, long eventTimestamp, Event event, long watermark) {
+      this.wallclockTimestamp = wallclockTimestamp;
+      this.eventTimestamp = eventTimestamp;
+      this.event = event;
+      this.watermark = watermark;
+    }
+
+    /**
+     * Return a deep clone of this next event with {@code delayMs} added to the wallclock
+     * timestamp and the event annotated as 'LATE'.
+     */
+    public NextEvent withDelay(long delayMs) {
+      return new NextEvent(
+          wallclockTimestamp + delayMs, eventTimestamp, event.withAnnotation("LATE"), watermark);
+    }
+
+    @Override
+    public int compareTo(NextEvent other) {
+      int i = Long.compare(wallclockTimestamp, other.wallclockTimestamp);
+      if (i != 0) {
+        return i;
+      }
+      return Integer.compare(event.hashCode(), other.event.hashCode());
+    }
+  }
+
+  /**
+   * Configuration to generate events against. Note that it may be replaced by a call to
+   * {@link #splitAtEventId}.
+   */
+  private GeneratorConfig config;
+
+  /** Number of events generated by this generator. */
+  private long numEvents;
+
+  /**
+   * Wallclock time at which we emitted the first event (ms since epoch). Initially -1.
+   */
+  private long wallclockBaseTime;
+
+  private Generator(GeneratorConfig config, long numEvents, long wallclockBaseTime) {
+    Preconditions.checkNotNull(config);
+    this.config = config;
+    this.numEvents = numEvents;
+    this.wallclockBaseTime = wallclockBaseTime;
+  }
+
+  /**
+   * Create a fresh generator according to {@code config}.
+   */
+  public Generator(GeneratorConfig config) {
+    this(config, 0, -1);
+  }
+
+  /**
+   * Return a checkpoint for the current generator.
+   */
+  public Checkpoint toCheckpoint() {
+    return new Checkpoint(numEvents, wallclockBaseTime);
+  }
+
+  /**
+   * Return a deep clone of this generator.
+   */
+  @Override
+  public Generator clone() {
+    return new Generator(config.clone(), numEvents, wallclockBaseTime);
+  }
+
+  /**
+   * Return the current config for this generator. Note that configs may be replaced by {@link
+   * #splitAtEventId}.
+   */
+  public GeneratorConfig getCurrentConfig() {
+    return config;
+  }
+
+  /**
+   * Mutate this generator so that it will only generate events up to but not including
+   * {@code eventId}. Return a config to represent the events this generator will no longer yield.
+   * The generators will run on a serial timeline.
+   */
+  public GeneratorConfig splitAtEventId(long eventId) {
+    long newMaxEvents = eventId - (config.firstEventId + config.firstEventNumber);
+    GeneratorConfig remainConfig = config.cloneWith(config.firstEventId,
+        config.maxEvents - newMaxEvents, config.firstEventNumber + newMaxEvents);
+    config = config.cloneWith(config.firstEventId, newMaxEvents, config.firstEventNumber);
+    return remainConfig;
+  }
+
+  /**
+   * Return the next 'event id'. Though events don't have ids we can simulate them to
+   * help with bookkeeping.
+   */
+  public long getNextEventId() {
+    return config.firstEventId + config.nextAdjustedEventNumber(numEvents);
+  }
+
+  /**
+   * Return the base-0 id of the latest person (ignoring FIRST_PERSON_ID). If the next event
+   * is itself a person, this is the id which that person will be given.
+   */
+  private long lastBase0PersonId() {
+    long nextId = getNextEventId();
+    long epochIndex = nextId / GeneratorConfig.PROPORTION_DENOMINATOR;
+    long positionInEpoch = nextId % GeneratorConfig.PROPORTION_DENOMINATOR;
+    // Positions below PERSON_PROPORTION in each epoch are people.
+    // If the next event is a person, its own position is used.
+    // If it is an auction or bid, clamp the position back to the last person
+    // generated in this epoch.
+    long lastPersonPosition = GeneratorConfig.PERSON_PROPORTION - 1;
+    long personPosition = Math.min(positionInEpoch, lastPersonPosition);
+    return epochIndex * GeneratorConfig.PERSON_PROPORTION + personPosition;
+  }
+
+  /**
+   * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if
+   * due to generate an auction.
+   */
+  private long lastBase0AuctionId() {
+    long eventId = getNextEventId();
+    long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
+    long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
+    if (offset < GeneratorConfig.PERSON_PROPORTION) {
+      // About to generate a person.
+      // Go back to the last auction in the last epoch.
+      epoch--;
+      offset = GeneratorConfig.AUCTION_PROPORTION - 1;
+    } else if (offset >= GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
+      // About to generate a bid.
+      // Go back to the last auction generated in this epoch.
+      offset = GeneratorConfig.AUCTION_PROPORTION - 1;
+    } else {
+      // About to generate an auction.
+      offset -= GeneratorConfig.PERSON_PROPORTION;
+    }
+    return epoch * GeneratorConfig.AUCTION_PROPORTION + offset;
+  }
+
+  /** Return a random US state. */
+  private static String nextUSState(Random random) {
+    return US_STATES.get(random.nextInt(US_STATES.size()));
+  }
+
+  /** Return a random US city. */
+  private static String nextUSCity(Random random) {
+    return US_CITIES.get(random.nextInt(US_CITIES.size()));
+  }
+
+  /** Return a random person name. */
+  private static String nextPersonName(Random random) {
+    return FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size())) + " "
+        + LAST_NAMES.get(random.nextInt(LAST_NAMES.size()));
+  }
+
+  /** Return a random string shorter than {@code maxLength} (spaces at the ends are trimmed). */
+  private static String nextString(Random random, int maxLength) {
+    int len = MIN_STRING_LENGTH + random.nextInt(maxLength - MIN_STRING_LENGTH); // Bound > 0.
+    StringBuilder sb = new StringBuilder();
+    while (len-- > 0) {
+      if (random.nextInt(13) == 0) { // About one character in 13 is a space.
+        sb.append(' ');
+      } else {
+        sb.append((char) ('a' + random.nextInt(26)));
+      }
+    }
+    return sb.toString().trim(); // Trimming may shorten the result below len.
+  }
+
+  /** Return a random string of exactly {@code length}. */
+  private static String nextExactString(Random random, int length) {
+    StringBuilder sb = new StringBuilder();
+    while (length-- > 0) {
+      sb.append((char) ('a' + random.nextInt(26)));
+    }
+    return sb.toString();
+  }
+
+  /** Return a random email address. */
+  private static String nextEmail(Random random) {
+    return nextString(random, 7) + "@" + nextString(random, 5) + ".com";
+  }
+
+  /** Return a random credit card number. */
+  private static String nextCreditCard(Random random) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < 4; i++) {
+      if (i > 0) {
+        sb.append(' ');
+      }
+      sb.append(String.format("%04d", random.nextInt(10000)));
+    }
+    return sb.toString();
+  }
+
+  /** Return a random price. */
+  private static long nextPrice(Random random) {
+    return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0);
+  }
+
+  /** Return a random time delay, in milliseconds, for length of auctions. */
+  private long nextAuctionLengthMs(Random random, long timestamp) {
+    // What's our current event number?
+    long currentEventNumber = config.nextAdjustedEventNumber(numEvents);
+    // How many events till we've generated numInFlightAuctions?
+    long numEventsForAuctions =
+        (config.configuration.numInFlightAuctions * GeneratorConfig.PROPORTION_DENOMINATOR)
+        / GeneratorConfig.AUCTION_PROPORTION;
+    // When will the auction numInFlightAuctions beyond now be generated?
+    long futureAuction =
+        config.timestampAndInterEventDelayUsForEvent(currentEventNumber + numEventsForAuctions)
+            .getKey();
+    // System.out.printf("*** auction will be for %dms (%d events ahead) ***\n",
+    //     futureAuction - timestamp, numEventsForAuctions);
+    // Choose a length with average horizonMs.
+    long horizonMs = futureAuction - timestamp;
+    return 1L + nextLong(random, Math.max(horizonMs * 2, 1L));
+  }
+
+  /**
+   * Return a random padding string so that {@code currentSize} plus its length is roughly
+   * {@code desiredAverageSize} on average; empty if {@code currentSize} already exceeds that.
+   */
+  private static String nextExtra(Random random, int currentSize, int desiredAverageSize) {
+    if (currentSize > desiredAverageSize) {
+      return "";
+    }
+    int remaining = desiredAverageSize - currentSize;
+    // Vary the length over a window of about +/- 20% around the remaining size.
+    int spread = (int) Math.round(remaining * 0.2);
+    int jitter = (spread == 0) ? 0 : random.nextInt(2 * spread);
+    return nextExactString(random, remaining - spread + jitter);
+  }
+
+  /** Return a random long from {@code [0, n)}; {@code n} must be positive. */
+  private static long nextLong(Random random, long n) {
+    if (n < Integer.MAX_VALUE) {
+      return random.nextInt((int) n);
+    } else {
+      // TODO: Skewed distribution. Reduce before abs since Math.abs(Long.MIN_VALUE) < 0.
+      return Math.abs(random.nextLong() % n);
+    }
+  }
+
+  /**
+   * Generate and return a random person with next available id.
+   */
+  private Person nextPerson(Random random, long timestamp) {
+    long id = lastBase0PersonId() + GeneratorConfig.FIRST_PERSON_ID;
+    String name = nextPersonName(random);
+    String email = nextEmail(random);
+    String creditCard = nextCreditCard(random);
+    String city = nextUSCity(random);
+    String state = nextUSState(random);
+    int currentSize =
+        8 + name.length() + email.length() + creditCard.length() + city.length() + state.length();
+    String extra = nextExtra(random, currentSize, config.configuration.avgPersonByteSize);
+    return new Person(id, name, email, creditCard, city, state, timestamp, extra);
+  }
+
+  /**
+   * Return a random person id (base 0).
+   */
+  private long nextBase0PersonId(Random random) {
+    // Choose a random person from any of the 'active' people, plus a few 'leads'.
+    // By limiting to 'active' we ensure the density of bids or auctions per person
+    // does not decrease over time for long running jobs.
+    // By choosing a person id ahead of the last valid person id we will make
+    // newPerson and newAuction events appear to have been swapped in time.
+    long numPeople = lastBase0PersonId() + 1;
+    long activePeople = Math.min(numPeople, config.configuration.numActivePeople);
+    long n = nextLong(random, activePeople + PERSON_ID_LEAD);
+    return numPeople - activePeople + n;
+  }
+
+  /**
+   * Return a random base-0 auction id drawn from the recent auctions plus a few 'leads'.
+   */
+  private long nextBase0AuctionId(Random random) {
+    // Ideally we would track exactly which auctions have not yet expired, but that state is
+    // difficult to split. Instead treat the latest numInFlightAuctions ids as in flight and
+    // allow up to AUCTION_ID_LEAD ids beyond the latest one.
+    long newest = lastBase0AuctionId();
+    long oldest = Math.max(newest - config.configuration.numInFlightAuctions, 0);
+    long candidates = newest - oldest + 1 + AUCTION_ID_LEAD;
+    return oldest + nextLong(random, candidates);
+  }
+
+  /**
+   * Generate and return a random auction with next available id.
+   */
+  private Auction nextAuction(Random random, long timestamp) {
+    long id = lastBase0AuctionId() + GeneratorConfig.FIRST_AUCTION_ID;
+
+    long seller;
+    // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio.
+    if (random.nextInt(config.configuration.hotSellersRatio) > 0) {
+      // Choose the first person in the batch of last HOT_SELLER_RATIO people.
+      seller = (lastBase0PersonId() / HOT_SELLER_RATIO) * HOT_SELLER_RATIO;
+    } else {
+      seller = nextBase0PersonId(random);
+    }
+    seller += GeneratorConfig.FIRST_PERSON_ID;
+
+    long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES);
+    long initialBid = nextPrice(random);
+    long dateTime = timestamp;
+    long expires = timestamp + nextAuctionLengthMs(random, timestamp);
+    String name = nextString(random, 20);
+    String desc = nextString(random, 100);
+    long reserve = initialBid + nextPrice(random);
+    int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8;
+    String extra = nextExtra(random, currentSize, config.configuration.avgAuctionByteSize);
+    return new Auction(id, name, desc, initialBid, reserve, dateTime, expires, seller, category,
+        extra);
+  }
+
+  /**
+   * Generate and return a random bid with next available id.
+   */
+  private Bid nextBid(Random random, long timestamp) {
+    long auction;
+    // Here P(bid will be for a hot auction) = 1 - 1/hotAuctionRatio.
+    if (random.nextInt(config.configuration.hotAuctionRatio) > 0) {
+      // Choose the first auction in the batch of last HOT_AUCTION_RATIO auctions.
+      auction = (lastBase0AuctionId() / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO;
+    } else {
+      auction = nextBase0AuctionId(random);
+    }
+    auction += GeneratorConfig.FIRST_AUCTION_ID;
+
+    long bidder;
+    // Here P(bid will be by a hot bidder) = 1 - 1/hotBiddersRatio
+    if (random.nextInt(config.configuration.hotBiddersRatio) > 0) {
+      // Choose the second person (so hot bidders and hot sellers don't collide) in the batch of
+      // last HOT_BIDDER_RATIO people.
+      bidder = (lastBase0PersonId() / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1;
+    } else {
+      bidder = nextBase0PersonId(random);
+    }
+    bidder += GeneratorConfig.FIRST_PERSON_ID;
+
+    long price = nextPrice(random);
+    int currentSize = 8 + 8 + 8 + 8;
+    String extra = nextExtra(random, currentSize, config.configuration.avgBidByteSize);
+    return new Bid(auction, bidder, price, timestamp, extra);
+  }
+
+  @Override
+  public boolean hasNext() {
+    return numEvents < config.maxEvents;
+  }
+
+  /**
+   * Return the next event. The outer timestamp is in wallclock time and corresponds to
+   * when the event should fire. The inner timestamp is in event-time and represents the
+   * time the event is purported to have taken place in the simulation.
+   */
+  public NextEvent nextEvent() {
+    if (wallclockBaseTime < 0) {
+      wallclockBaseTime = System.currentTimeMillis();
+    }
+    // When, in event time, we should generate the event. Monotonic.
+    long eventTimestamp =
+        config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents)).getKey();
+    // When, in event time, the event should say it was generated. Depending on outOfOrderGroupSize
+    // may have local jitter.
+    long adjustedEventTimestamp =
+        config.timestampAndInterEventDelayUsForEvent(config.nextAdjustedEventNumber(numEvents))
+            .getKey();
+    // The minimum of this and all future adjusted event timestamps. Accounts for jitter in
+    // the event timestamp.
+    long watermark =
+        config.timestampAndInterEventDelayUsForEvent(config.nextEventNumberForWatermark(numEvents))
+            .getKey();
+    // When, in wallclock time, we should emit the event.
+    long wallclockTimestamp = wallclockBaseTime + (eventTimestamp - getCurrentConfig().baseTime);
+
+    // Seed the random number generator with the next 'event id'.
+    Random random = new Random(getNextEventId());
+    long rem = getNextEventId() % GeneratorConfig.PROPORTION_DENOMINATOR;
+
+    Event event;
+    if (rem < GeneratorConfig.PERSON_PROPORTION) {
+      event = new Event(nextPerson(random, adjustedEventTimestamp));
+    } else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
+      event = new Event(nextAuction(random, adjustedEventTimestamp));
+    } else {
+      event = new Event(nextBid(random, adjustedEventTimestamp));
+    }
+
+    numEvents++;
+    return new NextEvent(wallclockTimestamp, adjustedEventTimestamp, event, watermark);
+  }
+
+  @Override
+  public TimestampedValue<Event> next() {
+    NextEvent next = nextEvent();
+    return TimestampedValue.of(next.event, new Instant(next.eventTimestamp));
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Return how many microseconds till we emit the next event.
+   */
+  public long currentInterEventDelayUs() {
+    return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents))
+        .getValue();
+  }
+
+  /**
+   * Return an estimate of fraction of output consumed.
+   */
+  public double getFractionConsumed() {
+    return (double) numEvents / config.maxEvents;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Generator{config:%s; numEvents:%d; wallclockBaseTime:%d}", config,
+        numEvents, wallclockBaseTime);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/GeneratorConfig.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/GeneratorConfig.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/GeneratorConfig.java
new file mode 100644
index 0000000..59aaf49
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/GeneratorConfig.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.values.KV;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Parameters controlling how {@link Generator} synthesizes {@link Event} elements.
+ */
+class GeneratorConfig implements Serializable {
+  /**
+   * We start the ids at specific values to help ensure the queries find a match even on
+   * small synthesized dataset sizes.
+   */
+  public static final long FIRST_AUCTION_ID = 1000L;
+  public static final long FIRST_PERSON_ID = 1000L;
+  public static final long FIRST_CATEGORY_ID = 10L;
+
+  /**
+   * Proportions of people/auctions/bids to synthesize.
+   */
+  public static final int PERSON_PROPORTION = 1;
+  public static final int AUCTION_PROPORTION = 3;
+  public static final int BID_PROPORTION = 46;
+  public static final int PROPORTION_DENOMINATOR =
+      PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION;
+
+  /**
+   * Environment options.
+   */
+  public final NexmarkConfiguration configuration;
+
+  /**
+   * Delay between events, in microseconds. If the array has more than one entry then
+   * the rate is changed every {@link #stepLengthSec}, and wraps around.
+   */
+  public final long[] interEventDelayUs;
+
+  /**
+   * Delay before changing the current inter-event delay.
+   */
+  public final long stepLengthSec;
+
+  /**
+   * Time for first event (ms since epoch).
+   */
+  public final long baseTime;
+
+  /**
+   * Event id of first event to be generated. Event ids are unique over all generators, and
+   * are used as a seed to generate each event's data.
+   */
+  public final long firstEventId;
+
+  /**
+   * Maximum number of events to generate.
+   */
+  public final long maxEvents;
+
+  /**
+   * First event number. Generators running in parallel time may share the same event number,
+   * and the event number is used to determine the event timestamp.
+   */
+  public final long firstEventNumber;
+
+  /**
+   * True period of epoch in milliseconds. Derived from above.
+   * (Ie time to run through cycle for all interEventDelayUs entries).
+   */
+  public final long epochPeriodMs;
+
+  /**
+   * Number of events per epoch. Derived from above.
+   * (Ie number of events to run through cycle for all interEventDelayUs entries).
+   */
+  public final long eventsPerEpoch;
+
+  public GeneratorConfig(
+      NexmarkConfiguration configuration, long baseTime, long firstEventId,
+      long maxEventsOrZero, long firstEventNumber) {
+    this.configuration = configuration;
+    this.interEventDelayUs = configuration.rateShape.interEventDelayUs(
+        configuration.firstEventRate, configuration.nextEventRate,
+        configuration.rateUnit, configuration.numEventGenerators);
+    this.stepLengthSec = configuration.rateShape.stepLengthSec(configuration.ratePeriodSec);
+    this.baseTime = baseTime;
+    this.firstEventId = firstEventId;
+    if (maxEventsOrZero == 0) {
+      // Scale maximum down to avoid overflow in getEstimatedSizeBytes.
+      this.maxEvents =
+          Long.MAX_VALUE / (PROPORTION_DENOMINATOR
+                            * Math.max(
+              Math.max(configuration.avgPersonByteSize, configuration.avgAuctionByteSize),
+              configuration.avgBidByteSize));
+    } else {
+      this.maxEvents = maxEventsOrZero;
+    }
+    this.firstEventNumber = firstEventNumber;
+
+    long eventsPerEpoch = 0;
+    long epochPeriodMs = 0;
+    if (interEventDelayUs.length > 1) {
+      for (int i = 0; i < interEventDelayUs.length; i++) {
+        long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayUs[i];
+        eventsPerEpoch += numEventsForThisCycle;
+        epochPeriodMs += (numEventsForThisCycle * interEventDelayUs[i]) / 1000L;
+      }
+    }
+    this.eventsPerEpoch = eventsPerEpoch;
+    this.epochPeriodMs = epochPeriodMs;
+  }
+
+  /**
+   * Return a clone of this config.
+   */
+  @Override
+  public GeneratorConfig clone() {
+    return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
+  }
+
+  /**
+   * Return clone of this config except with given parameters.
+   */
+  public GeneratorConfig cloneWith(long firstEventId, long maxEvents, long firstEventNumber) {
+    return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
+  }
+
+  /**
+   * Split this config into {@code n} sub-configs with roughly equal number of
+   * possible events, but distinct value spaces. The generators will run on parallel timelines.
+   * This config should no longer be used. Throws IllegalArgumentException if {@code n < 1}.
+   */
+  public List<GeneratorConfig> split(int n) {
+    if (n < 1) {
+      throw new IllegalArgumentException("Cannot split into " + n + " sub-configs");
+    }
+    List<GeneratorConfig> results = new ArrayList<>();
+    if (n == 1) {
+      results.add(this); // No split required.
+      return results;
+    }
+    long subMaxEvents = maxEvents / n;
+    long subFirstEventId = firstEventId;
+    for (int i = 0; i < n; i++) {
+      // Don't lose any events to round-down: the last sub-config takes the remainder.
+      long count = (i == n - 1) ? maxEvents - subMaxEvents * (n - 1) : subMaxEvents;
+      results.add(cloneWith(subFirstEventId, count, firstEventNumber));
+      subFirstEventId += count;
+    }
+    return results;
+  }
+
+  /**
+   * Return an estimate of the bytes needed by {@code numEvents} events.
+   */
+  public long estimatedBytesForEvents(long numEvents) {
+    // Apportion the events between the three kinds, then weight by their average sizes.
+    long persons = (numEvents * PERSON_PROPORTION) / PROPORTION_DENOMINATOR;
+    long auctions = (numEvents * AUCTION_PROPORTION) / PROPORTION_DENOMINATOR;
+    long bids = (numEvents * BID_PROPORTION) / PROPORTION_DENOMINATOR;
+    long personBytes = persons * configuration.avgPersonByteSize;
+    long auctionBytes = auctions * configuration.avgAuctionByteSize;
+    return personBytes + auctionBytes + bids * configuration.avgBidByteSize;
+  }
+
+  /**
+   * Return an estimate of the byte-size of all events a generator for this config would yield.
+   */
+  public long getEstimatedSizeBytes() {
+    return estimatedBytesForEvents(maxEvents);
+  }
+
+  /**
+   * Return the first 'event id' which could be generated from this config. Though events don't
+   * have ids we can simulate them to help bookkeeping.
+   */
+  public long getStartEventId() {
+    return firstEventId + firstEventNumber;
+  }
+
+  /**
+   * Return one past the last 'event id' which could be generated from this config.
+   */
+  public long getStopEventId() {
+    return firstEventId + firstEventNumber + maxEvents;
+  }
+
+  /**
+   * Return the next event number for a generator which has so far emitted {@code numEvents}.
+   */
+  public long nextEventNumber(long numEvents) {
+    return firstEventNumber + numEvents;
+  }
+
+  /**
+   * Return the next event number for a generator which has so far emitted {@code numEvents},
+   * but moved to a deterministic slot within its group of {@code outOfOrderGroupSize} numbers.
+   */
+  public long nextAdjustedEventNumber(long numEvents) {
+    long n = configuration.outOfOrderGroupSize;
+    long eventNumber = nextEventNumber(numEvents);
+    long base = (eventNumber / n) * n; // First event number of the enclosing group.
+    long offset = (eventNumber * 953) % n; // Deterministic position inside that group.
+    return base + offset;
+  }
+
+  /**
+   * Return the event number whose event time will be a suitable watermark for
+   * a generator which has so far emitted {@code numEvents}.
+   */
+  public long nextEventNumberForWatermark(long numEvents) {
+    long n = configuration.outOfOrderGroupSize;
+    long eventNumber = nextEventNumber(numEvents);
+    return (eventNumber / n) * n;
+  }
+
+  /**
+   * What timestamp should the event with {@code eventNumber} have for this generator? And
+   * what inter-event delay (in microseconds) is current?
+   */
+  public KV<Long, Long> timestampAndInterEventDelayUsForEvent(long eventNumber) {
+    if (interEventDelayUs.length == 1) {
+      long timestamp = baseTime + (eventNumber * interEventDelayUs[0]) / 1000L;
+      return KV.of(timestamp, interEventDelayUs[0]);
+    }
+
+    long epoch = eventNumber / eventsPerEpoch;
+    long n = eventNumber % eventsPerEpoch;
+    long offsetInEpochMs = 0;
+    for (int i = 0; i < interEventDelayUs.length; i++) {
+      long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayUs[i];
+      if (n < numEventsForThisCycle) {
+        long offsetInCycleUs = n * interEventDelayUs[i];
+        long timestamp =
+            baseTime + epoch * epochPeriodMs + offsetInEpochMs + (offsetInCycleUs / 1000L);
+        return KV.of(timestamp, interEventDelayUs[i]);
+      }
+      n -= numEventsForThisCycle;
+      offsetInEpochMs += (numEventsForThisCycle * interEventDelayUs[i]) / 1000L;
+    }
+    throw new RuntimeException("internal eventsPerEpoch incorrect"); // can't reach
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("GeneratorConfig");
+    sb.append("{configuration:");
+    sb.append(configuration.toString());
+    sb.append(";interEventDelayUs=[");
+    for (int i = 0; i < interEventDelayUs.length; i++) {
+      if (i > 0) {
+        sb.append(",");
+      }
+      sb.append(interEventDelayUs[i]);
+    }
+    sb.append("]");
+    sb.append(";stepLengthSec:");
+    sb.append(stepLengthSec);
+    sb.append(";baseTime:");
+    sb.append(baseTime);
+    sb.append(";firstEventId:");
+    sb.append(firstEventId);
+    sb.append(";maxEvents:");
+    sb.append(maxEvents);
+    sb.append(";firstEventNumber:");
+    sb.append(firstEventNumber);
+    sb.append(";epochPeriodMs:");
+    sb.append(epochPeriodMs);
+    sb.append(";eventsPerEpoch:");
+    sb.append(eventsPerEpoch);
+    sb.append("}");
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/IdNameReserve.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/IdNameReserve.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/IdNameReserve.java
new file mode 100644
index 0000000..c72b76a
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/IdNameReserve.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Result type of {@link Query8}.
+ */
+public class IdNameReserve implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  /**
+   * Hand-written coder. The wire format is {@code id}, {@code name}, {@code reserve},
+   * in that order, each encoded in the nested context.
+   */
+  public static final Coder<IdNameReserve> CODER = new AtomicCoder<IdNameReserve>() {
+    @Override
+    public void encode(IdNameReserve value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.id, outStream, Context.NESTED);
+      STRING_CODER.encode(value.name, outStream, Context.NESTED);
+      LONG_CODER.encode(value.reserve, outStream, Context.NESTED);
+    }
+
+    @Override
+    public IdNameReserve decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      // Constructor arguments are evaluated left to right, which matches the encode order.
+      return new IdNameReserve(
+          LONG_CODER.decode(inStream, Context.NESTED),
+          STRING_CODER.decode(inStream, Context.NESTED),
+          LONG_CODER.decode(inStream, Context.NESTED));
+    }
+  };
+
+  @JsonProperty
+  public final long id;
+
+  @JsonProperty
+  public final String name;
+
+  /** Reserve price in cents. */
+  @JsonProperty
+  public final long reserve;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private IdNameReserve() {
+    this(0, null, 0);
+  }
+
+  public IdNameReserve(long id, String name, long reserve) {
+    this.id = id;
+    this.name = name;
+    this.reserve = reserve;
+  }
+
+  /**
+   * Estimate of the encoded size: 8 bytes for each of the two longs, plus the
+   * character count of {@code name} and one extra byte.
+   */
+  @Override
+  public long sizeInBytes() {
+    int nameSize = name.length() + 1;
+    return 8 + nameSize + 8;
+  }
+
+  /** Render this record using {@code NexmarkUtils.MAPPER}. */
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException cause) {
+      throw new RuntimeException(cause);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/KnownSize.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/KnownSize.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/KnownSize.java
new file mode 100644
index 0000000..394b6db
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/KnownSize.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+/**
+ * Interface for elements which can quickly estimate their encoded byte size.
+ */
+public interface KnownSize {
+  /** Return the estimated encoded size of this element, in bytes. */
+  long sizeInBytes();
+}
+

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Monitor.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
new file mode 100644
index 0000000..6874578
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Max.MaxLongFn;
+import org.apache.beam.sdk.transforms.Min.MinLongFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum.SumLongFn;
+import org.apache.beam.sdk.values.PCollection;
+
+import java.io.Serializable;
+
+/**
+ * A monitor of elements with support for later retrieving their aggregators.
+ *
+ * @param <T> Type of element we are monitoring.
+ */
+public class Monitor<T extends KnownSize> implements Serializable {
+  /**
+   * Pass-through {@link DoFn} which records statistics about every element it sees in
+   * aggregators whose names all start with {@code counterNamePrefix}.
+   */
+  private class MonitorDoFn extends DoFn<T, T> {
+    /** Number of elements seen. */
+    public final Aggregator<Long, Long> elementCounter =
+        createAggregator(counterNamePrefix + "_elements", new SumLongFn());
+    /** Sum of {@link KnownSize#sizeInBytes} over all elements seen. */
+    public final Aggregator<Long, Long> bytesCounter =
+        createAggregator(counterNamePrefix + "_bytes", new SumLongFn());
+    /** Earliest wall-clock time (ms since epoch) at which an element was processed. */
+    public final Aggregator<Long, Long> startTime =
+        createAggregator(counterNamePrefix + "_startTime", new MinLongFn());
+    /** Latest wall-clock time (ms since epoch) at which an element was processed. */
+    public final Aggregator<Long, Long> endTime =
+        createAggregator(counterNamePrefix + "_endTime", new MaxLongFn());
+    // The two timestamp aggregators carry the prefix like the four above, so that
+    // several monitors in one pipeline do not register identically named aggregators.
+    /** Smallest element timestamp (ms) seen. */
+    public final Aggregator<Long, Long> startTimestamp =
+        createAggregator(counterNamePrefix + "_startTimestamp", new MinLongFn());
+    /** Largest element timestamp (ms) seen. */
+    public final Aggregator<Long, Long> endTimestamp =
+        createAggregator(counterNamePrefix + "_endTimestamp", new MaxLongFn());
+
+    /** Update all aggregators for the current element, then emit it unchanged. */
+    @Override
+    public void processElement(ProcessContext c) {
+      elementCounter.addValue(1L);
+      bytesCounter.addValue(c.element().sizeInBytes());
+      // A single wall-clock reading feeds both the min and the max aggregator.
+      long now = System.currentTimeMillis();
+      startTime.addValue(now);
+      endTime.addValue(now);
+      startTimestamp.addValue(c.timestamp().getMillis());
+      endTimestamp.addValue(c.timestamp().getMillis());
+      c.output(c.element());
+    }
+  }
+
+  final MonitorDoFn doFn;
+  final PTransform<PCollection<? extends T>, PCollection<T>> transform;
+  private String counterNamePrefix;
+
+  /**
+   * Create a monitor.
+   *
+   * @param name used to name the transform ({@code name + ".Monitor"})
+   * @param counterNamePrefix prefix for the name of every aggregator of this monitor
+   */
+  public Monitor(String name, String counterNamePrefix) {
+    // Must be assigned before the DoFn is built: its field initializers read the prefix.
+    this.counterNamePrefix = counterNamePrefix;
+    doFn = new MonitorDoFn();
+    transform = ParDo.named(name + ".Monitor").of(doFn);
+  }
+
+  /** Return the pass-through transform to insert at the point to be monitored. */
+  public PTransform<PCollection<? extends T>, PCollection<T>> getTransform() {
+    return transform;
+  }
+
+  /** Return the aggregator counting elements. */
+  public Aggregator<Long, Long> getElementCounter() {
+    return doFn.elementCounter;
+  }
+
+  /** Return the aggregator summing element sizes in bytes. */
+  public Aggregator<Long, Long> getBytesCounter() {
+    return doFn.bytesCounter;
+  }
+
+  /** Return the aggregator holding the earliest wall-clock processing time. */
+  public Aggregator<Long, Long> getStartTime() {
+    return doFn.startTime;
+  }
+
+  /** Return the aggregator holding the latest wall-clock processing time. */
+  public Aggregator<Long, Long> getEndTime() {
+    return doFn.endTime;
+  }
+
+  /** Return the aggregator holding the smallest element timestamp. */
+  public Aggregator<Long, Long> getStartTimestamp() {
+    return doFn.startTimestamp;
+  }
+
+  /** Return the aggregator holding the largest element timestamp. */
+  public Aggregator<Long, Long> getEndTimestamp() {
+    return doFn.endTimestamp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NameCityStateId.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NameCityStateId.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NameCityStateId.java
new file mode 100644
index 0000000..2753d2e
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NameCityStateId.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Result of {@link Query3}.
+ */
+public class NameCityStateId implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  /**
+   * Hand-written coder. The wire format is {@code name}, {@code city}, {@code state},
+   * {@code id}, in that order, each encoded in the nested context.
+   */
+  public static final Coder<NameCityStateId> CODER = new AtomicCoder<NameCityStateId>() {
+    @Override
+    public void encode(NameCityStateId value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      STRING_CODER.encode(value.name, outStream, Context.NESTED);
+      STRING_CODER.encode(value.city, outStream, Context.NESTED);
+      STRING_CODER.encode(value.state, outStream, Context.NESTED);
+      LONG_CODER.encode(value.id, outStream, Context.NESTED);
+    }
+
+    @Override
+    public NameCityStateId decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      // Constructor arguments are evaluated left to right, which matches the encode order.
+      return new NameCityStateId(
+          STRING_CODER.decode(inStream, Context.NESTED),
+          STRING_CODER.decode(inStream, Context.NESTED),
+          STRING_CODER.decode(inStream, Context.NESTED),
+          LONG_CODER.decode(inStream, Context.NESTED));
+    }
+  };
+
+  @JsonProperty
+  public final String name;
+
+  @JsonProperty
+  public final String city;
+
+  @JsonProperty
+  public final String state;
+
+  @JsonProperty
+  public final long id;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private NameCityStateId() {
+    this(null, null, null, 0);
+  }
+
+  public NameCityStateId(String name, String city, String state, long id) {
+    this.name = name;
+    this.city = city;
+    this.state = state;
+    this.id = id;
+  }
+
+  /**
+   * Estimate of the encoded size: for each of the three strings its character count
+   * plus one extra byte, plus 8 bytes for the long.
+   */
+  @Override
+  public long sizeInBytes() {
+    int stringsSize = (name.length() + 1) + (city.length() + 1) + (state.length() + 1);
+    return stringsSize + 8;
+  }
+
+  /** Render this record using {@code NexmarkUtils.MAPPER}. */
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException cause) {
+      throw new RuntimeException(cause);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
new file mode 100644
index 0000000..2292ba5
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
@@ -0,0 +1,662 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+
+
+/**
+ * Configuration controlling how a query is run. May be supplied by command line or
+ * programmatically. We only capture properties which may influence the resulting
+ * pipeline performance, as captured by {@link NexmarkPerf}.
+ */
+class NexmarkConfiguration implements Serializable {
+  public static final NexmarkConfiguration DEFAULT = new NexmarkConfiguration();
+
+  /** If {@literal true}, include additional debugging and monitoring stats. */
+  @JsonProperty
+  public boolean debug = true;
+
+  /** Which query to run, in [0,9]. */
+  @JsonProperty
+  public int query = 0;
+
+  /** Where events come from. */
+  @JsonProperty
+  public NexmarkUtils.SourceType sourceType = NexmarkUtils.SourceType.DIRECT;
+
+  /** Where results go to. */
+  @JsonProperty
+  public NexmarkUtils.SinkType sinkType = NexmarkUtils.SinkType.DEVNULL;
+
+  /**
+   * Control whether pub/sub publishing is done in a stand-alone pipeline or is integrated
+   * into the overall query pipeline.
+   */
+  @JsonProperty
+  public NexmarkUtils.PubSubMode pubSubMode = NexmarkUtils.PubSubMode.COMBINED;
+
+  /**
+   * Number of events to generate. If zero, generate as many as possible without overflowing
+   * internal counters etc.
+   */
+  @JsonProperty
+  public long numEvents = 100000;
+
+  /**
+   * Number of event generators to use. Each generates events in its own timeline.
+   */
+  @JsonProperty
+  public int numEventGenerators = 100;
+
+  /**
+   * Shape of event rate curve.
+   */
+  @JsonProperty
+  public NexmarkUtils.RateShape rateShape = NexmarkUtils.RateShape.SINE;
+
+  /**
+   * Initial overall event rate (in {@link #rateUnit}).
+   */
+  @JsonProperty
+  public int firstEventRate = 10000;
+
+  /**
+   * Next overall event rate (in {@link #rateUnit}).
+   */
+  @JsonProperty
+  public int nextEventRate = 10000;
+
+  /**
+   * Unit for rates.
+   */
+  @JsonProperty
+  public NexmarkUtils.RateUnit rateUnit = NexmarkUtils.RateUnit.PER_SECOND;
+
+  /**
+   * Overall period of rate shape, in seconds.
+   */
+  @JsonProperty
+  public int ratePeriodSec = 600;
+
+  /**
+   * Time in seconds to preload the subscription with data, at the initial input rate of the
+   * pipeline.
+   */
+  @JsonProperty
+  public int preloadSeconds = 0;
+
+  /**
+   * If true, and in streaming mode, generate events only when they are due according to their
+   * timestamp.
+   */
+  @JsonProperty
+  public boolean isRateLimited = false;
+
+  /**
+   * If true, use wallclock time as event time. Otherwise, use a deterministic
+   * time in the past so that multiple runs will see exactly the same event streams
+   * and should thus have exactly the same results.
+   */
+  @JsonProperty
+  public boolean useWallclockEventTime = false;
+
+  /** Average idealized size of a 'new person' event, in bytes. */
+  @JsonProperty
+  public int avgPersonByteSize = 200;
+
+  /** Average idealized size of a 'new auction' event, in bytes. */
+  @JsonProperty
+  public int avgAuctionByteSize = 500;
+
+  /** Average idealized size of a 'bid' event, in bytes. */
+  @JsonProperty
+  public int avgBidByteSize = 100;
+
+  /** Ratio of bids to 'hot' auctions compared to all other auctions. */
+  @JsonProperty
+  public int hotAuctionRatio = 1;
+
+  /** Ratio of auctions for 'hot' sellers compared to all other people. */
+  @JsonProperty
+  public int hotSellersRatio = 1;
+
+  /** Ratio of bids for 'hot' bidders compared to all other people. */
+  @JsonProperty
+  public int hotBiddersRatio = 1;
+
+  /** Window size, in seconds, for queries 3, 5, 7 and 8. */
+  @JsonProperty
+  public long windowSizeSec = 10;
+
+  /** Sliding window period, in seconds, for query 5. */
+  @JsonProperty
+  public long windowPeriodSec = 5;
+
+  /** Number of seconds to hold back events according to their reported timestamp. */
+  @JsonProperty
+  public long watermarkHoldbackSec = 0;
+
+  /** Average number of auctions which should be in flight at any time, per generator. */
+  @JsonProperty
+  public int numInFlightAuctions = 100;
+
+  /** Maximum number of people to consider as active for placing auctions or bids. */
+  @JsonProperty
+  public int numActivePeople = 1000;
+
+  /** Coder strategy to follow. */
+  @JsonProperty
+  public NexmarkUtils.CoderStrategy coderStrategy = NexmarkUtils.CoderStrategy.HAND;
+
+  /**
+   * Delay, in milliseconds, for each event. This will peg one core for this number
+   * of milliseconds to simulate CPU-bound computation.
+   */
+  @JsonProperty
+  public long cpuDelayMs = 0;
+
+  /**
+   * Extra data, in bytes, to save to persistent state for each event. This will force
+   * i/o all the way to durable storage to simulate an I/O-bound computation.
+   */
+  @JsonProperty
+  public long diskBusyBytes = 0;
+
+  /**
+   * Skip factor for query 2. We select bids for every {@code auctionSkip}'th auction.
+   */
+  @JsonProperty
+  public int auctionSkip = 123;
+
+  /**
+   * Fanout for queries 4 (groups by category id), 5 and 7 (find a global maximum).
+   */
+  @JsonProperty
+  public int fanout = 5;
+
+  /**
+   * Length of occasional delay to impose on events (in seconds).
+   */
+  @JsonProperty
+  public long occasionalDelaySec = 0;
+
+  /**
+   * Probability that an event will be delayed by {@link #occasionalDelaySec}.
+   */
+  @JsonProperty
+  public double probDelayedEvent = 0.0;
+
+  /**
+   * Maximum size of each log file (in events). For Query10 only.
+   */
+  @JsonProperty
+  public int maxLogEvents = 100_000;
+
+  /**
+   * If true, use pub/sub publish time instead of event time.
+   */
+  @JsonProperty
+  public boolean usePubsubPublishTime = false;
+
+  /**
+   * Number of events in out-of-order groups. 1 implies no out-of-order events. 1000 implies
+   * every 1000 events per generator are emitted in pseudo-random order.
+   */
+  @JsonProperty
+  public long outOfOrderGroupSize = 1;
+
+  /**
+   * Replace any properties of this configuration which have been supplied by the command line.
+   *
+   * <p>Each field is overwritten only when the corresponding getter on {@code options}
+   * returns a non-null value; a {@code null} leaves the current value untouched.
+   *
+   * @param options the options to take the overriding values from
+   */
+  public void overrideFromOptions(Options options) {
+    if (options.getDebug() != null) {
+      debug = options.getDebug();
+    }
+    if (options.getQuery() != null) {
+      query = options.getQuery();
+    }
+    if (options.getSourceType() != null) {
+      sourceType = options.getSourceType();
+    }
+    if (options.getSinkType() != null) {
+      sinkType = options.getSinkType();
+    }
+    if (options.getPubSubMode() != null) {
+      pubSubMode = options.getPubSubMode();
+    }
+    if (options.getNumEvents() != null) {
+      numEvents = options.getNumEvents();
+    }
+    if (options.getNumEventGenerators() != null) {
+      numEventGenerators = options.getNumEventGenerators();
+    }
+    if (options.getRateShape() != null) {
+      rateShape = options.getRateShape();
+    }
+    if (options.getFirstEventRate() != null) {
+      firstEventRate = options.getFirstEventRate();
+    }
+    if (options.getNextEventRate() != null) {
+      nextEventRate = options.getNextEventRate();
+    }
+    if (options.getRateUnit() != null) {
+      rateUnit = options.getRateUnit();
+    }
+    if (options.getRatePeriodSec() != null) {
+      ratePeriodSec = options.getRatePeriodSec();
+    }
+    if (options.getPreloadSeconds() != null) {
+      preloadSeconds = options.getPreloadSeconds();
+    }
+    if (options.getIsRateLimited() != null) {
+      isRateLimited = options.getIsRateLimited();
+    }
+    if (options.getUseWallclockEventTime() != null) {
+      useWallclockEventTime = options.getUseWallclockEventTime();
+    }
+    if (options.getAvgPersonByteSize() != null) {
+      avgPersonByteSize = options.getAvgPersonByteSize();
+    }
+    if (options.getAvgAuctionByteSize() != null) {
+      avgAuctionByteSize = options.getAvgAuctionByteSize();
+    }
+    if (options.getAvgBidByteSize() != null) {
+      avgBidByteSize = options.getAvgBidByteSize();
+    }
+    if (options.getHotAuctionRatio() != null) {
+      hotAuctionRatio = options.getHotAuctionRatio();
+    }
+    if (options.getHotSellersRatio() != null) {
+      hotSellersRatio = options.getHotSellersRatio();
+    }
+    if (options.getHotBiddersRatio() != null) {
+      hotBiddersRatio = options.getHotBiddersRatio();
+    }
+    if (options.getWindowSizeSec() != null) {
+      windowSizeSec = options.getWindowSizeSec();
+    }
+    if (options.getWindowPeriodSec() != null) {
+      windowPeriodSec = options.getWindowPeriodSec();
+    }
+    if (options.getWatermarkHoldbackSec() != null) {
+      watermarkHoldbackSec = options.getWatermarkHoldbackSec();
+    }
+    if (options.getNumInFlightAuctions() != null) {
+      numInFlightAuctions = options.getNumInFlightAuctions();
+    }
+    if (options.getNumActivePeople() != null) {
+      numActivePeople = options.getNumActivePeople();
+    }
+    if (options.getCoderStrategy() != null) {
+      coderStrategy = options.getCoderStrategy();
+    }
+    if (options.getCpuDelayMs() != null) {
+      cpuDelayMs = options.getCpuDelayMs();
+    }
+    if (options.getDiskBusyBytes() != null) {
+      diskBusyBytes = options.getDiskBusyBytes();
+    }
+    if (options.getAuctionSkip() != null) {
+      auctionSkip = options.getAuctionSkip();
+    }
+    if (options.getFanout() != null) {
+      fanout = options.getFanout();
+    }
+    if (options.getOccasionalDelaySec() != null) {
+      occasionalDelaySec = options.getOccasionalDelaySec();
+    }
+    if (options.getProbDelayedEvent() != null) {
+      probDelayedEvent = options.getProbDelayedEvent();
+    }
+    if (options.getMaxLogEvents() != null) {
+      maxLogEvents = options.getMaxLogEvents();
+    }
+    if (options.getUsePubsubPublishTime() != null) {
+      usePubsubPublishTime = options.getUsePubsubPublishTime();
+    }
+    if (options.getOutOfOrderGroupSize() != null) {
+      outOfOrderGroupSize = options.getOutOfOrderGroupSize();
+    }
+  }
+
+  /**
+   * Return a field-by-field copy of this configuration. The copy is built from a fresh
+   * instance rather than via {@code super.clone()}.
+   */
+  @Override
+  public NexmarkConfiguration clone() {
+    NexmarkConfiguration copy = new NexmarkConfiguration();
+    copy.debug = debug;
+    copy.query = query;
+    copy.sourceType = sourceType;
+    copy.sinkType = sinkType;
+    copy.pubSubMode = pubSubMode;
+    copy.numEvents = numEvents;
+    copy.numEventGenerators = numEventGenerators;
+    copy.rateShape = rateShape;
+    copy.firstEventRate = firstEventRate;
+    copy.nextEventRate = nextEventRate;
+    copy.rateUnit = rateUnit;
+    copy.ratePeriodSec = ratePeriodSec;
+    copy.preloadSeconds = preloadSeconds;
+    copy.isRateLimited = isRateLimited;
+    copy.useWallclockEventTime = useWallclockEventTime;
+    copy.avgPersonByteSize = avgPersonByteSize;
+    copy.avgAuctionByteSize = avgAuctionByteSize;
+    copy.avgBidByteSize = avgBidByteSize;
+    copy.hotAuctionRatio = hotAuctionRatio;
+    copy.hotSellersRatio = hotSellersRatio;
+    copy.hotBiddersRatio = hotBiddersRatio;
+    copy.windowSizeSec = windowSizeSec;
+    copy.windowPeriodSec = windowPeriodSec;
+    copy.watermarkHoldbackSec = watermarkHoldbackSec;
+    copy.numInFlightAuctions = numInFlightAuctions;
+    copy.numActivePeople = numActivePeople;
+    copy.coderStrategy = coderStrategy;
+    copy.cpuDelayMs = cpuDelayMs;
+    copy.diskBusyBytes = diskBusyBytes;
+    copy.auctionSkip = auctionSkip;
+    copy.fanout = fanout;
+    copy.occasionalDelaySec = occasionalDelaySec;
+    copy.probDelayedEvent = probDelayedEvent;
+    copy.maxLogEvents = maxLogEvents;
+    copy.usePubsubPublishTime = usePubsubPublishTime;
+    copy.outOfOrderGroupSize = outOfOrderGroupSize;
+    return copy;
+  }
+
+  /**
+   * Return short description of configuration (suitable for use in logging). We only render
+   * the query number plus those fields whose value differs from {@link #DEFAULT}. Each
+   * rendered field is labelled with its own field name.
+   *
+   * @return entries of the form {@code name:value}, separated by {@code "; "}
+   */
+  public String toShortString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(String.format("query:%d", query));
+    if (debug != DEFAULT.debug) {
+      sb.append(String.format("; debug:%s", debug));
+    }
+    if (sourceType != DEFAULT.sourceType) {
+      sb.append(String.format("; sourceType:%s", sourceType));
+    }
+    if (sinkType != DEFAULT.sinkType) {
+      sb.append(String.format("; sinkType:%s", sinkType));
+    }
+    if (pubSubMode != DEFAULT.pubSubMode) {
+      sb.append(String.format("; pubSubMode:%s", pubSubMode));
+    }
+    if (numEvents != DEFAULT.numEvents) {
+      sb.append(String.format("; numEvents:%d", numEvents));
+    }
+    if (numEventGenerators != DEFAULT.numEventGenerators) {
+      sb.append(String.format("; numEventGenerators:%d", numEventGenerators));
+    }
+    if (rateShape != DEFAULT.rateShape) {
+      sb.append(String.format("; rateShape:%s", rateShape));
+    }
+    // The two rates are always reported together, even if only one of them was changed.
+    if (firstEventRate != DEFAULT.firstEventRate || nextEventRate != DEFAULT.nextEventRate) {
+      sb.append(String.format("; firstEventRate:%d", firstEventRate));
+      sb.append(String.format("; nextEventRate:%d", nextEventRate));
+    }
+    if (rateUnit != DEFAULT.rateUnit) {
+      sb.append(String.format("; rateUnit:%s", rateUnit));
+    }
+    if (ratePeriodSec != DEFAULT.ratePeriodSec) {
+      sb.append(String.format("; ratePeriodSec:%d", ratePeriodSec));
+    }
+    if (preloadSeconds != DEFAULT.preloadSeconds) {
+      sb.append(String.format("; preloadSeconds:%d", preloadSeconds));
+    }
+    if (isRateLimited != DEFAULT.isRateLimited) {
+      sb.append(String.format("; isRateLimited:%s", isRateLimited));
+    }
+    if (useWallclockEventTime != DEFAULT.useWallclockEventTime) {
+      sb.append(String.format("; useWallclockEventTime:%s", useWallclockEventTime));
+    }
+    if (avgPersonByteSize != DEFAULT.avgPersonByteSize) {
+      sb.append(String.format("; avgPersonByteSize:%d", avgPersonByteSize));
+    }
+    if (avgAuctionByteSize != DEFAULT.avgAuctionByteSize) {
+      sb.append(String.format("; avgAuctionByteSize:%d", avgAuctionByteSize));
+    }
+    if (avgBidByteSize != DEFAULT.avgBidByteSize) {
+      sb.append(String.format("; avgBidByteSize:%d", avgBidByteSize));
+    }
+    if (hotAuctionRatio != DEFAULT.hotAuctionRatio) {
+      sb.append(String.format("; hotAuctionRatio:%d", hotAuctionRatio));
+    }
+    if (hotSellersRatio != DEFAULT.hotSellersRatio) {
+      sb.append(String.format("; hotSellersRatio:%d", hotSellersRatio));
+    }
+    if (hotBiddersRatio != DEFAULT.hotBiddersRatio) {
+      sb.append(String.format("; hotBiddersRatio:%d", hotBiddersRatio));
+    }
+    if (windowSizeSec != DEFAULT.windowSizeSec) {
+      sb.append(String.format("; windowSizeSec:%d", windowSizeSec));
+    }
+    if (windowPeriodSec != DEFAULT.windowPeriodSec) {
+      sb.append(String.format("; windowPeriodSec:%d", windowPeriodSec));
+    }
+    if (watermarkHoldbackSec != DEFAULT.watermarkHoldbackSec) {
+      sb.append(String.format("; watermarkHoldbackSec:%d", watermarkHoldbackSec));
+    }
+    if (numInFlightAuctions != DEFAULT.numInFlightAuctions) {
+      sb.append(String.format("; numInFlightAuctions:%d", numInFlightAuctions));
+    }
+    if (numActivePeople != DEFAULT.numActivePeople) {
+      sb.append(String.format("; numActivePeople:%d", numActivePeople));
+    }
+    if (coderStrategy != DEFAULT.coderStrategy) {
+      sb.append(String.format("; coderStrategy:%s", coderStrategy));
+    }
+    if (cpuDelayMs != DEFAULT.cpuDelayMs) {
+      // Label matches the field name (it was previously rendered as "cpuSlowdownMs").
+      sb.append(String.format("; cpuDelayMs:%d", cpuDelayMs));
+    }
+    if (diskBusyBytes != DEFAULT.diskBusyBytes) {
+      // Label matches the field name (it was previously misspelled "diskBuysBytes").
+      sb.append(String.format("; diskBusyBytes:%d", diskBusyBytes));
+    }
+    if (auctionSkip != DEFAULT.auctionSkip) {
+      sb.append(String.format("; auctionSkip:%d", auctionSkip));
+    }
+    if (fanout != DEFAULT.fanout) {
+      sb.append(String.format("; fanout:%d", fanout));
+    }
+    if (occasionalDelaySec != DEFAULT.occasionalDelaySec) {
+      sb.append(String.format("; occasionalDelaySec:%d", occasionalDelaySec));
+    }
+    if (probDelayedEvent != DEFAULT.probDelayedEvent) {
+      sb.append(String.format("; probDelayedEvent:%f", probDelayedEvent));
+    }
+    if (maxLogEvents != DEFAULT.maxLogEvents) {
+      sb.append(String.format("; maxLogEvents:%d", maxLogEvents));
+    }
+    if (usePubsubPublishTime != DEFAULT.usePubsubPublishTime) {
+      sb.append(String.format("; usePubsubPublishTime:%s", usePubsubPublishTime));
+    }
+    if (outOfOrderGroupSize != DEFAULT.outOfOrderGroupSize) {
+      sb.append(String.format("; outOfOrderGroupSize:%d", outOfOrderGroupSize));
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Return the full description of this configuration, as rendered by
+   * {@code NexmarkUtils.MAPPER}.
+   */
+  @Override
+  public String toString() {
+    final String rendered;
+    try {
+      rendered = NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException cause) {
+      throw new RuntimeException(cause);
+    }
+    return rendered;
+  }
+
+  /**
+   * Parse a configuration from {@code string}, using {@code NexmarkUtils.MAPPER}.
+   *
+   * @param string the text to parse
+   * @return the parsed configuration
+   * @throws RuntimeException wrapping the underlying {@link IOException} if parsing fails
+   */
+  public static NexmarkConfiguration fromString(String string) {
+    final NexmarkConfiguration parsed;
+    try {
+      parsed = NexmarkUtils.MAPPER.readValue(string, NexmarkConfiguration.class);
+    } catch (IOException cause) {
+      throw new RuntimeException("Unable to parse nexmark configuration: ", cause);
+    }
+    return parsed;
+  }
+
+  /**
+   * Hash the same fields that {@link #equals(Object)} compares.
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hash(debug, query, sourceType, sinkType, pubSubMode,
+        numEvents, numEventGenerators, rateShape, firstEventRate, nextEventRate, rateUnit,
+        ratePeriodSec, preloadSeconds, isRateLimited, useWallclockEventTime, avgPersonByteSize,
+        avgAuctionByteSize, avgBidByteSize, hotAuctionRatio, hotSellersRatio, hotBiddersRatio,
+        windowSizeSec, windowPeriodSec, watermarkHoldbackSec, numInFlightAuctions, numActivePeople,
+        coderStrategy, cpuDelayMs, diskBusyBytes, auctionSkip, fanout,
+        occasionalDelaySec, probDelayedEvent, maxLogEvents, usePubsubPublishTime,
+        outOfOrderGroupSize);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    NexmarkConfiguration other = (NexmarkConfiguration) obj;
+    if (debug != other.debug) {
+      return false;
+    }
+    if (auctionSkip != other.auctionSkip) {
+      return false;
+    }
+    if (avgAuctionByteSize != other.avgAuctionByteSize) {
+      return false;
+    }
+    if (avgBidByteSize != other.avgBidByteSize) {
+      return false;
+    }
+    if (avgPersonByteSize != other.avgPersonByteSize) {
+      return false;
+    }
+    if (coderStrategy != other.coderStrategy) {
+      return false;
+    }
+    if (cpuDelayMs != other.cpuDelayMs) {
+      return false;
+    }
+    if (diskBusyBytes != other.diskBusyBytes) {
+      return false;
+    }
+    if (fanout != other.fanout) {
+      return false;
+    }
+    if (firstEventRate != other.firstEventRate) {
+      return false;
+    }
+    if (hotAuctionRatio != other.hotAuctionRatio) {
+      return false;
+    }
+    if (hotBiddersRatio != other.hotBiddersRatio) {
+      return false;
+    }
+    if (hotSellersRatio != other.hotSellersRatio) {
+      return false;
+    }
+    if (isRateLimited != other.isRateLimited) {
+      return false;
+    }
+    if (maxLogEvents != other.maxLogEvents) {
+      return false;
+    }
+    if (nextEventRate != other.nextEventRate) {
+      return false;
+    }
+    if (rateUnit != other.rateUnit) {
+      return false;
+    }
+    if (numEventGenerators != other.numEventGenerators) {
+      return false;
+    }
+    if (numEvents != other.numEvents) {
+      return false;
+    }
+    if (numInFlightAuctions != other.numInFlightAuctions) {
+      return false;
+    }
+    if (numActivePeople != other.numActivePeople) {
+      return false;
+    }
+    if (occasionalDelaySec != other.occasionalDelaySec) {
+      return false;
+    }
+    if (preloadSeconds != other.preloadSeconds) {
+      return false;
+    }
+    if (Double.doubleToLongBits(probDelayedEvent)
+        != Double.doubleToLongBits(other.probDelayedEvent)) {
+      return false;
+    }
+    if (pubSubMode != other.pubSubMode) {
+      return false;
+    }
+    if (ratePeriodSec != other.ratePeriodSec) {
+      return false;
+    }
+    if (rateShape != other.rateShape) {
+      return false;
+    }
+    if (query != other.query) {
+      return false;
+    }
+    if (sinkType != other.sinkType) {
+      return false;
+    }
+    if (sourceType != other.sourceType) {
+      return false;
+    }
+    if (useWallclockEventTime != other.useWallclockEventTime) {
+      return false;
+    }
+    if (watermarkHoldbackSec != other.watermarkHoldbackSec) {
+      return false;
+    }
+    if (windowPeriodSec != other.windowPeriodSec) {
+      return false;
+    }
+    if (windowSizeSec != other.windowSizeSec) {
+      return false;
+    }
+    if (usePubsubPublishTime != other.usePubsubPublishTime) {
+      return false;
+    }
+    if (outOfOrderGroupSize != other.outOfOrderGroupSize) {
+      return false;
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
new file mode 100644
index 0000000..dbc1ce2
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/**
+ * An implementation of the 'NEXMark queries' for Google Dataflow.
+ * These are 11 queries over a three-table schema representing an online auction system:
+ * <ul>
+ * <li>{@link Person} represents a person submitting an item for auction and/or making a bid
+ * on an auction.
+ * <li>{@link Auction} represents an item under auction.
+ * <li>{@link Bid} represents a bid for an item under auction.
+ * </ul>
+ * The queries exercise many aspects of streaming dataflow.
+ *
+ * <p>We synthesize the creation of people, auctions and bids in real time. The data is not
+ * particularly sensible.
+ *
+ * <p>See
+ * <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/">
+ * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a>
+ */
+public class NexmarkDriver<OptionT extends Options> {
+
+  /**
+   * Entry point: run every configuration of the suite selected in {@code options}.
+   *
+   * <p>Each non-null result is appended to the perf file, recorded, and followed by a summary
+   * printed to the console. If job monitoring is enabled in {@code options}, the summary and
+   * javascript files are written at the end, even if a run threw. The JVM exits with status 1
+   * if any result carried a null or non-empty error list.
+   *
+   * @param options command line options
+   * @param runner runner used to execute each configuration
+   */
+  public void runAll(OptionT options, NexmarkRunner runner) {
+    Instant startTime = Instant.now();
+    Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename());
+    Map<NexmarkConfiguration, NexmarkPerf> results = new LinkedHashMap<>();
+    Iterable<NexmarkConfiguration> suite = options.getSuite().getConfigurations(options);
+
+    boolean anyFailed = false;
+    try {
+      for (NexmarkConfiguration configuration : suite) {
+        NexmarkPerf perf = runner.run(configuration);
+        if (perf == null) {
+          // The runner produced no result, so there is nothing to record.
+          continue;
+        }
+        anyFailed |= perf.errors == null || perf.errors.size() > 0;
+        appendPerf(options.getPerfFilename(), configuration, perf);
+        results.put(configuration, perf);
+        // A null filename means the running summary is only printed, not saved.
+        saveSummary(null, suite, results, baseline, startTime);
+      }
+    } finally {
+      if (options.getMonitorJobs()) {
+        // Report overall performance.
+        saveSummary(options.getSummaryFilename(), suite, results, baseline, startTime);
+        saveJavascript(options.getJavascriptFilename(), suite, results, baseline, startTime);
+      }
+    }
+
+    if (anyFailed) {
+      System.exit(1);
+    }
+  }
+
+  /**
+   * Append {@code configuration} and its {@code perf} to the perf file, creating it if needed.
+   * Does nothing when {@code perfFilename} is {@code null}.
+   *
+   * @throws RuntimeException wrapping the {@link IOException} if the file cannot be written
+   */
+  private void appendPerf(
+      @Nullable String perfFilename, NexmarkConfiguration configuration,
+      NexmarkPerf perf) {
+    if (perfFilename != null) {
+      List<String> entry = new ArrayList<>();
+      // The blank line and the two '#' lines are skipped again by loadBaseline.
+      entry.add("");
+      entry.add("# " + Instant.now());
+      entry.add("# " + configuration.toShortString());
+      entry.add(configuration.toString());
+      entry.add(perf.toString());
+      try {
+        Files.write(Paths.get(perfFilename), entry, StandardCharsets.UTF_8,
+            StandardOpenOption.CREATE, StandardOpenOption.APPEND);
+      } catch (IOException ioe) {
+        throw new RuntimeException("Unable to write perf file: ", ioe);
+      }
+      NexmarkUtils.console("appended results to perf file %s.", perfFilename);
+    }
+  }
+
+  /**
+   * Load the baseline perf.
+   *
+   * <p>Blank lines and lines starting with {@code #} are skipped. Every other line is parsed as
+   * a configuration and the line immediately after it as the matching perf.
+   *
+   * @param baselineFilename file to read, or {@code null} if there is no baseline
+   * @return the baseline entries keyed by configuration, or {@code null} if no filename was given
+   * @throws RuntimeException if the file cannot be read or ends with a configuration line that
+   *     has no perf line after it
+   */
+  @Nullable
+  private static Map<NexmarkConfiguration, NexmarkPerf> loadBaseline(
+      @Nullable String baselineFilename) {
+    if (baselineFilename == null) {
+      return null;
+    }
+    Map<NexmarkConfiguration, NexmarkPerf> baseline = new LinkedHashMap<>();
+    List<String> lines;
+    try {
+      lines = Files.readAllLines(Paths.get(baselineFilename), StandardCharsets.UTF_8);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to read baseline perf file: ", e);
+    }
+    for (int i = 0; i < lines.size(); i++) {
+      String line = lines.get(i);
+      if (line.startsWith("#") || line.trim().isEmpty()) {
+        continue;
+      }
+      // The perf line is consumed together with its configuration line.
+      i++;
+      if (i >= lines.size()) {
+        // Fail with a clear message rather than an IndexOutOfBoundsException.
+        throw new RuntimeException(String.format(
+            "Baseline perf file %s is truncated: no perf line after configuration on line %d",
+            baselineFilename, i));
+      }
+      NexmarkConfiguration configuration = NexmarkConfiguration.fromString(line);
+      NexmarkPerf perf = NexmarkPerf.fromString(lines.get(i));
+      baseline.put(configuration, perf);
+    }
+    NexmarkUtils.console("loaded %d entries from baseline file %s.", baseline.size(),
+        baselineFilename);
+    return baseline;
+  }
+
+  /** Horizontal rule that opens and closes the summary built by {@code saveSummary}. */
+  private static final String LINE =
+      "==========================================================================================";
+
+  /**
+   * Print summary of {@code actual} vs (if non-null) {@code baseline}, and append it to
+   * {@code summaryFilename} when that is non-null.
+   *
+   * @param summaryFilename file to append the summary to, or {@code null} to only print it
+   * @param configurations configurations to list, in iteration order
+   * @param actual perf results collected so far; configurations without an entry are reported
+   *     as not run
+   * @param baseline baseline results to compare against, or {@code null} for no comparison
+   * @param start instant the run started, used to report the elapsed time
+   * @throws RuntimeException wrapping the {@link IOException} if the summary file cannot be
+   *     written
+   */
+  private static void saveSummary(
+      @Nullable String summaryFilename,
+      Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
+      @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
+    List<String> lines = new ArrayList<>();
+
+    lines.add("");
+    lines.add(LINE);
+
+    lines.add(
+        String.format("Run started %s and ran for %s", start, new Duration(start, Instant.now())));
+    lines.add("");
+
+    lines.add("Default configuration:");
+    lines.add(NexmarkConfiguration.DEFAULT.toString());
+    lines.add("");
+
+    // First table: one numbered row per configuration, plus the job id when one is known.
+    lines.add("Configurations:");
+    lines.add("  Conf  Description");
+    int conf = 0;
+    for (NexmarkConfiguration configuration : configurations) {
+      lines.add(String.format("  %04d  %s", conf++, configuration.toShortString()));
+      NexmarkPerf actualPerf = actual.get(configuration);
+      if (actualPerf != null && actualPerf.jobId != null) {
+        lines.add(String.format("  %4s  [Ran as job %s]", "", actualPerf.jobId));
+      }
+    }
+
+    // Second table: same numbering as above. Runtime and event rate are compared with the
+    // baseline as a percentage change, the result count as an absolute difference.
+    lines.add("");
+    lines.add("Performance:");
+    lines.add(String.format("  %4s  %12s  %12s  %12s  %12s  %12s  %12s", "Conf", "Runtime(sec)",
+        "(Baseline)", "Events(/sec)", "(Baseline)", "Results", "(Baseline)"));
+    conf = 0;
+    for (NexmarkConfiguration configuration : configurations) {
+      String line = String.format("  %04d  ", conf++);
+      NexmarkPerf actualPerf = actual.get(configuration);
+      if (actualPerf == null) {
+        line += "*** not run ***";
+      } else {
+        NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration);
+        double runtimeSec = actualPerf.runtimeSec;
+        line += String.format("%12.1f  ", runtimeSec);
+        if (baselinePerf == null) {
+          // Keep the column width so later columns stay aligned.
+          line += String.format("%12s  ", "");
+        } else {
+          double baselineRuntimeSec = baselinePerf.runtimeSec;
+          double diff = ((runtimeSec - baselineRuntimeSec) / baselineRuntimeSec) * 100.0;
+          line += String.format("%+11.2f%%  ", diff);
+        }
+
+        double eventsPerSec = actualPerf.eventsPerSec;
+        line += String.format("%12.1f  ", eventsPerSec);
+        if (baselinePerf == null) {
+          line += String.format("%12s  ", "");
+        } else {
+          double baselineEventsPerSec = baselinePerf.eventsPerSec;
+          double diff = ((eventsPerSec - baselineEventsPerSec) / baselineEventsPerSec) * 100.0;
+          line += String.format("%+11.2f%%  ", diff);
+        }
+
+        long numResults = actualPerf.numResults;
+        line += String.format("%12d  ", numResults);
+        if (baselinePerf == null) {
+          line += String.format("%12s", "");
+        } else {
+          long baselineNumResults = baselinePerf.numResults;
+          long diff = numResults - baselineNumResults;
+          line += String.format("%+12d", diff);
+        }
+      }
+      lines.add(line);
+
+      if (actualPerf != null) {
+        List<String> errors = actualPerf.errors;
+        if (errors == null) {
+          // A null error list is itself reported as an error.
+          errors = new ArrayList<String>();
+          errors.add("NexmarkGoogleRunner returned null errors list");
+        }
+        for (String error : errors) {
+          lines.add(String.format("  %4s  *** %s ***", "", error));
+        }
+      }
+    }
+
+    lines.add(LINE);
+    lines.add("");
+
+    // The summary is always printed; it is saved only when a filename was given.
+    for (String line : lines) {
+      System.out.println(line);
+    }
+
+    if (summaryFilename != null) {
+      try {
+        Files.write(Paths.get(summaryFilename), lines, StandardCharsets.UTF_8,
+            StandardOpenOption.CREATE, StandardOpenOption.APPEND);
+      } catch (IOException e) {
+        throw new RuntimeException("Unable to save summary file: ", e);
+      }
+      NexmarkUtils.console("appended summary to summary file %s.", summaryFilename);
+    }
+  }
+
+  /**
+   * Write all perf data and any baselines to a javascript file which can be used by
+   * graphing page etc. Any existing file is overwritten; nothing is written when
+   * {@code javascriptFilename} is {@code null}.
+   *
+   * @throws RuntimeException wrapping the {@link IOException} if the file cannot be written
+   */
+  private static void saveJavascript(
+      @Nullable String javascriptFilename,
+      Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
+      @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
+    if (javascriptFilename == null) {
+      return;
+    }
+
+    List<String> script = new ArrayList<>();
+    Duration elapsed = new Duration(start, Instant.now());
+    script.add(String.format("// Run started %s and ran for %s", start, elapsed));
+    script.add("var all = [");
+
+    // One object literal per configuration; perf and baseline appear only when available.
+    for (NexmarkConfiguration configuration : configurations) {
+      script.add("  {");
+      script.add(String.format("    config: %s", configuration));
+      NexmarkPerf measured = actual.get(configuration);
+      if (measured != null) {
+        script.add(String.format("    ,perf: %s", measured));
+      }
+      if (baseline != null) {
+        NexmarkPerf expected = baseline.get(configuration);
+        if (expected != null) {
+          script.add(String.format("    ,baseline: %s", expected));
+        }
+      }
+      script.add("  },");
+    }
+
+    script.add("];");
+
+    try {
+      Files.write(Paths.get(javascriptFilename), script, StandardCharsets.UTF_8,
+          StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
+    } catch (IOException ioe) {
+      throw new RuntimeException("Unable to save javascript file: ", ioe);
+    }
+    NexmarkUtils.console("saved javascript to file %s.", javascriptFilename);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkDriver.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkDriver.java
new file mode 100644
index 0000000..0029a36
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkDriver.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * Command line driver that runs the NexMark queries on the Beam Flink runner.
+ */
+public class NexmarkFlinkDriver extends NexmarkDriver<NexmarkFlinkDriver.NexmarkFlinkOptions> {
+  /**
+   * Command line flags: the Nexmark {@link Options} combined with {@link FlinkPipelineOptions}.
+   */
+  public interface NexmarkFlinkOptions extends Options, FlinkPipelineOptions {
+  }
+
+  /**
+   * Entry point. Parses and validates {@code args}, selects the Flink runner and runs the suite.
+   */
+  public static void main(String[] args) {
+    // Gather command line args, baseline, configurations, etc.
+    NexmarkFlinkOptions flinkOptions =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(NexmarkFlinkOptions.class);
+    // The runner is set here in code, after the command line has been parsed.
+    flinkOptions.setRunner(FlinkPipelineRunner.class);
+    NexmarkFlinkRunner flinkRunner = new NexmarkFlinkRunner(flinkOptions);
+    NexmarkFlinkDriver driver = new NexmarkFlinkDriver();
+    driver.runAll(flinkOptions, flinkRunner);
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
new file mode 100644
index 0000000..569aef6
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import javax.annotation.Nullable;
+
+/**
+ * Run a specific Nexmark query using the Beam-on-Flink runner.
+ */
+public class NexmarkFlinkRunner extends NexmarkRunner<NexmarkFlinkDriver.NexmarkFlinkOptions> {
+  public NexmarkFlinkRunner(NexmarkFlinkDriver.NexmarkFlinkOptions options) {
+    super(options);
+  }
+
+  /** Streaming mode is whatever the options say. */
+  @Override
+  protected boolean isStreaming() {
+    return options.isStreaming();
+  }
+
+  @Override
+  protected int coresPerWorker() {
+    return 4;
+  }
+
+  @Override
+  protected int maxNumWorkers() {
+    return 5;
+  }
+
+  /** This runner reports that it cannot monitor jobs. */
+  @Override
+  protected boolean canMonitor() {
+    return false;
+  }
+
+  /** Always returns {@code null}; no perf is collected here. */
+  @Override
+  @Nullable
+  protected NexmarkPerf monitor(NexmarkQuery query) {
+    return null;
+  }
+
+  /** Builds the publish-only pipeline directly from this runner's options. */
+  @Override
+  protected void invokeBuilderForPublishOnlyPipeline(
+      PipelineBuilder builder) {
+    builder.build(options);
+  }
+
+  /** Not supported by this runner. */
+  @Override
+  protected void waitForPublisherPreload() {
+    throw new UnsupportedOperationException();
+  }
+}