You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:23 UTC

[15/55] [abbrv] beam git commit: Refactor classes into packages

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Generator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Generator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Generator.java
deleted file mode 100644
index 7adb1b2..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Generator.java
+++ /dev/null
@@ -1,589 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-
-/**
- * A generator for synthetic events. We try to make the data vaguely reasonable. We also ensure
- * most primary key/foreign key relations are correct. Eg: a {@link Bid} event will usually have
- * valid auction and bidder ids which can be joined to already-generated Auction and Person events.
- *
- * <p>To help with testing, we generate timestamps relative to a given {@code baseTime}. Each new
- * event is given a timestamp advanced from the previous timestamp by {@code interEventDelayUs}
- * (in microseconds). The event stream is thus fully deterministic and does not depend on
- * wallclock time.
- *
- * <p>The nested {@link Checkpoint} implements {@link UnboundedSource.CheckpointMark}
- * so that we can resume generating events from a saved snapshot.
- */
-public class Generator implements Iterator<TimestampedValue<Event>>, Serializable {
-  /**
-   * Keep the number of categories small so the example queries will find results even with
-   * a small batch of events.
-   */
-  private static final int NUM_CATEGORIES = 5;
-
-  /** Smallest random string size. */
-  private static final int MIN_STRING_LENGTH = 3;
-
-  /**
-   * Keep the number of states small so that the example queries will find results even with
-   * a small batch of events.
-   */
-  private static final List<String> US_STATES = Arrays.asList(("AZ,CA,ID,OR,WA,WY").split(","));
-
-  private static final List<String> US_CITIES =
-      Arrays.asList(
-          ("Phoenix,Los Angeles,San Francisco,Boise,Portland,Bend,Redmond,Seattle,Kent,Cheyenne")
-              .split(","));
-
-  private static final List<String> FIRST_NAMES =
-      Arrays.asList(("Peter,Paul,Luke,John,Saul,Vicky,Kate,Julie,Sarah,Deiter,Walter").split(","));
-
-  private static final List<String> LAST_NAMES =
-      Arrays.asList(("Shultz,Abrams,Spencer,White,Bartels,Walton,Smith,Jones,Noris").split(","));
-
-  /**
-   * Number of yet-to-be-created people and auction ids allowed.
-   */
-  private static final int PERSON_ID_LEAD = 10;
-  private static final int AUCTION_ID_LEAD = 10;
-
-  /**
-   * The fractions of people/auctions which may be 'hot' sellers/bidders/auctions are 1
-   * over these values.
-   */
-  private static final int HOT_AUCTION_RATIO = 100;
-  private static final int HOT_SELLER_RATIO = 100;
-  private static final int HOT_BIDDER_RATIO = 100;
-
-  /**
-   * Just enough state to be able to restore a generator back to where it was checkpointed.
-   */
-  public static class Checkpoint implements UnboundedSource.CheckpointMark {
-    private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-
-    /** Coder for this class. */
-    public static final Coder<Checkpoint> CODER_INSTANCE =
-        new AtomicCoder<Checkpoint>() {
-          @Override
-          public void encode(
-              Checkpoint value,
-              OutputStream outStream,
-              Coder.Context context)
-              throws CoderException, IOException {
-            LONG_CODER.encode(value.numEvents, outStream, Context.NESTED);
-            LONG_CODER.encode(value.wallclockBaseTime, outStream, Context.NESTED);
-          }
-
-          @Override
-          public Checkpoint decode(
-              InputStream inStream, Coder.Context context)
-              throws CoderException, IOException {
-            long numEvents = LONG_CODER.decode(inStream, Context.NESTED);
-            long wallclockBaseTime = LONG_CODER.decode(inStream, Context.NESTED);
-            return new Checkpoint(numEvents, wallclockBaseTime);
-          }
-        };
-
-    private long numEvents;
-    private long wallclockBaseTime;
-
-    private Checkpoint(long numEvents, long wallclockBaseTime) {
-      this.numEvents = numEvents;
-      this.wallclockBaseTime = wallclockBaseTime;
-    }
-
-    public Generator toGenerator(GeneratorConfig config) {
-      return new Generator(config, numEvents, wallclockBaseTime);
-    }
-
-    @Override
-    public void finalizeCheckpoint() throws IOException {
-      // Nothing to finalize.
-    }
-
-    @Override
-    public String toString() {
-      return String.format("Generator.Checkpoint{numEvents:%d;wallclockBaseTime:%d}",
-          numEvents, wallclockBaseTime);
-    }
-  }
-
-  /**
-   * The next event and its various timestamps. Ordered by increasing wallclock timestamp, then
-   * (arbitrary but stable) event hash order.
-   */
-  public static class NextEvent implements Comparable<NextEvent> {
-    /** When, in wallclock time, should this event be emitted? */
-    public final long wallclockTimestamp;
-
-    /** When, in event time, should this event be considered to have occurred? */
-    public final long eventTimestamp;
-
-    /** The event itself. */
-    public final Event event;
-
-    /** The minimum of this and all future event timestamps. */
-    public final long watermark;
-
-    public NextEvent(long wallclockTimestamp, long eventTimestamp, Event event, long watermark) {
-      this.wallclockTimestamp = wallclockTimestamp;
-      this.eventTimestamp = eventTimestamp;
-      this.event = event;
-      this.watermark = watermark;
-    }
-
-    /**
-     * Return a deep clone of next event with delay added to wallclock timestamp and
-     * event annotated as 'LATE'.
-     */
-    public NextEvent withDelay(long delayMs) {
-      return new NextEvent(
-          wallclockTimestamp + delayMs, eventTimestamp, event.withAnnotation("LATE"), watermark);
-    }
-
-    @Override
-    public int compareTo(NextEvent other) {
-      int i = Long.compare(wallclockTimestamp, other.wallclockTimestamp);
-      if (i != 0) {
-        return i;
-      }
-      return Integer.compare(event.hashCode(), other.event.hashCode());
-    }
-  }
-
-  /**
-   * Configuration to generate events against. Note that it may be replaced by a call to
-   * {@link #splitAtEventId}.
-   */
-  private GeneratorConfig config;
-
-  /** Number of events generated by this generator. */
-  private long numEvents;
-
-  /**
-   * Wallclock time at which we emitted the first event (ms since epoch). Initially -1.
-   */
-  private long wallclockBaseTime;
-
-  private Generator(GeneratorConfig config, long numEvents, long wallclockBaseTime) {
-    checkNotNull(config);
-    this.config = config;
-    this.numEvents = numEvents;
-    this.wallclockBaseTime = wallclockBaseTime;
-  }
-
-  /**
-   * Create a fresh generator according to {@code config}.
-   */
-  public Generator(GeneratorConfig config) {
-    this(config, 0, -1);
-  }
-
-  /**
-   * Return a checkpoint for the current generator.
-   */
-  public Checkpoint toCheckpoint() {
-    return new Checkpoint(numEvents, wallclockBaseTime);
-  }
-
-  /**
-   * Return a deep clone of this generator.
-   */
-  @Override
-  public Generator clone() {
-    return new Generator(config.clone(), numEvents, wallclockBaseTime);
-  }
-
-  /**
-   * Return the current config for this generator. Note that configs may be replaced by {@link
-   * #splitAtEventId}.
-   */
-  public GeneratorConfig getCurrentConfig() {
-    return config;
-  }
-
-  /**
-   * Mutate this generator so that it will only generate events up to but not including
-   * {@code eventId}. Return a config to represent the events this generator will no longer yield.
-   * The generators will run on a serial timeline.
-   */
-  public GeneratorConfig splitAtEventId(long eventId) {
-    long newMaxEvents = eventId - (config.firstEventId + config.firstEventNumber);
-    GeneratorConfig remainConfig = config.cloneWith(config.firstEventId,
-        config.maxEvents - newMaxEvents, config.firstEventNumber + newMaxEvents);
-    config = config.cloneWith(config.firstEventId, newMaxEvents, config.firstEventNumber);
-    return remainConfig;
-  }
-
-  /**
-   * Return the next 'event id'. Though events don't have ids we can simulate them to
-   * help with bookkeeping.
-   */
-  public long getNextEventId() {
-    return config.firstEventId + config.nextAdjustedEventNumber(numEvents);
-  }
-
-  /**
-   * Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if
-   * due to generate a person.
-   */
-  private long lastBase0PersonId() {
-    long eventId = getNextEventId();
-    long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
-    long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
-    if (offset >= GeneratorConfig.PERSON_PROPORTION) {
-      // About to generate an auction or bid.
-      // Go back to the last person generated in this epoch.
-      offset = GeneratorConfig.PERSON_PROPORTION - 1;
-    }
-    // About to generate a person.
-    return epoch * GeneratorConfig.PERSON_PROPORTION + offset;
-  }
-
-  /**
-   * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if
-   * due to generate an auction.
-   */
-  private long lastBase0AuctionId() {
-    long eventId = getNextEventId();
-    long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
-    long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
-    if (offset < GeneratorConfig.PERSON_PROPORTION) {
-      // About to generate a person.
-      // Go back to the last auction in the last epoch.
-      epoch--;
-      offset = GeneratorConfig.AUCTION_PROPORTION - 1;
-    } else if (offset >= GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
-      // About to generate a bid.
-      // Go back to the last auction generated in this epoch.
-      offset = GeneratorConfig.AUCTION_PROPORTION - 1;
-    } else {
-      // About to generate an auction.
-      offset -= GeneratorConfig.PERSON_PROPORTION;
-    }
-    return epoch * GeneratorConfig.AUCTION_PROPORTION + offset;
-  }
-
-  /** Return a random US state. */
-  private static String nextUSState(Random random) {
-    return US_STATES.get(random.nextInt(US_STATES.size()));
-  }
-
-  /** Return a random US city. */
-  private static String nextUSCity(Random random) {
-    return US_CITIES.get(random.nextInt(US_CITIES.size()));
-  }
-
-  /** Return a random person name. */
-  private static String nextPersonName(Random random) {
-    return FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size())) + " "
-        + LAST_NAMES.get(random.nextInt(LAST_NAMES.size()));
-  }
-
-  /** Return a random string of up to {@code maxLength}. */
-  private static String nextString(Random random, int maxLength) {
-    int len = MIN_STRING_LENGTH + random.nextInt(maxLength - MIN_STRING_LENGTH);
-    StringBuilder sb = new StringBuilder();
-    while (len-- > 0) {
-      if (random.nextInt(13) == 0) {
-        sb.append(' ');
-      } else {
-        sb.append((char) ('a' + random.nextInt(26)));
-      }
-    }
-    return sb.toString().trim();
-  }
-
-  /** Return a random string of exactly {@code length}. */
-  private static String nextExactString(Random random, int length) {
-    StringBuilder sb = new StringBuilder();
-    while (length-- > 0) {
-      sb.append((char) ('a' + random.nextInt(26)));
-    }
-    return sb.toString();
-  }
-
-  /** Return a random email address. */
-  private static String nextEmail(Random random) {
-    return nextString(random, 7) + "@" + nextString(random, 5) + ".com";
-  }
-
-  /** Return a random credit card number. */
-  private static String nextCreditCard(Random random) {
-    StringBuilder sb = new StringBuilder();
-    for (int i = 0; i < 4; i++) {
-      if (i > 0) {
-        sb.append(' ');
-      }
-      sb.append(String.format("%04d", random.nextInt(10000)));
-    }
-    return sb.toString();
-  }
-
-  /** Return a random price. */
-  private static long nextPrice(Random random) {
-    return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0);
-  }
-
-  /** Return a random time delay, in milliseconds, for length of auctions. */
-  private long nextAuctionLengthMs(Random random, long timestamp) {
-    // What's our current event number?
-    long currentEventNumber = config.nextAdjustedEventNumber(numEvents);
-    // How many events till we've generated numInFlightAuctions?
-    long numEventsForAuctions =
-        (config.configuration.numInFlightAuctions * GeneratorConfig.PROPORTION_DENOMINATOR)
-        / GeneratorConfig.AUCTION_PROPORTION;
-    // When will the auction numInFlightAuctions beyond now be generated?
-    long futureAuction =
-        config.timestampAndInterEventDelayUsForEvent(currentEventNumber + numEventsForAuctions)
-            .getKey();
-    // System.out.printf("*** auction will be for %dms (%d events ahead) ***\n",
-    //     futureAuction - timestamp, numEventsForAuctions);
-    // Choose a length with average horizonMs.
-    long horizonMs = futureAuction - timestamp;
-    return 1L + nextLong(random, Math.max(horizonMs * 2, 1L));
-  }
-
-  /**
-   * Return a random {@code string} such that {@code currentSize + string.length()} is on average
-   * {@code desiredAverageSize}.
-   */
-  private static String nextExtra(Random random, int currentSize, int desiredAverageSize) {
-    if (currentSize > desiredAverageSize) {
-      return "";
-    }
-    desiredAverageSize -= currentSize;
-    int delta = (int) Math.round(desiredAverageSize * 0.2);
-    int minSize = desiredAverageSize - delta;
-    int desiredSize = minSize + (delta == 0 ? 0 : random.nextInt(2 * delta));
-    return nextExactString(random, desiredSize);
-  }
-
-  /** Return a random long from {@code [0, n)}. */
-  private static long nextLong(Random random, long n) {
-    if (n < Integer.MAX_VALUE) {
-      return random.nextInt((int) n);
-    } else {
-      // TODO: Very skewed distribution! Bad!
-      return Math.abs(random.nextLong()) % n;
-    }
-  }
-
-  /**
-   * Generate and return a random person with next available id.
-   */
-  private Person nextPerson(Random random, long timestamp) {
-    long id = lastBase0PersonId() + GeneratorConfig.FIRST_PERSON_ID;
-    String name = nextPersonName(random);
-    String email = nextEmail(random);
-    String creditCard = nextCreditCard(random);
-    String city = nextUSCity(random);
-    String state = nextUSState(random);
-    int currentSize =
-        8 + name.length() + email.length() + creditCard.length() + city.length() + state.length();
-    String extra = nextExtra(random, currentSize, config.configuration.avgPersonByteSize);
-    return new Person(id, name, email, creditCard, city, state, timestamp, extra);
-  }
-
-  /**
-   * Return a random person id (base 0).
-   */
-  private long nextBase0PersonId(Random random) {
-    // Choose a random person from any of the 'active' people, plus a few 'leads'.
-    // By limiting to 'active' we ensure the density of bids or auctions per person
-    // does not decrease over time for long running jobs.
-    // By choosing a person id ahead of the last valid person id we will make
-    // newPerson and newAuction events appear to have been swapped in time.
-    long numPeople = lastBase0PersonId() + 1;
-    long activePeople = Math.min(numPeople, config.configuration.numActivePeople);
-    long n = nextLong(random, activePeople + PERSON_ID_LEAD);
-    return numPeople - activePeople + n;
-  }
-
-  /**
-   * Return a random auction id (base 0).
-   */
-  private long nextBase0AuctionId(Random random) {
-    // Choose a random auction for any of those which are likely to still be in flight,
-    // plus a few 'leads'.
-    // Note that ideally we'd track non-expired auctions exactly, but that state
-    // is difficult to split.
-    long minAuction = Math.max(lastBase0AuctionId() - config.configuration.numInFlightAuctions, 0);
-    long maxAuction = lastBase0AuctionId();
-    return minAuction + nextLong(random, maxAuction - minAuction + 1 + AUCTION_ID_LEAD);
-  }
-
-  /**
-   * Generate and return a random auction with next available id.
-   */
-  private Auction nextAuction(Random random, long timestamp) {
-    long id = lastBase0AuctionId() + GeneratorConfig.FIRST_AUCTION_ID;
-
-    long seller;
-    // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio.
-    if (random.nextInt(config.configuration.hotSellersRatio) > 0) {
-      // Choose the first person in the batch of last HOT_SELLER_RATIO people.
-      seller = (lastBase0PersonId() / HOT_SELLER_RATIO) * HOT_SELLER_RATIO;
-    } else {
-      seller = nextBase0PersonId(random);
-    }
-    seller += GeneratorConfig.FIRST_PERSON_ID;
-
-    long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES);
-    long initialBid = nextPrice(random);
-    long dateTime = timestamp;
-    long expires = timestamp + nextAuctionLengthMs(random, timestamp);
-    String name = nextString(random, 20);
-    String desc = nextString(random, 100);
-    long reserve = initialBid + nextPrice(random);
-    int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8;
-    String extra = nextExtra(random, currentSize, config.configuration.avgAuctionByteSize);
-    return new Auction(id, name, desc, initialBid, reserve, dateTime, expires, seller, category,
-        extra);
-  }
-
-  /**
-   * Generate and return a random bid with next available id.
-   */
-  private Bid nextBid(Random random, long timestamp) {
-    long auction;
-    // Here P(bid will be for a hot auction) = 1 - 1/hotAuctionRatio.
-    if (random.nextInt(config.configuration.hotAuctionRatio) > 0) {
-      // Choose the first auction in the batch of last HOT_AUCTION_RATIO auctions.
-      auction = (lastBase0AuctionId() / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO;
-    } else {
-      auction = nextBase0AuctionId(random);
-    }
-    auction += GeneratorConfig.FIRST_AUCTION_ID;
-
-    long bidder;
-    // Here P(bid will be by a hot bidder) = 1 - 1/hotBiddersRatio
-    if (random.nextInt(config.configuration.hotBiddersRatio) > 0) {
-      // Choose the second person (so hot bidders and hot sellers don't collide) in the batch of
-      // last HOT_BIDDER_RATIO people.
-      bidder = (lastBase0PersonId() / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1;
-    } else {
-      bidder = nextBase0PersonId(random);
-    }
-    bidder += GeneratorConfig.FIRST_PERSON_ID;
-
-    long price = nextPrice(random);
-    int currentSize = 8 + 8 + 8 + 8;
-    String extra = nextExtra(random, currentSize, config.configuration.avgBidByteSize);
-    return new Bid(auction, bidder, price, timestamp, extra);
-  }
-
-  @Override
-  public boolean hasNext() {
-    return numEvents < config.maxEvents;
-  }
-
-  /**
-   * Return the next event. The outer timestamp is in wallclock time and corresponds to
-   * when the event should fire. The inner timestamp is in event-time and represents the
-   * time the event is purported to have taken place in the simulation.
-   */
-  public NextEvent nextEvent() {
-    if (wallclockBaseTime < 0) {
-      wallclockBaseTime = System.currentTimeMillis();
-    }
-    // When, in event time, we should generate the event. Monotonic.
-    long eventTimestamp =
-        config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents)).getKey();
-    // When, in event time, the event should say it was generated. Depending on outOfOrderGroupSize
-    // may have local jitter.
-    long adjustedEventTimestamp =
-        config.timestampAndInterEventDelayUsForEvent(config.nextAdjustedEventNumber(numEvents))
-            .getKey();
-    // The minimum of this and all future adjusted event timestamps. Accounts for jitter in
-    // the event timestamp.
-    long watermark =
-        config.timestampAndInterEventDelayUsForEvent(config.nextEventNumberForWatermark(numEvents))
-            .getKey();
-    // When, in wallclock time, we should emit the event.
-    long wallclockTimestamp = wallclockBaseTime + (eventTimestamp - getCurrentConfig().baseTime);
-
-    // Seed the random number generator with the next 'event id'.
-    Random random = new Random(getNextEventId());
-    long rem = getNextEventId() % GeneratorConfig.PROPORTION_DENOMINATOR;
-
-    Event event;
-    if (rem < GeneratorConfig.PERSON_PROPORTION) {
-      event = new Event(nextPerson(random, adjustedEventTimestamp));
-    } else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
-      event = new Event(nextAuction(random, adjustedEventTimestamp));
-    } else {
-      event = new Event(nextBid(random, adjustedEventTimestamp));
-    }
-
-    numEvents++;
-    return new NextEvent(wallclockTimestamp, adjustedEventTimestamp, event, watermark);
-  }
-
-  @Override
-  public TimestampedValue<Event> next() {
-    NextEvent next = nextEvent();
-    return TimestampedValue.of(next.event, new Instant(next.eventTimestamp));
-  }
-
-  @Override
-  public void remove() {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Return how many microseconds till we emit the next event.
-   */
-  public long currentInterEventDelayUs() {
-    return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents))
-        .getValue();
-  }
-
-  /**
-   * Return an estimate of fraction of output consumed.
-   */
-  public double getFractionConsumed() {
-    return (double) numEvents / config.maxEvents;
-  }
-
-  @Override
-  public String toString() {
-    return String.format("Generator{config:%s; numEvents:%d; wallclockBaseTime:%d}", config,
-        numEvents, wallclockBaseTime);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/GeneratorConfig.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/GeneratorConfig.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/GeneratorConfig.java
deleted file mode 100644
index dceff4f..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/GeneratorConfig.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.beam.sdk.values.KV;
-
-/**
- * Parameters controlling how {@link Generator} synthesizes {@link Event} elements.
- */
-class GeneratorConfig implements Serializable {
-  /**
-   * We start the ids at specific values to help ensure the queries find a match even on
-   * small synthesized dataset sizes.
-   */
-  public static final long FIRST_AUCTION_ID = 1000L;
-  public static final long FIRST_PERSON_ID = 1000L;
-  public static final long FIRST_CATEGORY_ID = 10L;
-
-  /**
-   * Proportions of people/auctions/bids to synthesize.
-   */
-  public static final int PERSON_PROPORTION = 1;
-  public static final int AUCTION_PROPORTION = 3;
-  public static final int BID_PROPORTION = 46;
-  public static final int PROPORTION_DENOMINATOR =
-      PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION;
-
-  /**
-   * Environment options.
-   */
-  public final NexmarkConfiguration configuration;
-
-  /**
-   * Delay between events, in microseconds. If the array has more than one entry then
-   * the rate is changed every {@link #stepLengthSec}, and wraps around.
-   */
-  public final long[] interEventDelayUs;
-
-  /**
-   * Delay before changing the current inter-event delay.
-   */
-  public final long stepLengthSec;
-
-  /**
-   * Time for first event (ms since epoch).
-   */
-  public final long baseTime;
-
-  /**
-   * Event id of first event to be generated. Event ids are unique over all generators, and
-   * are used as a seed to generate each event's data.
-   */
-  public final long firstEventId;
-
-  /**
-   * Maximum number of events to generate.
-   */
-  public final long maxEvents;
-
-  /**
-   * First event number. Generators running in parallel time may share the same event number,
-   * and the event number is used to determine the event timestamp.
-   */
-  public final long firstEventNumber;
-
-  /**
-   * True period of epoch in milliseconds. Derived from above.
-   * (Ie time to run through cycle for all interEventDelayUs entries).
-   */
-  public final long epochPeriodMs;
-
-  /**
-   * Number of events per epoch. Derived from above.
-   * (Ie number of events to run through cycle for all interEventDelayUs entries).
-   */
-  public final long eventsPerEpoch;
-
-  public GeneratorConfig(
-      NexmarkConfiguration configuration, long baseTime, long firstEventId,
-      long maxEventsOrZero, long firstEventNumber) {
-    this.configuration = configuration;
-    this.interEventDelayUs = configuration.rateShape.interEventDelayUs(
-        configuration.firstEventRate, configuration.nextEventRate,
-        configuration.rateUnit, configuration.numEventGenerators);
-    this.stepLengthSec = configuration.rateShape.stepLengthSec(configuration.ratePeriodSec);
-    this.baseTime = baseTime;
-    this.firstEventId = firstEventId;
-    if (maxEventsOrZero == 0) {
-      // Scale maximum down to avoid overflow in getEstimatedSizeBytes.
-      this.maxEvents =
-          Long.MAX_VALUE / (PROPORTION_DENOMINATOR
-                            * Math.max(
-              Math.max(configuration.avgPersonByteSize, configuration.avgAuctionByteSize),
-              configuration.avgBidByteSize));
-    } else {
-      this.maxEvents = maxEventsOrZero;
-    }
-    this.firstEventNumber = firstEventNumber;
-
-    long eventsPerEpoch = 0;
-    long epochPeriodMs = 0;
-    if (interEventDelayUs.length > 1) {
-      for (int i = 0; i < interEventDelayUs.length; i++) {
-        long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayUs[i];
-        eventsPerEpoch += numEventsForThisCycle;
-        epochPeriodMs += (numEventsForThisCycle * interEventDelayUs[i]) / 1000L;
-      }
-    }
-    this.eventsPerEpoch = eventsPerEpoch;
-    this.epochPeriodMs = epochPeriodMs;
-  }
-
-  /**
-   * Return a clone of this config.
-   */
-  @Override
-  public GeneratorConfig clone() {
-    return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
-  }
-
-  /**
-   * Return clone of this config except with given parameters.
-   */
-  public GeneratorConfig cloneWith(long firstEventId, long maxEvents, long firstEventNumber) {
-    return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
-  }
-
-  /**
-   * Split this config into {@code n} sub-configs with roughly equal number of
-   * possible events, but distinct value spaces. The generators will run on parallel timelines.
-   * This config should no longer be used.
-   */
-  public List<GeneratorConfig> split(int n) {
-    List<GeneratorConfig> results = new ArrayList<>();
-    if (n == 1) {
-      // No split required.
-      results.add(this);
-    } else {
-      long subMaxEvents = maxEvents / n;
-      long subFirstEventId = firstEventId;
-      for (int i = 0; i < n; i++) {
-        if (i == n - 1) {
-          // Don't lose any events to round-down.
-          subMaxEvents = maxEvents - subMaxEvents * (n - 1);
-        }
-        results.add(cloneWith(subFirstEventId, subMaxEvents, firstEventNumber));
-        subFirstEventId += subMaxEvents;
-      }
-    }
-    return results;
-  }
-
-  /**
-   * Return an estimate of the bytes needed by {@code numEvents}.
-   */
-  public long estimatedBytesForEvents(long numEvents) {
-    long numPersons =
-        (numEvents * GeneratorConfig.PERSON_PROPORTION) / GeneratorConfig.PROPORTION_DENOMINATOR;
-    long numAuctions = (numEvents * AUCTION_PROPORTION) / PROPORTION_DENOMINATOR;
-    long numBids = (numEvents * BID_PROPORTION) / PROPORTION_DENOMINATOR;
-    return numPersons * configuration.avgPersonByteSize
-           + numAuctions * configuration.avgAuctionByteSize
-           + numBids * configuration.avgBidByteSize;
-  }
-
-  /**
-   * Return an estimate of the byte-size of all events a generator for this config would yield.
-   */
-  public long getEstimatedSizeBytes() {
-    return estimatedBytesForEvents(maxEvents);
-  }
-
-  /**
-   * Return the first 'event id' which could be generated from this config. Though events don't
-   * have ids we can simulate them to help bookkeeping.
-   */
-  public long getStartEventId() {
-    return firstEventId + firstEventNumber;
-  }
-
-  /**
-   * Return one past the last 'event id' which could be generated from this config.
-   */
-  public long getStopEventId() {
-    return firstEventId + firstEventNumber + maxEvents;
-  }
-
-  /**
-   * Return the next event number for a generator which has so far emitted {@code numEvents}.
-   */
-  public long nextEventNumber(long numEvents) {
-    return firstEventNumber + numEvents;
-  }
-
-  /**
-   * Return the next event number for a generator which has so far emitted {@code numEvents},
-   * but adjusted to account for {@code outOfOrderGroupSize}.
-   */
-  public long nextAdjustedEventNumber(long numEvents) {
-    long n = configuration.outOfOrderGroupSize;
-    long eventNumber = nextEventNumber(numEvents);
-    long base = (eventNumber / n) * n;
-    long offset = (eventNumber * 953) % n;
-    return base + offset;
-  }
-
-  /**
-   * Return the event number whose event time will be a suitable watermark for
-   * a generator which has so far emitted {@code numEvents}.
-   */
-  public long nextEventNumberForWatermark(long numEvents) {
-    long n = configuration.outOfOrderGroupSize;
-    long eventNumber = nextEventNumber(numEvents);
-    return (eventNumber / n) * n;
-  }
-
-  /**
-   * What timestamp should the event with {@code eventNumber} have for this generator? And
-   * what inter-event delay (in microseconds) is current?
-   */
-  public KV<Long, Long> timestampAndInterEventDelayUsForEvent(long eventNumber) {
-    if (interEventDelayUs.length == 1) {
-      long timestamp = baseTime + (eventNumber * interEventDelayUs[0]) / 1000L;
-      return KV.of(timestamp, interEventDelayUs[0]);
-    }
-
-    long epoch = eventNumber / eventsPerEpoch;
-    long n = eventNumber % eventsPerEpoch;
-    long offsetInEpochMs = 0;
-    for (int i = 0; i < interEventDelayUs.length; i++) {
-      long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayUs[i];
-      if (n < numEventsForThisCycle) {
-        long offsetInCycleUs = n * interEventDelayUs[i];
-        long timestamp =
-            baseTime + epoch * epochPeriodMs + offsetInEpochMs + (offsetInCycleUs / 1000L);
-        return KV.of(timestamp, interEventDelayUs[i]);
-      }
-      n -= numEventsForThisCycle;
-      offsetInEpochMs += (numEventsForThisCycle * interEventDelayUs[i]) / 1000L;
-    }
-    throw new RuntimeException("internal eventsPerEpoch incorrect"); // can't reach
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("GeneratorConfig");
-    sb.append("{configuration:");
-    sb.append(configuration.toString());
-    sb.append(";interEventDelayUs=[");
-    for (int i = 0; i < interEventDelayUs.length; i++) {
-      if (i > 0) {
-        sb.append(",");
-      }
-      sb.append(interEventDelayUs[i]);
-    }
-    sb.append("]");
-    sb.append(";stepLengthSec:");
-    sb.append(stepLengthSec);
-    sb.append(";baseTime:");
-    sb.append(baseTime);
-    sb.append(";firstEventId:");
-    sb.append(firstEventId);
-    sb.append(";maxEvents:");
-    sb.append(maxEvents);
-    sb.append(";firstEventNumber:");
-    sb.append(firstEventNumber);
-    sb.append(";epochPeriodMs:");
-    sb.append(epochPeriodMs);
-    sb.append(";eventsPerEpoch:");
-    sb.append(eventsPerEpoch);
-    sb.append("}");
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/IdNameReserve.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/IdNameReserve.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/IdNameReserve.java
deleted file mode 100644
index 21fa3f4..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/IdNameReserve.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * Result type of {@link Query8}.
- */
-public class IdNameReserve implements KnownSize, Serializable {
-  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
-
-  public static final Coder<IdNameReserve> CODER = new AtomicCoder<IdNameReserve>() {
-    @Override
-    public void encode(IdNameReserve value, OutputStream outStream,
-        Coder.Context context)
-        throws CoderException, IOException {
-      LONG_CODER.encode(value.id, outStream, Context.NESTED);
-      STRING_CODER.encode(value.name, outStream, Context.NESTED);
-      LONG_CODER.encode(value.reserve, outStream, Context.NESTED);
-    }
-
-    @Override
-    public IdNameReserve decode(
-        InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      long id = LONG_CODER.decode(inStream, Context.NESTED);
-      String name = STRING_CODER.decode(inStream, Context.NESTED);
-      long reserve = LONG_CODER.decode(inStream, Context.NESTED);
-      return new IdNameReserve(id, name, reserve);
-    }
-  };
-
-  @JsonProperty
-  public final long id;
-
-  @JsonProperty
-  public final String name;
-
-  /** Reserve price in cents. */
-  @JsonProperty
-  public final long reserve;
-
-  // For Avro only.
-  @SuppressWarnings("unused")
-  private IdNameReserve() {
-    id = 0;
-    name = null;
-    reserve = 0;
-  }
-
-  public IdNameReserve(long id, String name, long reserve) {
-    this.id = id;
-    this.name = name;
-    this.reserve = reserve;
-  }
-
-  @Override
-  public long sizeInBytes() {
-    return 8 + name.length() + 1 + 8;
-  }
-
-  @Override
-  public String toString() {
-    try {
-      return NexmarkUtils.MAPPER.writeValueAsString(this);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/KnownSize.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/KnownSize.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/KnownSize.java
deleted file mode 100644
index 2093c48..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/KnownSize.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-/**
- * Interface for elements which can quickly estimate their encoded byte size.
- */
-public interface KnownSize {
-  long sizeInBytes();
-}
-

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
index 02660bf..6370e41 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
@@ -19,6 +19,7 @@ package org.apache.beam.integration.nexmark;
 
 import java.io.Serializable;
 
+import org.apache.beam.integration.nexmark.model.KnownSize;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Max;

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NameCityStateId.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NameCityStateId.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NameCityStateId.java
deleted file mode 100644
index fe4687b..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NameCityStateId.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * Result of {@link Query3}.
- */
-public class NameCityStateId implements KnownSize, Serializable {
-  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
-
-  public static final Coder<NameCityStateId> CODER = new AtomicCoder<NameCityStateId>() {
-    @Override
-    public void encode(NameCityStateId value, OutputStream outStream,
-        Coder.Context context)
-        throws CoderException, IOException {
-      STRING_CODER.encode(value.name, outStream, Context.NESTED);
-      STRING_CODER.encode(value.city, outStream, Context.NESTED);
-      STRING_CODER.encode(value.state, outStream, Context.NESTED);
-      LONG_CODER.encode(value.id, outStream, Context.NESTED);
-    }
-
-    @Override
-    public NameCityStateId decode(
-        InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      String name = STRING_CODER.decode(inStream, Context.NESTED);
-      String city = STRING_CODER.decode(inStream, Context.NESTED);
-      String state = STRING_CODER.decode(inStream, Context.NESTED);
-      long id = LONG_CODER.decode(inStream, Context.NESTED);
-      return new NameCityStateId(name, city, state, id);
-    }
-  };
-
-  @JsonProperty
-  public final String name;
-
-  @JsonProperty
-  public final String city;
-
-  @JsonProperty
-  public final String state;
-
-  @JsonProperty
-  public final long id;
-
-  // For Avro only.
-  @SuppressWarnings("unused")
-  private NameCityStateId() {
-    name = null;
-    city = null;
-    state = null;
-    id = 0;
-  }
-
-  public NameCityStateId(String name, String city, String state, long id) {
-    this.name = name;
-    this.city = city;
-    this.state = state;
-    this.id = id;
-  }
-
-  @Override
-  public long sizeInBytes() {
-    return name.length() + 1 + city.length() + 1 + state.length() + 1 + 8;
-  }
-
-  @Override
-  public String toString() {
-    try {
-      return NexmarkUtils.MAPPER.writeValueAsString(this);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexDriver.java
deleted file mode 100644
index 4c2721e..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexDriver.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunner;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-/**
- * Run NexMark queries using the Apex runner.
- */
-public class NexmarkApexDriver extends NexmarkDriver<NexmarkApexDriver.NexmarkApexOptions> {
-  /**
-   * Command line flags.
-   */
-  public interface NexmarkApexOptions extends Options, ApexPipelineOptions {
-  }
-
-  /**
-   * Entry point.
-   */
-  public static void main(String[] args) {
-    // Gather command line args, baseline, configurations, etc.
-    NexmarkApexOptions options = PipelineOptionsFactory.fromArgs(args)
-                                                        .withValidation()
-                                                        .as(NexmarkApexOptions.class);
-    options.setRunner(ApexRunner.class);
-    NexmarkApexRunner runner = new NexmarkApexRunner(options);
-    new NexmarkApexDriver().runAll(options, runner);
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
deleted file mode 100644
index 3b8993a..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import javax.annotation.Nullable;
-
-/**
- * Run a query using the Apex runner.
- */
-public class NexmarkApexRunner extends NexmarkRunner<NexmarkApexDriver.NexmarkApexOptions> {
-  @Override
-  protected boolean isStreaming() {
-    return options.isStreaming();
-  }
-
-  @Override
-  protected int coresPerWorker() {
-    return 4;
-  }
-
-  @Override
-  protected int maxNumWorkers() {
-    return 5;
-  }
-
-  @Override
-  protected void invokeBuilderForPublishOnlyPipeline(
-      PipelineBuilder builder) {
-    builder.build(options);
-  }
-
-  @Override
-  protected void waitForPublisherPreload() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  @Nullable
-  protected NexmarkPerf monitor(NexmarkQuery query) {
-    return null;
-  }
-
-  public NexmarkApexRunner(NexmarkApexDriver.NexmarkApexOptions options) {
-    super(options);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
index 0943664..e2890ed 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
@@ -29,7 +29,7 @@ import java.util.Objects;
  * programmatically. We only capture properties which may influence the resulting
  * pipeline performance, as captured by {@link NexmarkPerf}.
  */
-class NexmarkConfiguration implements Serializable {
+public class NexmarkConfiguration implements Serializable {
   public static final NexmarkConfiguration DEFAULT = new NexmarkConfiguration();
 
   /** If {@literal true}, include additional debugging and monitoring stats. */
@@ -228,7 +228,7 @@ class NexmarkConfiguration implements Serializable {
   /**
    * Replace any properties of this configuration which have been supplied by the command line.
    */
-  public void overrideFromOptions(Options options) {
+  public void overrideFromOptions(NexmarkOptions options) {
     if (options.getDebug() != null) {
       debug = options.getDebug();
     }
@@ -511,8 +511,6 @@ class NexmarkConfiguration implements Serializable {
 
   /**
    * Parse an object from {@code string}.
-   *
-   * @throws IOException
    */
   public static NexmarkConfiguration fromString(String string) {
     try {

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectDriver.java
deleted file mode 100644
index 24fcc01..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectDriver.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.runners.direct.DirectOptions;
-import org.apache.beam.runners.direct.DirectRunner;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-/**
- * An implementation of the 'NEXMark queries' using the Direct Runner.
- */
-class NexmarkDirectDriver extends NexmarkDriver<NexmarkDirectDriver.NexmarkDirectOptions> {
-  /**
-   * Command line flags.
-   */
-  public interface NexmarkDirectOptions extends Options, DirectOptions {
-  }
-
-  /**
-   * Entry point.
-   */
-  public static void main(String[] args) {
-    NexmarkDirectOptions options =
-        PipelineOptionsFactory.fromArgs(args)
-                              .withValidation()
-                              .as(NexmarkDirectOptions.class);
-    options.setRunner(DirectRunner.class);
-    NexmarkDirectRunner runner = new NexmarkDirectRunner(options);
-    new NexmarkDirectDriver().runAll(options, runner);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
deleted file mode 100644
index 0119bbc..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-/**
- * Run a single query using the Direct Runner.
- */
-class NexmarkDirectRunner extends NexmarkRunner<NexmarkDirectDriver.NexmarkDirectOptions> {
-  public NexmarkDirectRunner(NexmarkDirectDriver.NexmarkDirectOptions options) {
-    super(options);
-  }
-
-  @Override
-  protected boolean isStreaming() {
-    return options.isStreaming();
-  }
-
-  @Override
-  protected int coresPerWorker() {
-    return 4;
-  }
-
-  @Override
-  protected int maxNumWorkers() {
-    return 1;
-  }
-
-  @Override
-  protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) {
-    throw new UnsupportedOperationException(
-        "Cannot use --pubSubMode=COMBINED with DirectRunner");
-  }
-
-  /**
-   * Monitor the progress of the publisher job. Return when it has been generating events for
-   * at least {@code configuration.preloadSeconds}.
-   */
-  @Override
-  protected void waitForPublisherPreload() {
-    throw new UnsupportedOperationException(
-        "Cannot use --pubSubMode=COMBINED with DirectRunner");
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
index e6a7b0b..4714124 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
@@ -28,6 +28,9 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Person;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -48,7 +51,7 @@ import org.joda.time.Instant;
  * <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/">
  * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a>
  */
-public class NexmarkDriver<OptionT extends Options> {
+public class NexmarkDriver<OptionT extends NexmarkOptions> {
 
   /**
    * Entry point.

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkDriver.java
deleted file mode 100644
index 61a7d29..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkDriver.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.runners.flink.FlinkPipelineOptions;
-import org.apache.beam.runners.flink.FlinkRunner;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-/**
- * Run NexMark queries using the Flink runner.
- */
-public class NexmarkFlinkDriver extends NexmarkDriver<NexmarkFlinkDriver.NexmarkFlinkOptions> {
-  /**
-   * Command line flags.
-   */
-  public interface NexmarkFlinkOptions extends Options, FlinkPipelineOptions {
-  }
-
-  /**
-   * Entry point.
-   */
-  public static void main(String[] args) {
-    // Gather command line args, baseline, configurations, etc.
-    NexmarkFlinkOptions options = PipelineOptionsFactory.fromArgs(args)
-                                                        .withValidation()
-                                                        .as(NexmarkFlinkOptions.class);
-    options.setRunner(FlinkRunner.class);
-    NexmarkFlinkRunner runner = new NexmarkFlinkRunner(options);
-    new NexmarkFlinkDriver().runAll(options, runner);
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
deleted file mode 100644
index 95ab1ad..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-/**
- * Run a query using the Flink runner.
- */
-public class NexmarkFlinkRunner extends NexmarkRunner<NexmarkFlinkDriver.NexmarkFlinkOptions> {
-  @Override
-  protected boolean isStreaming() {
-    return options.isStreaming();
-  }
-
-  @Override
-  protected int coresPerWorker() {
-    return 4;
-  }
-
-  @Override
-  protected int maxNumWorkers() {
-    return 5;
-  }
-
-  @Override
-  protected void invokeBuilderForPublishOnlyPipeline(
-      PipelineBuilder builder) {
-    builder.build(options);
-  }
-
-  @Override
-  protected void waitForPublisherPreload() {
-    throw new UnsupportedOperationException();
-  }
-
-  public NexmarkFlinkRunner(NexmarkFlinkDriver.NexmarkFlinkOptions options) {
-    super(options);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java
deleted file mode 100644
index 50c2a7c..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-/**
- * An implementation of the 'NEXMark queries' for Google Dataflow.
- * These are multiple queries over a three table schema representing an online auction system:
- * <ul>
- * <li>{@link Person} represents a person submitting an item for auction and/or making a bid
- * on an auction.
- * <li>{@link Auction} represents an item under auction.
- * <li>{@link Bid} represents a bid for an item under auction.
- * </ul>
- * The queries exercise many aspects of streaming dataflow.
- *
- * <p>We synthesize the creation of people, auctions and bids in real-time. The data is not
- * particularly sensible.
- *
- * <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/">
- * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a>
- */
-class NexmarkGoogleDriver extends NexmarkDriver<NexmarkGoogleDriver.NexmarkGoogleOptions> {
-  /**
-   * Command line flags.
-   */
-  public interface NexmarkGoogleOptions extends Options, DataflowPipelineOptions {
-
-  }
-
-  /**
-   * Entry point.
-   */
-  public static void main(String[] args) {
-    // Gather command line args, baseline, configurations, etc.
-    NexmarkGoogleOptions options = PipelineOptionsFactory.fromArgs(args)
-                                                         .withValidation()
-                                                         .as(NexmarkGoogleOptions.class);
-    options.setRunner(DataflowRunner.class);
-    NexmarkGoogleRunner runner = new NexmarkGoogleRunner(options);
-    new NexmarkGoogleDriver().runAll(options, runner);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
deleted file mode 100644
index f4bfb1e..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.sdk.PipelineResult;
-import org.joda.time.Duration;
-
-/**
- * Run a single Nexmark query using a given configuration on Google Dataflow.
- */
-class NexmarkGoogleRunner extends NexmarkRunner<NexmarkGoogleDriver.NexmarkGoogleOptions> {
-
-  public NexmarkGoogleRunner(NexmarkGoogleDriver.NexmarkGoogleOptions options) {
-    super(options);
-  }
-
-  /** Return the streaming flag taken from the pipeline options. */
-  @Override
-  protected boolean isStreaming() {
-    return options.isStreaming();
-  }
-
-  /**
-   * Derive the number of cores per worker from the worker machine type. The type is split on
-   * {@code '-'} and, when there are exactly three parts, the third is parsed as the core
-   * count. Any other shape (unset, empty, a different number of parts, or a non-numeric third
-   * part) yields {@code 1}.
-   */
-  @Override
-  protected int coresPerWorker() {
-    String machineType = options.getWorkerMachineType();
-    if (machineType == null || machineType.isEmpty()) {
-      return 1;
-    }
-    String[] split = machineType.split("-");
-    if (split.length != 3) {
-      return 1;
-    }
-    try {
-      return Integer.parseInt(split[2]);
-    } catch (NumberFormatException ex) {
-      // Third part is not a number, so fall back to a single core.
-      return 1;
-    }
-  }
-
-  /** Return the larger of the configured initial and maximum worker counts. */
-  @Override
-  protected int maxNumWorkers() {
-    return Math.max(options.getNumWorkers(), options.getMaxNumWorkers());
-  }
-
-  /** Return the Dataflow job id of {@code job}, which must be a {@link DataflowPipelineJob}. */
-  @Override
-  protected String getJobId(PipelineResult job) {
-    return ((DataflowPipelineJob) job).getJobId();
-  }
-
-  /**
-   * Build the publish-only pipeline with temporarily adjusted options: the job and app names
-   * gain a {@code "p-"} prefix, and both worker counts are capped at the number of workers
-   * needed to host the event generators. The original names and worker counts are restored
-   * afterwards, even if the build throws.
-   */
-  @Override
-  protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) {
-    String jobName = options.getJobName();
-    String appName = options.getAppName();
-    // Capture the worker counts before capping them so the finally block can really restore
-    // them; re-reading them from options after the cap would only write back the capped values.
-    int originalMaxNumWorkers = options.getMaxNumWorkers();
-    int originalNumWorkers = options.getNumWorkers();
-    options.setJobName("p-" + jobName);
-    options.setAppName("p-" + appName);
-    int coresPerWorker = coresPerWorker();
-    // Ceiling division: enough workers to give every event generator a core.
-    int eventGeneratorWorkers = (configuration.numEventGenerators + coresPerWorker - 1)
-                                / coresPerWorker;
-    options.setMaxNumWorkers(Math.min(originalMaxNumWorkers, eventGeneratorWorkers));
-    options.setNumWorkers(Math.min(originalNumWorkers, eventGeneratorWorkers));
-    publisherMonitor = new Monitor<Event>(queryName, "publisher");
-    try {
-      builder.build(options);
-    } finally {
-      options.setJobName(jobName);
-      options.setAppName(appName);
-      options.setMaxNumWorkers(originalMaxNumWorkers);
-      options.setNumWorkers(originalNumWorkers);
-    }
-  }
-
-  /**
-   * Monitor the progress of the publisher job. Return when it has been generating events for
-   * at least {@code configuration.preloadSeconds}, or as soon as the job reaches a terminal
-   * state. Returns immediately when job monitoring is disabled, when the publisher result is
-   * not a {@link DataflowPipelineJob}, or when no preload time is configured.
-   *
-   * @throws RuntimeException if the waiting thread is interrupted; the thread's interrupt
-   *     status is set again before throwing
-   */
-  @Override
-  protected void waitForPublisherPreload() {
-    checkNotNull(publisherMonitor);
-    checkNotNull(publisherResult);
-    if (!options.getMonitorJobs()) {
-      return;
-    }
-    if (!(publisherResult instanceof DataflowPipelineJob)) {
-      return;
-    }
-    if (configuration.preloadSeconds <= 0) {
-      return;
-    }
-
-    NexmarkUtils.console("waiting for publisher to pre-load");
-
-    DataflowPipelineJob job = (DataflowPipelineJob) publisherResult;
-
-    long numEvents = 0;
-    // Both stay negative until the first event has been observed.
-    long startMsSinceEpoch = -1;
-    long endMsSinceEpoch = -1;
-    while (true) {
-      PipelineResult.State state = job.getState();
-      switch (state) {
-        case UNKNOWN:
-          // Keep waiting.
-          NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
-          break;
-        case STOPPED:
-        case DONE:
-        case CANCELLED:
-        case FAILED:
-        case UPDATED:
-          // The publisher is no longer running, so there is nothing left to wait for.
-          NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
-          return;
-        case RUNNING:
-          numEvents = getLong(job, publisherMonitor.getElementCounter());
-          if (startMsSinceEpoch < 0 && numEvents > 0) {
-            // First events seen: start the preload clock now.
-            startMsSinceEpoch = System.currentTimeMillis();
-            endMsSinceEpoch = startMsSinceEpoch
-                              + Duration.standardSeconds(configuration.preloadSeconds).getMillis();
-          }
-          if (endMsSinceEpoch < 0) {
-            NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
-          } else {
-            long remainMs = endMsSinceEpoch - System.currentTimeMillis();
-            if (remainMs > 0) {
-              NexmarkUtils.console("%s publisher (%d events, waiting for %ds)", state, numEvents,
-                  remainMs / 1000);
-            } else {
-              NexmarkUtils.console("publisher preloaded %d events", numEvents);
-              return;
-            }
-          }
-          break;
-      }
-
-      try {
-        Thread.sleep(PERF_DELAY.getMillis());
-      } catch (InterruptedException e) {
-        // Set the interrupt status again so callers further up can observe the interruption,
-        // and keep the original exception as the cause.
-        Thread.currentThread().interrupt();
-        throw new RuntimeException("Interrupted: publisher still running.", e);
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
new file mode 100644
index 0000000..1be974f
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PubsubOptions;
+
+/**
+ * Command line flags for configuring and running the Nexmark benchmark queries.
+ */
+public interface NexmarkOptions extends PubsubOptions {
+  @Description("Which suite to run. Default is to use command line arguments for one job.")
+  @Default.Enum("DEFAULT")
+  NexmarkSuite getSuite();
+
+  void setSuite(NexmarkSuite suite);
+
+  @Description("If true, and using the DataflowRunner, monitor the jobs as they run.")
+  @Default.Boolean(false)
+  boolean getMonitorJobs();
+
+  void setMonitorJobs(boolean monitorJobs);
+
+  @Description("Where the events come from.")
+  @Nullable
+  NexmarkUtils.SourceType getSourceType();
+
+  void setSourceType(NexmarkUtils.SourceType sourceType);
+
+  @Description("Prefix for input files if using avro input")
+  @Nullable
+  String getInputPath();
+
+  void setInputPath(String inputPath);
+
+  @Description("Where results go.")
+  @Nullable
+  NexmarkUtils.SinkType getSinkType();
+
+  void setSinkType(NexmarkUtils.SinkType sinkType);
+
+  @Description("Which mode to run in when source is PUBSUB.")
+  @Nullable
+  NexmarkUtils.PubSubMode getPubSubMode();
+
+  void setPubSubMode(NexmarkUtils.PubSubMode pubSubMode);
+
+  @Description("Which query to run.")
+  @Nullable
+  Integer getQuery();
+
+  void setQuery(Integer query);
+
+  @Description("Prefix for output files if using text output for results or running Query 10.")
+  @Nullable
+  String getOutputPath();
+
+  void setOutputPath(String outputPath);
+
+  @Description("Base name of pubsub topic to publish to in streaming mode.")
+  @Nullable
+  @Default.String("nexmark")
+  String getPubsubTopic();
+
+  void setPubsubTopic(String pubsubTopic);
+
+  @Description("Base name of pubsub subscription to read from in streaming mode.")
+  @Nullable
+  @Default.String("nexmark")
+  String getPubsubSubscription();
+
+  void setPubsubSubscription(String pubsubSubscription);
+
+  @Description("Base name of BigQuery table name if using BigQuery output.")
+  @Nullable
+  @Default.String("nexmark")
+  String getBigQueryTable();
+
+  void setBigQueryTable(String bigQueryTable);
+
+  @Description("Approximate number of events to generate. "
+               + "Zero for effectively unlimited in streaming mode.")
+  @Nullable
+  Long getNumEvents();
+
+  void setNumEvents(Long numEvents);
+
+  @Description("Time in seconds to preload the subscription with data, at the initial input rate "
+               + "of the pipeline.")
+  @Nullable
+  Integer getPreloadSeconds();
+
+  void setPreloadSeconds(Integer preloadSeconds);
+
+  @Description("Number of unbounded sources to create events.")
+  @Nullable
+  Integer getNumEventGenerators();
+
+  void setNumEventGenerators(Integer numEventGenerators);
+
+  @Description("Shape of event rate curve.")
+  @Nullable
+  NexmarkUtils.RateShape getRateShape();
+
+  void setRateShape(NexmarkUtils.RateShape rateShape);
+
+  @Description("Initial overall event rate (in --rateUnit).")
+  @Nullable
+  Integer getFirstEventRate();
+
+  void setFirstEventRate(Integer firstEventRate);
+
+  @Description("Next overall event rate (in --rateUnit).")
+  @Nullable
+  Integer getNextEventRate();
+
+  void setNextEventRate(Integer nextEventRate);
+
+  @Description("Unit for rates.")
+  @Nullable
+  NexmarkUtils.RateUnit getRateUnit();
+
+  void setRateUnit(NexmarkUtils.RateUnit rateUnit);
+
+  @Description("Overall period of rate shape, in seconds.")
+  @Nullable
+  Integer getRatePeriodSec();
+
+  void setRatePeriodSec(Integer ratePeriodSec);
+
+  @Description("If true, relay events in real time in streaming mode.")
+  @Nullable
+  Boolean getIsRateLimited();
+
+  void setIsRateLimited(Boolean isRateLimited);
+
+  @Description("If true, use wallclock time as event time. Otherwise, use a deterministic"
+               + " time in the past so that multiple runs will see exactly the same event streams"
+               + " and should thus have exactly the same results.")
+  @Nullable
+  Boolean getUseWallclockEventTime();
+
+  void setUseWallclockEventTime(Boolean useWallclockEventTime);
+
+  @Description("Assert pipeline results match model results.")
+  @Default.Boolean(false)
+  boolean getAssertCorrectness();
+
+  void setAssertCorrectness(boolean assertCorrectness);
+
+  @Description("Log all input events.")
+  @Default.Boolean(false)
+  boolean getLogEvents();
+
+  void setLogEvents(boolean logEvents);
+
+  @Description("Log all query results.")
+  @Default.Boolean(false)
+  boolean getLogResults();
+
+  void setLogResults(boolean logResults);
+
+  @Description("Average size in bytes for a person record.")
+  @Nullable
+  Integer getAvgPersonByteSize();
+
+  void setAvgPersonByteSize(Integer avgPersonByteSize);
+
+  @Description("Average size in bytes for an auction record.")
+  @Nullable
+  Integer getAvgAuctionByteSize();
+
+  void setAvgAuctionByteSize(Integer avgAuctionByteSize);
+
+  @Description("Average size in bytes for a bid record.")
+  @Nullable
+  Integer getAvgBidByteSize();
+
+  void setAvgBidByteSize(Integer avgBidByteSize);
+
+  @Description("Ratio of bids for 'hot' auctions above the background.")
+  @Nullable
+  Integer getHotAuctionRatio();
+
+  void setHotAuctionRatio(Integer hotAuctionRatio);
+
+  @Description("Ratio of auctions for 'hot' sellers above the background.")
+  @Nullable
+  Integer getHotSellersRatio();
+
+  void setHotSellersRatio(Integer hotSellersRatio);
+
+  @Description("Ratio of auctions for 'hot' bidders above the background.")
+  @Nullable
+  Integer getHotBiddersRatio();
+
+  void setHotBiddersRatio(Integer hotBiddersRatio);
+
+  @Description("Window size in seconds.")
+  @Nullable
+  Long getWindowSizeSec();
+
+  void setWindowSizeSec(Long windowSizeSec);
+
+  @Description("Window period in seconds.")
+  @Nullable
+  Long getWindowPeriodSec();
+
+  void setWindowPeriodSec(Long windowPeriodSec);
+
+  @Description("If in streaming mode, the holdback for watermark in seconds.")
+  @Nullable
+  Long getWatermarkHoldbackSec();
+
+  void setWatermarkHoldbackSec(Long watermarkHoldbackSec);
+
+  @Description("Roughly how many auctions should be in flight for each generator.")
+  @Nullable
+  Integer getNumInFlightAuctions();
+
+  void setNumInFlightAuctions(Integer numInFlightAuctions);
+
+
+  @Description("Maximum number of people to consider as active for placing auctions or bids.")
+  @Nullable
+  Integer getNumActivePeople();
+
+  void setNumActivePeople(Integer numActivePeople);
+
+  @Description("Filename of perf data to append to.")
+  @Nullable
+  String getPerfFilename();
+
+  void setPerfFilename(String perfFilename);
+
+  @Description("Filename of baseline perf data to read from.")
+  @Nullable
+  String getBaselineFilename();
+
+  void setBaselineFilename(String baselineFilename);
+
+  @Description("Filename of summary perf data to append to.")
+  @Nullable
+  String getSummaryFilename();
+
+  void setSummaryFilename(String summaryFilename);
+
+  @Description("Filename for javascript capturing all perf data and any baselines.")
+  @Nullable
+  String getJavascriptFilename();
+
+  void setJavascriptFilename(String javascriptFilename);
+
+  @Description("If true, don't run the actual query. Instead, calculate the distribution "
+               + "of number of query results per (event time) minute according to the query model.")
+  @Default.Boolean(false)
+  boolean getJustModelResultRate();
+
+  void setJustModelResultRate(boolean justModelResultRate);
+
+  @Description("Coder strategy to use.")
+  @Nullable
+  NexmarkUtils.CoderStrategy getCoderStrategy();
+
+  void setCoderStrategy(NexmarkUtils.CoderStrategy coderStrategy);
+
+  @Description("Delay, in milliseconds, for each event. We will peg one core for this "
+               + "number of milliseconds to simulate CPU-bound computation.")
+  @Nullable
+  Long getCpuDelayMs();
+
+  void setCpuDelayMs(Long cpuDelayMs);
+
+  @Description("Extra data, in bytes, to save to persistent state for each event. "
+               + "This will force I/O all the way to durable storage to simulate an "
+               + "I/O-bound computation.")
+  @Nullable
+  Long getDiskBusyBytes();
+
+  void setDiskBusyBytes(Long diskBusyBytes);
+
+  @Description("Skip factor for query 2. We select bids for every auctionSkip'th auction")
+  @Nullable
+  Integer getAuctionSkip();
+
+  void setAuctionSkip(Integer auctionSkip);
+
+  @Description("Fanout for queries 4 (groups by category id) and 7 (finds a global maximum).")
+  @Nullable
+  Integer getFanout();
+
+  void setFanout(Integer fanout);
+
+  @Description("Length of occasional delay to impose on events (in seconds).")
+  @Nullable
+  Long getOccasionalDelaySec();
+
+  void setOccasionalDelaySec(Long occasionalDelaySec);
+
+  @Description("Probability that an event will be delayed by --occasionalDelaySec.")
+  @Nullable
+  Double getProbDelayedEvent();
+
+  void setProbDelayedEvent(Double probDelayedEvent);
+
+  @Description("Maximum size of each log file (in events). For Query10 only.")
+  @Nullable
+  Integer getMaxLogEvents();
+
+  void setMaxLogEvents(Integer maxLogEvents);
+
+  @Description("How to derive names of resources.")
+  @Default.Enum("QUERY_AND_SALT")
+  NexmarkUtils.ResourceNameMode getResourceNameMode();
+
+  void setResourceNameMode(NexmarkUtils.ResourceNameMode mode);
+
+  @Description("If true, manage the creation and cleanup of topics, subscriptions and gcs files.")
+  @Default.Boolean(true)
+  boolean getManageResources();
+
+  void setManageResources(boolean manageResources);
+
+  @Description("If true, use pub/sub publish time instead of event time.")
+  @Nullable
+  Boolean getUsePubsubPublishTime();
+
+  void setUsePubsubPublishTime(Boolean usePubsubPublishTime);
+
+  @Description("Number of events in out-of-order groups. 1 implies no out-of-order events. "
+               + "1000 implies every 1000 events per generator are emitted in pseudo-random order.")
+  @Nullable
+  Long getOutOfOrderGroupSize();
+
+  void setOutOfOrderGroupSize(Long outOfOrderGroupSize);
+
+  @Description("If false, do not add the Monitor and Snoop transforms.")
+  @Nullable
+  Boolean getDebug();
+
+  void setDebug(Boolean value);
+
+  @Description("If set, cancel running pipelines after this long")
+  @Nullable
+  Long getRunningTimeMinutes();
+
+  void setRunningTimeMinutes(Long value);
+
+  @Description("If set and --monitorJobs is true, check that the system watermark is never more "
+               + "than this far behind real time")
+  @Nullable
+  Long getMaxSystemLagSeconds();
+
+  void setMaxSystemLagSeconds(Long value);
+
+  @Description("If set and --monitorJobs is true, check that the data watermark is never more "
+               + "than this far behind real time")
+  @Nullable
+  Long getMaxDataLagSeconds();
+
+  void setMaxDataLagSeconds(Long value);
+
+  @Description("Only start validating watermarks after this many seconds")
+  @Nullable
+  Long getWatermarkValidationDelaySeconds();
+
+  void setWatermarkValidationDelaySeconds(Long value);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java
index 37b6213..e7f59c8 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java
@@ -27,7 +27,7 @@ import javax.annotation.Nullable;
 /**
  * Summary of performance for a particular run of a configuration.
  */
-class NexmarkPerf {
+public class NexmarkPerf {
   /**
    * A sample of the number of events and number of results (if known) generated at
    * a particular time.
@@ -177,8 +177,6 @@ class NexmarkPerf {
 
   /**
    * Parse a {@link NexmarkPerf} object from JSON {@code string}.
-   *
-   * @throws IOException
    */
   public static NexmarkPerf fromString(String string) {
     try {

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
index 5ef4191..c268a3b 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
@@ -18,7 +18,11 @@
 package org.apache.beam.integration.nexmark;
 
 import javax.annotation.Nullable;
-
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.integration.nexmark.model.Person;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;
@@ -29,7 +33,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
-
 import org.joda.time.Instant;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java
index f265e0d..b2b1826 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java
@@ -17,11 +17,6 @@
  */
 package org.apache.beam.integration.nexmark;
 
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.hasItems;
-import org.hamcrest.collection.IsIterableContainingInAnyOrder;
-
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -30,10 +25,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.beam.integration.nexmark.model.KnownSize;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.TimestampedValue;
 
-import org.hamcrest.core.IsCollectionContaining;
+
 import org.hamcrest.core.IsEqual;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -107,15 +103,18 @@ public abstract class NexmarkQueryModel implements Serializable {
   /** Return assertion to use on results of pipeline for this query. */
   public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> assertionFor() {
     final Collection<String> expectedStrings = toCollection(simulator().results());
-    final String[] expectedStringsArray = expectedStrings.toArray(new String[expectedStrings.size()]);
+    final String[] expectedStringsArray =
+      expectedStrings.toArray(new String[expectedStrings.size()]);
 
     return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() {
       @Override
       public Void apply(Iterable<TimestampedValue<KnownSize>> actual) {
-        Collection<String> actualStrings = toCollection(relevantResults(actual).iterator());
-                Assert.assertThat("wrong pipeline output", actualStrings, IsEqual.equalTo(expectedStrings));
+      Collection<String> actualStrings = toCollection(relevantResults(actual).iterator());
+        Assert.assertThat("wrong pipeline output", actualStrings,
+          IsEqual.equalTo(expectedStrings));
 //compare without order
-//        Assert.assertThat("wrong pipeline output", actualStrings, IsIterableContainingInAnyOrder.containsInAnyOrder(expectedStringsArray));
+//      Assert.assertThat("wrong pipeline output", actualStrings,
+//        IsIterableContainingInAnyOrder.containsInAnyOrder(expectedStringsArray));
         return null;
       }
     };