You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/17 20:51:46 UTC

[2/6] beam git commit: [Nexmark] Extract BidGenerator from Generator

[Nexmark] Extract BidGenerator from Generator


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8a6fad9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8a6fad9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8a6fad9

Branch: refs/heads/master
Commit: d8a6fad9ed4b65504911fa9d5dadf5c8d4a7a0e6
Parents: e895fc8
Author: Anton Kedin <ke...@google.com>
Authored: Mon Nov 6 15:19:39 2017 -0800
Committer: Anton Kedin <ke...@google.com>
Committed: Wed Nov 15 13:48:37 2017 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/nexmark/NexmarkUtils.java   |   4 +-
 .../beam/sdk/nexmark/queries/WinningBids.java   |   2 +-
 .../sdk/nexmark/sources/BoundedEventSource.java |   2 +
 .../beam/sdk/nexmark/sources/Generator.java     | 316 -----------------
 .../nexmark/sources/GeneratorCheckpoint.java    |  78 -----
 .../sdk/nexmark/sources/GeneratorConfig.java    | 339 -------------------
 .../nexmark/sources/UnboundedEventSource.java   |   3 +
 .../nexmark/sources/generator/Generator.java    | 271 +++++++++++++++
 .../sources/generator/GeneratorCheckpoint.java  |  82 +++++
 .../sources/generator/GeneratorConfig.java      | 339 +++++++++++++++++++
 .../generator/model/AuctionGenerator.java       | 142 ++++++++
 .../sources/generator/model/BidGenerator.java   |  76 +++++
 .../sources/generator/model/LongGenerator.java  |  37 ++
 .../generator/model/PersonGenerator.java        | 139 ++++++++
 .../sources/generator/model/PriceGenerator.java |  32 ++
 .../generator/model/StringsGenerator.java       |  68 ++++
 .../sources/generator/model/package-info.java   |  22 ++
 .../nexmark/sources/generator/package-info.java |  26 ++
 .../nexmark/sources/utils/AuctionGenerator.java | 145 --------
 .../nexmark/sources/utils/LongGenerator.java    |  37 --
 .../nexmark/sources/utils/PersonGenerator.java  | 140 --------
 .../nexmark/sources/utils/PriceGenerator.java   |  32 --
 .../nexmark/sources/utils/StringsGenerator.java |  68 ----
 .../sdk/nexmark/sources/utils/package-info.java |  22 --
 .../nexmark/sources/BoundedEventSourceTest.java |   1 +
 .../beam/sdk/nexmark/sources/GeneratorTest.java |   2 +
 .../sources/UnboundedEventSourceTest.java       |   3 +
 27 files changed, 1248 insertions(+), 1180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
index fa1ef16..fc0ab9f 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
@@ -50,9 +50,9 @@ import org.apache.beam.sdk.nexmark.model.NameCityStateId;
 import org.apache.beam.sdk.nexmark.model.Person;
 import org.apache.beam.sdk.nexmark.model.SellerPrice;
 import org.apache.beam.sdk.nexmark.sources.BoundedEventSource;
-import org.apache.beam.sdk.nexmark.sources.Generator;
-import org.apache.beam.sdk.nexmark.sources.GeneratorConfig;
 import org.apache.beam.sdk.nexmark.sources.UnboundedEventSource;
+import org.apache.beam.sdk.nexmark.sources.generator.Generator;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.ValueState;

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
index bc553c9..3ee4f3a 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
@@ -44,7 +44,7 @@ import org.apache.beam.sdk.nexmark.model.Auction;
 import org.apache.beam.sdk.nexmark.model.AuctionBid;
 import org.apache.beam.sdk.nexmark.model.Bid;
 import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.nexmark.sources.GeneratorConfig;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java
index 60124bb..cc32007 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java
@@ -26,6 +26,8 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.nexmark.NexmarkUtils;
 import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.sources.generator.Generator;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
deleted file mode 100644
index 68e6748..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.nexmark.sources;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.beam.sdk.nexmark.sources.utils.AuctionGenerator.lastBase0AuctionId;
-import static org.apache.beam.sdk.nexmark.sources.utils.AuctionGenerator.nextAuction;
-import static org.apache.beam.sdk.nexmark.sources.utils.AuctionGenerator.nextBase0AuctionId;
-import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.lastBase0PersonId;
-import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextBase0PersonId;
-import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextPerson;
-import static org.apache.beam.sdk.nexmark.sources.utils.PriceGenerator.nextPrice;
-import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextExtra;
-
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.Objects;
-import java.util.Random;
-
-import org.apache.beam.sdk.nexmark.model.Bid;
-import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-
-/**
- * A generator for synthetic events. We try to make the data vaguely reasonable. We also ensure
- * most primary key/foreign key relations are correct. Eg: a {@link Bid} event will usually have
- * valid auction and bidder ids which can be joined to already-generated Auction and Person events.
- *
- * <p>To help with testing, we generate timestamps relative to a given {@code baseTime}. Each new
- * event is given a timestamp advanced from the previous timestamp by {@code interEventDelayUs}
- * (in microseconds). The event stream is thus fully deterministic and does not depend on
- * wallclock time.
- *
- * <p>This class implements {@link org.apache.beam.sdk.io.UnboundedSource.CheckpointMark}
- * so that we can resume generating events from a saved snapshot.
- */
-public class Generator implements Iterator<TimestampedValue<Event>>, Serializable {
-
-  /**
-   * Fraction of people/auctions which may be 'hot' sellers/bidders/auctions are 1
-   * over these values.
-   */
-  private static final int HOT_AUCTION_RATIO = 100;
-  private static final int HOT_BIDDER_RATIO = 100;
-
-  /**
-   * The next event and its various timestamps. Ordered by increasing wallclock timestamp, then
-   * (arbitrary but stable) event hash order.
-   */
-  public static class NextEvent implements Comparable<NextEvent> {
-    /** When, in wallclock time, should this event be emitted? */
-    public final long wallclockTimestamp;
-
-    /** When, in event time, should this event be considered to have occured? */
-    public final long eventTimestamp;
-
-    /** The event itself. */
-    public final Event event;
-
-    /** The minimum of this and all future event timestamps. */
-    public final long watermark;
-
-    public NextEvent(long wallclockTimestamp, long eventTimestamp, Event event, long watermark) {
-      this.wallclockTimestamp = wallclockTimestamp;
-      this.eventTimestamp = eventTimestamp;
-      this.event = event;
-      this.watermark = watermark;
-    }
-
-    /**
-     * Return a deep copy of next event with delay added to wallclock timestamp and
-     * event annotate as 'LATE'.
-     */
-    public NextEvent withDelay(long delayMs) {
-      return new NextEvent(
-          wallclockTimestamp + delayMs, eventTimestamp, event.withAnnotation("LATE"), watermark);
-    }
-
-    @Override public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      NextEvent nextEvent = (NextEvent) o;
-
-      return (wallclockTimestamp == nextEvent.wallclockTimestamp
-          && eventTimestamp == nextEvent.eventTimestamp
-          && watermark == nextEvent.watermark
-          && event.equals(nextEvent.event));
-    }
-
-    @Override public int hashCode() {
-      return Objects.hash(wallclockTimestamp, eventTimestamp, watermark, event);
-    }
-
-    @Override
-    public int compareTo(NextEvent other) {
-      int i = Long.compare(wallclockTimestamp, other.wallclockTimestamp);
-      if (i != 0) {
-        return i;
-      }
-      return Integer.compare(event.hashCode(), other.event.hashCode());
-    }
-  }
-
-  /**
-   * Configuration to generate events against. Note that it may be replaced by a call to
-   * {@link #splitAtEventId}.
-   */
-  private GeneratorConfig config;
-
-  /** Number of events generated by this generator. */
-  private long eventsCountSoFar;
-
-  /**
-   * Wallclock time at which we emitted the first event (ms since epoch). Initially -1.
-   */
-  private long wallclockBaseTime;
-
-  Generator(GeneratorConfig config, long eventsCountSoFar, long wallclockBaseTime) {
-    checkNotNull(config);
-    this.config = config;
-    this.eventsCountSoFar = eventsCountSoFar;
-    this.wallclockBaseTime = wallclockBaseTime;
-  }
-
-  /**
-   * Create a fresh generator according to {@code config}.
-   */
-  public Generator(GeneratorConfig config) {
-    this(config, 0, -1);
-  }
-
-  /**
-   * Return a checkpoint for the current generator.
-   */
-  public GeneratorCheckpoint toCheckpoint() {
-    return new GeneratorCheckpoint(eventsCountSoFar, wallclockBaseTime);
-  }
-
-  /**
-   * Return a deep copy of this generator.
-   */
-  public Generator copy() {
-    checkNotNull(config);
-    Generator result = new Generator(config, eventsCountSoFar, wallclockBaseTime);
-    return result;
-  }
-
-  /**
-   * Return the current config for this generator. Note that configs may be replaced by {@link
-   * #splitAtEventId}.
-   */
-  public GeneratorConfig getCurrentConfig() {
-    return config;
-  }
-
-  /**
-   * Mutate this generator so that it will only generate events up to but not including
-   * {@code eventId}. Return a config to represent the events this generator will no longer yield.
-   * The generators will run in on a serial timeline.
-   */
-  public GeneratorConfig splitAtEventId(long eventId) {
-    long newMaxEvents = eventId - (config.firstEventId + config.firstEventNumber);
-    GeneratorConfig remainConfig = config.copyWith(config.firstEventId,
-        config.maxEvents - newMaxEvents, config.firstEventNumber + newMaxEvents);
-    config = config.copyWith(config.firstEventId, newMaxEvents, config.firstEventNumber);
-    return remainConfig;
-  }
-
-  /**
-   * Return the next 'event id'. Though events don't have ids we can simulate them to
-   * help with bookkeeping.
-   */
-  public long getNextEventId() {
-    return config.firstEventId + config.nextAdjustedEventNumber(eventsCountSoFar);
-  }
-
-
-
-  /**
-   * Generate and return a random bid with next available id.
-   */
-  private Bid nextBid(long eventId, Random random, long timestamp) {
-    long auction;
-    // Here P(bid will be for a hot auction) = 1 - 1/hotAuctionRatio.
-    if (random.nextInt(config.configuration.hotAuctionRatio) > 0) {
-      // Choose the first auction in the batch of last HOT_AUCTION_RATIO auctions.
-      auction = (lastBase0AuctionId(eventId) / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO;
-    } else {
-      auction = nextBase0AuctionId(eventId, random, config);
-    }
-    auction += GeneratorConfig.FIRST_AUCTION_ID;
-
-    long bidder;
-    // Here P(bid will be by a hot bidder) = 1 - 1/hotBiddersRatio
-    if (random.nextInt(config.configuration.hotBiddersRatio) > 0) {
-      // Choose the second person (so hot bidders and hot sellers don't collide) in the batch of
-      // last HOT_BIDDER_RATIO people.
-      bidder = (lastBase0PersonId(getNextEventId()) / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1;
-    } else {
-      bidder = nextBase0PersonId(eventId, random, config);
-    }
-    bidder += GeneratorConfig.FIRST_PERSON_ID;
-
-    long price = nextPrice(random);
-    int currentSize = 8 + 8 + 8 + 8;
-    String extra = nextExtra(random, currentSize, config.configuration.avgBidByteSize);
-    return new Bid(auction, bidder, price, timestamp, extra);
-  }
-
-  @Override
-  public boolean hasNext() {
-    return eventsCountSoFar < config.maxEvents;
-  }
-
-  /**
-   * Return the next event. The outer timestamp is in wallclock time and corresponds to
-   * when the event should fire. The inner timestamp is in event-time and represents the
-   * time the event is purported to have taken place in the simulation.
-   */
-  public NextEvent nextEvent() {
-    if (wallclockBaseTime < 0) {
-      wallclockBaseTime = System.currentTimeMillis();
-    }
-    // When, in event time, we should generate the event. Monotonic.
-    long eventTimestamp =
-        config.timestampAndInterEventDelayUsForEvent(
-            config.nextEventNumber(eventsCountSoFar)).getKey();
-    // When, in event time, the event should say it was generated. Depending on outOfOrderGroupSize
-    // may have local jitter.
-    long adjustedEventTimestamp =
-        config.timestampAndInterEventDelayUsForEvent(
-            config.nextAdjustedEventNumber(eventsCountSoFar))
-            .getKey();
-    // The minimum of this and all future adjusted event timestamps. Accounts for jitter in
-    // the event timestamp.
-    long watermark =
-        config.timestampAndInterEventDelayUsForEvent(
-            config.nextEventNumberForWatermark(eventsCountSoFar))
-            .getKey();
-    // When, in wallclock time, we should emit the event.
-    long wallclockTimestamp = wallclockBaseTime + (eventTimestamp - getCurrentConfig().baseTime);
-
-    // Seed the random number generator with the next 'event id'.
-    Random random = new Random(getNextEventId());
-
-
-    long newEventId = getNextEventId();
-    long rem = newEventId % GeneratorConfig.PROPORTION_DENOMINATOR;
-
-    Event event;
-    if (rem < GeneratorConfig.PERSON_PROPORTION) {
-      event = new Event(nextPerson(newEventId, random, adjustedEventTimestamp, config));
-    } else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
-      event = new Event(
-          nextAuction(eventsCountSoFar, newEventId, random, adjustedEventTimestamp, config));
-    } else {
-      event = new Event(nextBid(newEventId, random, adjustedEventTimestamp));
-    }
-
-    eventsCountSoFar++;
-    return new NextEvent(wallclockTimestamp, adjustedEventTimestamp, event, watermark);
-  }
-
-  @Override
-  public TimestampedValue<Event> next() {
-    NextEvent next = nextEvent();
-    return TimestampedValue.of(next.event, new Instant(next.eventTimestamp));
-  }
-
-  @Override
-  public void remove() {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Return how many microseconds till we emit the next event.
-   */
-  public long currentInterEventDelayUs() {
-    return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(eventsCountSoFar))
-        .getValue();
-  }
-
-  /**
-   * Return an estimate of fraction of output consumed.
-   */
-  public double getFractionConsumed() {
-    return (double) eventsCountSoFar / config.maxEvents;
-  }
-
-  @Override
-  public String toString() {
-    return String.format("Generator{config:%s; eventsCountSoFar:%d; wallclockBaseTime:%d}", config,
-        eventsCountSoFar, wallclockBaseTime);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java
deleted file mode 100644
index dfc135d..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.nexmark.sources;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-
-/**
- * Just enough state to be able to restore a generator back to where it was checkpointed.
- */
-public class GeneratorCheckpoint implements UnboundedSource.CheckpointMark {
-  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-
-  /** Coder for this class. */
-  public static final Coder<GeneratorCheckpoint> CODER_INSTANCE =
-      new CustomCoder<GeneratorCheckpoint>() {
-        @Override public void encode(GeneratorCheckpoint value, OutputStream outStream)
-            throws CoderException, IOException {
-          LONG_CODER.encode(value.numEvents, outStream);
-          LONG_CODER.encode(value.wallclockBaseTime, outStream);
-        }
-
-        @Override
-        public GeneratorCheckpoint decode(InputStream inStream)
-            throws CoderException, IOException {
-          long numEvents = LONG_CODER.decode(inStream);
-          long wallclockBaseTime = LONG_CODER.decode(inStream);
-          return new GeneratorCheckpoint(numEvents, wallclockBaseTime);
-        }
-        @Override public void verifyDeterministic() throws NonDeterministicException {}
-      };
-
-  private final long numEvents;
-  private final long wallclockBaseTime;
-
-  GeneratorCheckpoint(long numEvents, long wallclockBaseTime) {
-    this.numEvents = numEvents;
-    this.wallclockBaseTime = wallclockBaseTime;
-  }
-
-  public Generator toGenerator(GeneratorConfig config) {
-    return new Generator(config, numEvents, wallclockBaseTime);
-  }
-
-  @Override
-  public void finalizeCheckpoint() throws IOException {
-    // Nothing to finalize.
-  }
-
-  @Override
-  public String toString() {
-    return String.format("Generator.GeneratorCheckpoint{numEvents:%d;wallclockBaseTime:%d}",
-        numEvents, wallclockBaseTime);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java
deleted file mode 100644
index 8e0a899..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.nexmark.sources;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
-import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.values.KV;
-
-/**
- * Parameters controlling how {@link Generator} synthesizes {@link Event} elements.
- */
-public class GeneratorConfig implements Serializable {
-
-  /**
-   * We start the ids at specific values to help ensure the queries find a match even on
-   * small synthesized dataset sizes.
-   */
-  public static final long FIRST_AUCTION_ID = 1000L;
-  public static final long FIRST_PERSON_ID = 1000L;
-  public static final long FIRST_CATEGORY_ID = 10L;
-
-  /**
-   * Proportions of people/auctions/bids to synthesize.
-   */
-  public static final int PERSON_PROPORTION = 1;
-  public static final int AUCTION_PROPORTION = 3;
-  private static final int BID_PROPORTION = 46;
-  public static final int PROPORTION_DENOMINATOR =
-      PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION;
-
-  /**
-   * Environment options.
-   */
-  public final NexmarkConfiguration configuration;
-
-  /**
-   * Delay between events, in microseconds. If the array has more than one entry then
-   * the rate is changed every {@link #stepLengthSec}, and wraps around.
-   */
-  private final long[] interEventDelayUs;
-
-  /**
-   * Delay before changing the current inter-event delay.
-   */
-  private final long stepLengthSec;
-
-  /**
-   * Time for first event (ms since epoch).
-   */
-  public final long baseTime;
-
-  /**
-   * Event id of first event to be generated. Event ids are unique over all generators, and
-   * are used as a seed to generate each event's data.
-   */
-  public final long firstEventId;
-
-  /**
-   * Maximum number of events to generate.
-   */
-  public final long maxEvents;
-
-  /**
-   * First event number. Generators running in parallel time may share the same event number,
-   * and the event number is used to determine the event timestamp.
-   */
-  public final long firstEventNumber;
-
-  /**
-   * True period of epoch in milliseconds. Derived from above.
-   * (Ie time to run through cycle for all interEventDelayUs entries).
-   */
-  private final long epochPeriodMs;
-
-  /**
-   * Number of events per epoch. Derived from above.
-   * (Ie number of events to run through cycle for all interEventDelayUs entries).
-   */
-  private final long eventsPerEpoch;
-
-  public GeneratorConfig(
-      NexmarkConfiguration configuration, long baseTime, long firstEventId,
-      long maxEventsOrZero, long firstEventNumber) {
-    this.configuration = configuration;
-    this.interEventDelayUs = configuration.rateShape.interEventDelayUs(
-        configuration.firstEventRate, configuration.nextEventRate,
-        configuration.rateUnit, configuration.numEventGenerators);
-    this.stepLengthSec = configuration.rateShape.stepLengthSec(configuration.ratePeriodSec);
-    this.baseTime = baseTime;
-    this.firstEventId = firstEventId;
-    if (maxEventsOrZero == 0) {
-      // Scale maximum down to avoid overflow in getEstimatedSizeBytes.
-      this.maxEvents =
-          Long.MAX_VALUE / (PROPORTION_DENOMINATOR
-                            * Math.max(
-              Math.max(configuration.avgPersonByteSize, configuration.avgAuctionByteSize),
-              configuration.avgBidByteSize));
-    } else {
-      this.maxEvents = maxEventsOrZero;
-    }
-    this.firstEventNumber = firstEventNumber;
-
-    long eventsPerEpoch = 0;
-    long epochPeriodMs = 0;
-    if (interEventDelayUs.length > 1) {
-      for (long interEventDelayU : interEventDelayUs) {
-        long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU;
-        eventsPerEpoch += numEventsForThisCycle;
-        epochPeriodMs += (numEventsForThisCycle * interEventDelayU) / 1000L;
-      }
-    }
-    this.eventsPerEpoch = eventsPerEpoch;
-    this.epochPeriodMs = epochPeriodMs;
-  }
-
-  /**
-   * Return a copy of this config.
-   */
-  public GeneratorConfig copy() {
-    GeneratorConfig result;
-      result = new GeneratorConfig(configuration, baseTime, firstEventId,
-          maxEvents, firstEventNumber);
-    return result;
-  }
-
-  /**
-   * Split this config into {@code n} sub-configs with roughly equal number of
-   * possible events, but distinct value spaces. The generators will run on parallel timelines.
-   * This config should no longer be used.
-   */
-  public List<GeneratorConfig> split(int n) {
-    List<GeneratorConfig> results = new ArrayList<>();
-    if (n == 1) {
-      // No split required.
-      results.add(this);
-    } else {
-      long subMaxEvents = maxEvents / n;
-      long subFirstEventId = firstEventId;
-      for (int i = 0; i < n; i++) {
-        if (i == n - 1) {
-          // Don't loose any events to round-down.
-          subMaxEvents = maxEvents - subMaxEvents * (n - 1);
-        }
-        results.add(copyWith(subFirstEventId, subMaxEvents, firstEventNumber));
-        subFirstEventId += subMaxEvents;
-      }
-    }
-    return results;
-  }
-
-  /**
-   * Return copy of this config except with given parameters.
-   */
-  public GeneratorConfig copyWith(long firstEventId, long maxEvents, long firstEventNumber) {
-    return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
-  }
-
-  /**
-   * Return an estimate of the bytes needed by {@code numEvents}.
-   */
-  public long estimatedBytesForEvents(long numEvents) {
-    long numPersons =
-        (numEvents * GeneratorConfig.PERSON_PROPORTION) / GeneratorConfig.PROPORTION_DENOMINATOR;
-    long numAuctions = (numEvents * AUCTION_PROPORTION) / PROPORTION_DENOMINATOR;
-    long numBids = (numEvents * BID_PROPORTION) / PROPORTION_DENOMINATOR;
-    return numPersons * configuration.avgPersonByteSize
-           + numAuctions * configuration.avgAuctionByteSize
-           + numBids * configuration.avgBidByteSize;
-  }
-
-  public int getAvgPersonByteSize() {
-    return configuration.avgPersonByteSize;
-  }
-
-  public int getNumActivePeople() {
-    return configuration.numActivePeople;
-  }
-
-  public int getHotSellersRatio() {
-    return configuration.hotSellersRatio;
-  }
-
-  public int getNumInFlightAuctions() {
-    return configuration.numInFlightAuctions;
-  }
-
-  public int getHotAuctionRatio() {
-    return configuration.hotAuctionRatio;
-  }
-
-  public int getHotBiddersRatio() {
-    return configuration.hotBiddersRatio;
-  }
-
-  public int getAvgBidByteSize() {
-    return configuration.avgBidByteSize;
-  }
-
-  public int getAvgAuctionByteSize() {
-    return configuration.avgAuctionByteSize;
-  }
-
-  public double getProbDelayedEvent() {
-    return configuration.probDelayedEvent;
-  }
-
-  public long getOccasionalDelaySec() {
-    return configuration.occasionalDelaySec;
-  }
-
-  /**
-   * Return an estimate of the byte-size of all events a generator for this config would yield.
-   */
-  public long getEstimatedSizeBytes() {
-    return estimatedBytesForEvents(maxEvents);
-  }
-
-  /**
-   * Return the first 'event id' which could be generated from this config. Though events don't
-   * have ids we can simulate them to help bookkeeping.
-   */
-  public long getStartEventId() {
-    return firstEventId + firstEventNumber;
-  }
-
-  /**
-   * Return one past the last 'event id' which could be generated from this config.
-   */
-  public long getStopEventId() {
-    return firstEventId + firstEventNumber + maxEvents;
-  }
-
-  /**
-   * Return the next event number for a generator which has so far emitted {@code numEvents}.
-   */
-  public long nextEventNumber(long numEvents) {
-    return firstEventNumber + numEvents;
-  }
-
-  /**
-   * Return the next event number for a generator which has so far emitted {@code numEvents},
-   * but adjusted to account for {@code outOfOrderGroupSize}.
-   */
-  public long nextAdjustedEventNumber(long numEvents) {
-    long n = configuration.outOfOrderGroupSize;
-    long eventNumber = nextEventNumber(numEvents);
-    long base = (eventNumber / n) * n;
-    long offset = (eventNumber * 953) % n;
-    return base + offset;
-  }
-
-  /**
-   * Return the event number who's event time will be a suitable watermark for
-   * a generator which has so far emitted {@code numEvents}.
-   */
-  public long nextEventNumberForWatermark(long numEvents) {
-    long n = configuration.outOfOrderGroupSize;
-    long eventNumber = nextEventNumber(numEvents);
-    return (eventNumber / n) * n;
-  }
-
-  /**
-   * What timestamp should the event with {@code eventNumber} have for this generator? And
-   * what inter-event delay (in microseconds) is current?
-   */
-  public KV<Long, Long> timestampAndInterEventDelayUsForEvent(long eventNumber) {
-    if (interEventDelayUs.length == 1) {
-      long timestamp = baseTime + (eventNumber * interEventDelayUs[0]) / 1000L;
-      return KV.of(timestamp, interEventDelayUs[0]);
-    }
-
-    long epoch = eventNumber / eventsPerEpoch;
-    long n = eventNumber % eventsPerEpoch;
-    long offsetInEpochMs = 0;
-    for (long interEventDelayU : interEventDelayUs) {
-      long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU;
-      if (n < numEventsForThisCycle) {
-        long offsetInCycleUs = n * interEventDelayU;
-        long timestamp =
-            baseTime + epoch * epochPeriodMs + offsetInEpochMs + (offsetInCycleUs / 1000L);
-        return KV.of(timestamp, interEventDelayU);
-      }
-      n -= numEventsForThisCycle;
-      offsetInEpochMs += (numEventsForThisCycle * interEventDelayU) / 1000L;
-    }
-    throw new RuntimeException("internal eventsPerEpoch incorrect"); // can't reach
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("GeneratorConfig");
-    sb.append("{configuration:");
-    sb.append(configuration.toString());
-    sb.append(";interEventDelayUs=[");
-    for (int i = 0; i < interEventDelayUs.length; i++) {
-      if (i > 0) {
-        sb.append(",");
-      }
-      sb.append(interEventDelayUs[i]);
-    }
-    sb.append("]");
-    sb.append(";stepLengthSec:");
-    sb.append(stepLengthSec);
-    sb.append(";baseTime:");
-    sb.append(baseTime);
-    sb.append(";firstEventId:");
-    sb.append(firstEventId);
-    sb.append(";maxEvents:");
-    sb.append(maxEvents);
-    sb.append(";firstEventNumber:");
-    sb.append(firstEventNumber);
-    sb.append(";epochPeriodMs:");
-    sb.append(epochPeriodMs);
-    sb.append(";eventsPerEpoch:");
-    sb.append(eventsPerEpoch);
-    sb.append("}");
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
index 74eb061..f43486d 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
@@ -30,6 +30,9 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.nexmark.NexmarkUtils;
 import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.sources.generator.Generator;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorCheckpoint;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.TimestampedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/Generator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/Generator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/Generator.java
new file mode 100644
index 0000000..bd736c1
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/Generator.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.sources.generator;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.AuctionGenerator.nextAuction;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.BidGenerator.nextBid;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.PersonGenerator.nextPerson;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Random;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+
+/**
+ * A generator for synthetic events. We try to make the data vaguely reasonable. We also ensure
+ * most primary key/foreign key relations are correct. Eg: a {@link Bid} event will usually have
+ * valid auction and bidder ids which can be joined to already-generated Auction and Person events.
+ *
+ * <p>To help with testing, we generate timestamps relative to a given {@code baseTime}. Each new
+ * event is given a timestamp advanced from the previous timestamp by {@code interEventDelayUs}
+ * (in microseconds). Event contents and event-time timestamps are thus fully deterministic; only
+ * the wallclock emission times depend on when the first event is generated.
+ *
+ * <p>Progress can be snapshotted with {@link #toCheckpoint()} and later resumed via
+ * {@link GeneratorCheckpoint#toGenerator(GeneratorConfig)}. Note that the checkpoint mark is
+ * {@link GeneratorCheckpoint}; this class itself is only an {@link Iterator}.
+ */
+public class Generator implements Iterator<TimestampedValue<Event>>, Serializable {
+
+  /**
+   * The next event and its various timestamps. Ordered by increasing wallclock timestamp, then
+   * (arbitrary but stable) event hash order.
+   *
+   * <p>Note: this ordering is not consistent with {@link #equals}; events with the same wallclock
+   * timestamp and colliding event hash codes compare as 0 even when they are not equal.
+   */
+  public static class NextEvent implements Comparable<NextEvent> {
+    /** When, in wallclock time, should this event be emitted? */
+    public final long wallclockTimestamp;
+
+    /** When, in event time, should this event be considered to have occurred? */
+    public final long eventTimestamp;
+
+    /** The event itself. */
+    public final Event event;
+
+    /** The minimum of this and all future event timestamps. */
+    public final long watermark;
+
+    public NextEvent(long wallclockTimestamp, long eventTimestamp, Event event, long watermark) {
+      this.wallclockTimestamp = wallclockTimestamp;
+      this.eventTimestamp = eventTimestamp;
+      this.event = event;
+      this.watermark = watermark;
+    }
+
+    /**
+     * Return a copy of this next event with {@code delayMs} added to the wallclock timestamp and
+     * the event annotated as 'LATE'. The event timestamp and watermark are unchanged.
+     */
+    public NextEvent withDelay(long delayMs) {
+      return new NextEvent(
+          wallclockTimestamp + delayMs, eventTimestamp, event.withAnnotation("LATE"), watermark);
+    }
+
+    @Override public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      NextEvent nextEvent = (NextEvent) o;
+
+      return (wallclockTimestamp == nextEvent.wallclockTimestamp
+          && eventTimestamp == nextEvent.eventTimestamp
+          && watermark == nextEvent.watermark
+          && event.equals(nextEvent.event));
+    }
+
+    @Override public int hashCode() {
+      return Objects.hash(wallclockTimestamp, eventTimestamp, watermark, event);
+    }
+
+    @Override
+    public int compareTo(NextEvent other) {
+      int i = Long.compare(wallclockTimestamp, other.wallclockTimestamp);
+      if (i != 0) {
+        return i;
+      }
+      // Tie-break on the event hash: arbitrary, but stable for a given event.
+      return Integer.compare(event.hashCode(), other.event.hashCode());
+    }
+  }
+
+  /**
+   * Configuration to generate events against. Note that it may be replaced by a call to
+   * {@link #splitAtEventId}.
+   */
+  private GeneratorConfig config;
+
+  /** Number of events generated by this generator. */
+  private long eventsCountSoFar;
+
+  /**
+   * Wallclock time at which we emitted the first event (ms since epoch). Initially -1, meaning
+   * "not yet anchored"; set lazily by {@link #nextEvent()}.
+   */
+  private long wallclockBaseTime;
+
+  Generator(GeneratorConfig config, long eventsCountSoFar, long wallclockBaseTime) {
+    checkNotNull(config);
+    this.config = config;
+    this.eventsCountSoFar = eventsCountSoFar;
+    this.wallclockBaseTime = wallclockBaseTime;
+  }
+
+  /**
+   * Create a fresh generator according to {@code config}.
+   */
+  public Generator(GeneratorConfig config) {
+    this(config, 0, -1);
+  }
+
+  /**
+   * Return a checkpoint for the current generator.
+   */
+  public GeneratorCheckpoint toCheckpoint() {
+    return new GeneratorCheckpoint(eventsCountSoFar, wallclockBaseTime);
+  }
+
+  /**
+   * Return a copy of this generator at the same position. The config instance is shared, but a
+   * later {@link #splitAtEventId} on either generator only replaces that generator's own config
+   * reference.
+   */
+  public Generator copy() {
+    // The constructor already null-checks config.
+    return new Generator(config, eventsCountSoFar, wallclockBaseTime);
+  }
+
+  /**
+   * Return the current config for this generator. Note that configs may be replaced by {@link
+   * #splitAtEventId}.
+   */
+  public GeneratorConfig getCurrentConfig() {
+    return config;
+  }
+
+  /**
+   * Mutate this generator so that it will only generate events up to but not including
+   * {@code eventId}. Return a config to represent the events this generator will no longer yield.
+   * The two generators run on a serial timeline.
+   */
+  public GeneratorConfig splitAtEventId(long eventId) {
+    long newMaxEvents = eventId - (config.firstEventId + config.firstEventNumber);
+    GeneratorConfig remainConfig = config.copyWith(config.firstEventId,
+        config.maxEvents - newMaxEvents, config.firstEventNumber + newMaxEvents);
+    config = config.copyWith(config.firstEventId, newMaxEvents, config.firstEventNumber);
+    return remainConfig;
+  }
+
+  /**
+   * Return the next 'event id'. Though events don't have ids we can simulate them to
+   * help with bookkeeping.
+   */
+  public long getNextEventId() {
+    return config.firstEventId + config.nextAdjustedEventNumber(eventsCountSoFar);
+  }
+
+  @Override
+  public boolean hasNext() {
+    return eventsCountSoFar < config.maxEvents;
+  }
+
+  /**
+   * Return the next event together with its wallclock emission time, its (possibly jittered)
+   * event time and the current watermark. See {@link NextEvent}.
+   *
+   * <p>This method does not consult {@link #hasNext()}; callers are expected to check it first.
+   * It anchors the wallclock base time on the first call if it was not restored from a
+   * checkpoint.
+   */
+  public NextEvent nextEvent() {
+    if (wallclockBaseTime < 0) {
+      // Anchor wallclock time lazily, at the moment the first event is requested.
+      wallclockBaseTime = System.currentTimeMillis();
+    }
+    // When, in event time, we should generate the event. Monotonic.
+    long eventTimestamp =
+        config.timestampAndInterEventDelayUsForEvent(
+            config.nextEventNumber(eventsCountSoFar)).getKey();
+    // When, in event time, the event should say it was generated. Depending on outOfOrderGroupSize
+    // may have local jitter.
+    long adjustedEventTimestamp =
+        config.timestampAndInterEventDelayUsForEvent(
+            config.nextAdjustedEventNumber(eventsCountSoFar)).getKey();
+    // The minimum of this and all future adjusted event timestamps. Accounts for jitter in
+    // the event timestamp.
+    long watermark =
+        config.timestampAndInterEventDelayUsForEvent(
+            config.nextEventNumberForWatermark(eventsCountSoFar)).getKey();
+    // When, in wallclock time, we should emit the event.
+    long wallclockTimestamp = wallclockBaseTime + (eventTimestamp - config.baseTime);
+
+    // The 'event id' both seeds the random number generator (so each event's contents are
+    // reproducible) and selects which kind of event to produce.
+    long newEventId = getNextEventId();
+    Random random = new Random(newEventId);
+    long rem = newEventId % GeneratorConfig.PROPORTION_DENOMINATOR;
+
+    Event event;
+    if (rem < GeneratorConfig.PERSON_PROPORTION) {
+      event = new Event(nextPerson(newEventId, random, adjustedEventTimestamp, config));
+    } else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
+      event = new Event(
+          nextAuction(eventsCountSoFar, newEventId, random, adjustedEventTimestamp, config));
+    } else {
+      event = new Event(nextBid(newEventId, random, adjustedEventTimestamp, config));
+    }
+
+    eventsCountSoFar++;
+    return new NextEvent(wallclockTimestamp, adjustedEventTimestamp, event, watermark);
+  }
+
+  /**
+   * Return the next event, timestamped with its (possibly jittered) event time.
+   *
+   * @throws NoSuchElementException if {@link #hasNext()} is false
+   */
+  @Override
+  public TimestampedValue<Event> next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException(
+          String.format(
+              "Generator exhausted: %d of %d events already generated",
+              eventsCountSoFar, config.maxEvents));
+    }
+    NextEvent next = nextEvent();
+    return TimestampedValue.of(next.event, new Instant(next.eventTimestamp));
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Return how many microseconds till we emit the next event.
+   */
+  public long currentInterEventDelayUs() {
+    return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(eventsCountSoFar))
+        .getValue();
+  }
+
+  /**
+   * Return an estimate of fraction of output consumed. Returns 1.0 when this generator's config
+   * has no events left to budget (non-positive {@code maxEvents}).
+   */
+  public double getFractionConsumed() {
+    if (config.maxEvents <= 0) {
+      // E.g. splitAtEventId with an id before the first event yields a negative maxEvents;
+      // never divide by a zero or negative budget.
+      return 1.0;
+    }
+    return (double) eventsCountSoFar / config.maxEvents;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Generator{config:%s; eventsCountSoFar:%d; wallclockBaseTime:%d}", config,
+        eventsCountSoFar, wallclockBaseTime);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorCheckpoint.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorCheckpoint.java
new file mode 100644
index 0000000..fa41739
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorCheckpoint.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.generator;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+
+/**
+ * Minimal state needed to rebuild a {@link Generator} at the position where it was checkpointed:
+ * how many events it had already emitted and the wallclock time it was anchored to.
+ */
+public class GeneratorCheckpoint implements UnboundedSource.CheckpointMark {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+  /**
+   * Coder for this class. Writes {@code numEvents} followed by {@code wallclockBaseTime}, each
+   * with a {@link VarLongCoder}.
+   */
+  public static final Coder<GeneratorCheckpoint> CODER_INSTANCE =
+      new CustomCoder<GeneratorCheckpoint>() {
+        @Override
+        public void encode(GeneratorCheckpoint checkpoint, OutputStream out)
+            throws CoderException, IOException {
+          LONG_CODER.encode(checkpoint.numEvents, out);
+          LONG_CODER.encode(checkpoint.wallclockBaseTime, out);
+        }
+
+        @Override
+        public GeneratorCheckpoint decode(InputStream in) throws CoderException, IOException {
+          // Java evaluates constructor arguments left to right, matching the encode order.
+          return new GeneratorCheckpoint(LONG_CODER.decode(in), LONG_CODER.decode(in));
+        }
+
+        @Override
+        public void verifyDeterministic() throws NonDeterministicException {
+          // Two var-longs written in a fixed order: the encoding is always deterministic.
+        }
+      };
+
+  /** Number of events the checkpointed generator had emitted. */
+  private final long numEvents;
+
+  /** Wallclock base time (ms since epoch) of the checkpointed generator; -1 if not anchored. */
+  private final long wallclockBaseTime;
+
+  GeneratorCheckpoint(long numEvents, long wallclockBaseTime) {
+    this.wallclockBaseTime = wallclockBaseTime;
+    this.numEvents = numEvents;
+  }
+
+  /** Recreate a generator over {@code config} that resumes from this checkpoint. */
+  public Generator toGenerator(GeneratorConfig config) {
+    return new Generator(config, numEvents, wallclockBaseTime);
+  }
+
+  @Override
+  public void finalizeCheckpoint() throws IOException {
+    // The generator is deterministic and holds no external resources: nothing to acknowledge.
+  }
+
+  @Override
+  public String toString() {
+    return toStringHelper(this)
+        .add("numEvents", numEvents)
+        .add("wallclockBaseTime", wallclockBaseTime)
+        .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorConfig.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorConfig.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorConfig.java
new file mode 100644
index 0000000..7c862fa
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorConfig.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.sources.generator;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Parameters controlling how {@link Generator} synthesizes {@link Event} elements.
+ */
+public class GeneratorConfig implements Serializable {
+
+  /**
+   * We start the ids at specific values to help ensure the queries find a match even on
+   * small synthesized dataset sizes.
+   */
+  public static final long FIRST_AUCTION_ID = 1000L;
+  public static final long FIRST_PERSON_ID = 1000L;
+  public static final long FIRST_CATEGORY_ID = 10L;
+
+  /**
+   * Proportions of people/auctions/bids to synthesize.
+   */
+  public static final int PERSON_PROPORTION = 1;
+  public static final int AUCTION_PROPORTION = 3;
+  private static final int BID_PROPORTION = 46;
+  public static final int PROPORTION_DENOMINATOR =
+      PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION;
+
+  /**
+   * Environment options.
+   */
+  private final NexmarkConfiguration configuration;
+
+  /**
+   * Delay between events, in microseconds. If the array has more than one entry then
+   * the rate is changed every {@link #stepLengthSec}, and wraps around.
+   */
+  private final long[] interEventDelayUs;
+
+  /**
+   * Delay before changing the current inter-event delay.
+   */
+  private final long stepLengthSec;
+
+  /**
+   * Time for first event (ms since epoch).
+   */
+  public final long baseTime;
+
+  /**
+   * Event id of first event to be generated. Event ids are unique over all generators, and
+   * are used as a seed to generate each event's data.
+   */
+  public final long firstEventId;
+
+  /**
+   * Maximum number of events to generate.
+   */
+  public final long maxEvents;
+
+  /**
+   * First event number. Generators running in parallel time may share the same event number,
+   * and the event number is used to determine the event timestamp.
+   */
+  public final long firstEventNumber;
+
+  /**
+   * True period of epoch in milliseconds. Derived from above.
+   * (Ie time to run through cycle for all interEventDelayUs entries).
+   */
+  private final long epochPeriodMs;
+
+  /**
+   * Number of events per epoch. Derived from above.
+   * (Ie number of events to run through cycle for all interEventDelayUs entries).
+   */
+  private final long eventsPerEpoch;
+
+  public GeneratorConfig(
+      NexmarkConfiguration configuration, long baseTime, long firstEventId,
+      long maxEventsOrZero, long firstEventNumber) {
+    this.configuration = configuration;
+    this.interEventDelayUs = configuration.rateShape.interEventDelayUs(
+        configuration.firstEventRate, configuration.nextEventRate,
+        configuration.rateUnit, configuration.numEventGenerators);
+    this.stepLengthSec = configuration.rateShape.stepLengthSec(configuration.ratePeriodSec);
+    this.baseTime = baseTime;
+    this.firstEventId = firstEventId;
+    if (maxEventsOrZero == 0) {
+      // Scale maximum down to avoid overflow in getEstimatedSizeBytes.
+      this.maxEvents =
+          Long.MAX_VALUE / (PROPORTION_DENOMINATOR
+                            * Math.max(
+              Math.max(configuration.avgPersonByteSize, configuration.avgAuctionByteSize),
+              configuration.avgBidByteSize));
+    } else {
+      this.maxEvents = maxEventsOrZero;
+    }
+    this.firstEventNumber = firstEventNumber;
+
+    long eventsPerEpoch = 0;
+    long epochPeriodMs = 0;
+    if (interEventDelayUs.length > 1) {
+      for (long interEventDelayU : interEventDelayUs) {
+        long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU;
+        eventsPerEpoch += numEventsForThisCycle;
+        epochPeriodMs += (numEventsForThisCycle * interEventDelayU) / 1000L;
+      }
+    }
+    this.eventsPerEpoch = eventsPerEpoch;
+    this.epochPeriodMs = epochPeriodMs;
+  }
+
+  /**
+   * Return a copy of this config.
+   */
+  public GeneratorConfig copy() {
+    GeneratorConfig result;
+      result = new GeneratorConfig(configuration, baseTime, firstEventId,
+          maxEvents, firstEventNumber);
+    return result;
+  }
+
+  /**
+   * Split this config into {@code n} sub-configs with roughly equal number of
+   * possible events, but distinct value spaces. The generators will run on parallel timelines.
+   * This config should no longer be used.
+   */
+  public List<GeneratorConfig> split(int n) {
+    List<GeneratorConfig> results = new ArrayList<>();
+    if (n == 1) {
+      // No split required.
+      results.add(this);
+    } else {
+      long subMaxEvents = maxEvents / n;
+      long subFirstEventId = firstEventId;
+      for (int i = 0; i < n; i++) {
+        if (i == n - 1) {
+          // Don't loose any events to round-down.
+          subMaxEvents = maxEvents - subMaxEvents * (n - 1);
+        }
+        results.add(copyWith(subFirstEventId, subMaxEvents, firstEventNumber));
+        subFirstEventId += subMaxEvents;
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Return copy of this config except with given parameters.
+   */
+  public GeneratorConfig copyWith(long firstEventId, long maxEvents, long firstEventNumber) {
+    return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
+  }
+
+  /**
+   * Return an estimate of the bytes needed by {@code numEvents}.
+   */
+  public long estimatedBytesForEvents(long numEvents) {
+    long numPersons =
+        (numEvents * GeneratorConfig.PERSON_PROPORTION) / GeneratorConfig.PROPORTION_DENOMINATOR;
+    long numAuctions = (numEvents * AUCTION_PROPORTION) / PROPORTION_DENOMINATOR;
+    long numBids = (numEvents * BID_PROPORTION) / PROPORTION_DENOMINATOR;
+    return numPersons * configuration.avgPersonByteSize
+           + numAuctions * configuration.avgAuctionByteSize
+           + numBids * configuration.avgBidByteSize;
+  }
+
+  public int getAvgPersonByteSize() {
+    return configuration.avgPersonByteSize;
+  }
+
+  public int getNumActivePeople() {
+    return configuration.numActivePeople;
+  }
+
+  public int getHotSellersRatio() {
+    return configuration.hotSellersRatio;
+  }
+
+  public int getNumInFlightAuctions() {
+    return configuration.numInFlightAuctions;
+  }
+
+  public int getHotAuctionRatio() {
+    return configuration.hotAuctionRatio;
+  }
+
+  public int getHotBiddersRatio() {
+    return configuration.hotBiddersRatio;
+  }
+
+  public int getAvgBidByteSize() {
+    return configuration.avgBidByteSize;
+  }
+
+  public int getAvgAuctionByteSize() {
+    return configuration.avgAuctionByteSize;
+  }
+
+  public double getProbDelayedEvent() {
+    return configuration.probDelayedEvent;
+  }
+
+  public long getOccasionalDelaySec() {
+    return configuration.occasionalDelaySec;
+  }
+
+  /**
+   * Return an estimate of the byte-size of all events a generator for this config would yield.
+   */
+  public long getEstimatedSizeBytes() {
+    return estimatedBytesForEvents(maxEvents);
+  }
+
+  /**
+   * Return the first 'event id' which could be generated from this config. Though events don't
+   * have ids we can simulate them to help bookkeeping.
+   */
+  public long getStartEventId() {
+    return firstEventId + firstEventNumber;
+  }
+
+  /**
+   * Return one past the last 'event id' which could be generated from this config.
+   */
+  public long getStopEventId() {
+    return firstEventId + firstEventNumber + maxEvents;
+  }
+
+  /**
+   * Return the next event number for a generator which has so far emitted {@code numEvents}.
+   */
+  public long nextEventNumber(long numEvents) {
+    return firstEventNumber + numEvents;
+  }
+
+  /**
+   * Return the next event number for a generator which has so far emitted {@code numEvents},
+   * but adjusted to account for {@code outOfOrderGroupSize}.
+   */
+  public long nextAdjustedEventNumber(long numEvents) {
+    long n = configuration.outOfOrderGroupSize;
+    long eventNumber = nextEventNumber(numEvents);
+    long base = (eventNumber / n) * n;
+    long offset = (eventNumber * 953) % n;
+    return base + offset;
+  }
+
+  /**
+   * Return the event number who's event time will be a suitable watermark for
+   * a generator which has so far emitted {@code numEvents}.
+   */
+  public long nextEventNumberForWatermark(long numEvents) {
+    long n = configuration.outOfOrderGroupSize;
+    long eventNumber = nextEventNumber(numEvents);
+    return (eventNumber / n) * n;
+  }
+
+  /**
+   * What timestamp should the event with {@code eventNumber} have for this generator? And
+   * what inter-event delay (in microseconds) is current?
+   */
+  public KV<Long, Long> timestampAndInterEventDelayUsForEvent(long eventNumber) {
+    if (interEventDelayUs.length == 1) {
+      long timestamp = baseTime + (eventNumber * interEventDelayUs[0]) / 1000L;
+      return KV.of(timestamp, interEventDelayUs[0]);
+    }
+
+    long epoch = eventNumber / eventsPerEpoch;
+    long n = eventNumber % eventsPerEpoch;
+    long offsetInEpochMs = 0;
+    for (long interEventDelayU : interEventDelayUs) {
+      long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU;
+      if (n < numEventsForThisCycle) {
+        long offsetInCycleUs = n * interEventDelayU;
+        long timestamp =
+            baseTime + epoch * epochPeriodMs + offsetInEpochMs + (offsetInCycleUs / 1000L);
+        return KV.of(timestamp, interEventDelayU);
+      }
+      n -= numEventsForThisCycle;
+      offsetInEpochMs += (numEventsForThisCycle * interEventDelayU) / 1000L;
+    }
+    throw new RuntimeException("internal eventsPerEpoch incorrect"); // can't reach
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("GeneratorConfig");
+    sb.append("{configuration:");
+    sb.append(configuration.toString());
+    sb.append(";interEventDelayUs=[");
+    for (int i = 0; i < interEventDelayUs.length; i++) {
+      if (i > 0) {
+        sb.append(",");
+      }
+      sb.append(interEventDelayUs[i]);
+    }
+    sb.append("]");
+    sb.append(";stepLengthSec:");
+    sb.append(stepLengthSec);
+    sb.append(";baseTime:");
+    sb.append(baseTime);
+    sb.append(";firstEventId:");
+    sb.append(firstEventId);
+    sb.append(";maxEvents:");
+    sb.append(maxEvents);
+    sb.append(";firstEventNumber:");
+    sb.append(firstEventNumber);
+    sb.append(";epochPeriodMs:");
+    sb.append(epochPeriodMs);
+    sb.append(";eventsPerEpoch:");
+    sb.append(eventsPerEpoch);
+    sb.append("}");
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/AuctionGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/AuctionGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/AuctionGenerator.java
new file mode 100644
index 0000000..41a81da
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/AuctionGenerator.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.generator.model;
+
+import static org.apache.beam.sdk.nexmark.sources.generator.model.LongGenerator.nextLong;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.PersonGenerator.lastBase0PersonId;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.PersonGenerator.nextBase0PersonId;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.PriceGenerator.nextPrice;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.StringsGenerator.nextExtra;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.StringsGenerator.nextString;
+
+import java.util.Random;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
+
+/**
+ * Generates synthetic {@link Auction} events, plus the auction-id bookkeeping that bid generation
+ * also relies on ({@link #lastBase0AuctionId}, {@link #nextBase0AuctionId}).
+ */
+public class AuctionGenerator {
+  /**
+   * Keep the number of categories small so the example queries will find results even with
+   * a small batch of events.
+   */
+  private static final int NUM_CATEGORIES = 5;
+
+  /**
+   * Number of yet-to-be-created auction ids that {@link #nextBase0AuctionId} may pick beyond the
+   * most recently generated auction.
+   */
+  private static final int AUCTION_ID_LEAD = 10;
+
+  /**
+   * Hot sellers are the first person in each batch of this many people.
+   */
+  private static final int HOT_SELLER_RATIO = 100;
+
+  /**
+   * Generate and return a random auction with next available id.
+   *
+   * @param eventsCountSoFar number of events the calling generator has emitted so far; used to
+   *     choose how far in the future the auction expires
+   * @param eventId the event id of this auction event
+   * @param random source of randomness; the calling Generator seeds it from the event id
+   * @param timestamp event-time timestamp (ms) of the auction, also used as its start time
+   * @param config generator config supplying the hot-seller ratio and average sizes
+   */
+  public static Auction nextAuction(
+      long eventsCountSoFar, long eventId, Random random, long timestamp, GeneratorConfig config) {
+
+    long id = lastBase0AuctionId(eventId) + GeneratorConfig.FIRST_AUCTION_ID;
+
+    long seller;
+    // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio.
+    if (random.nextInt(config.getHotSellersRatio()) > 0) {
+      // Choose the first person in the batch of last HOT_SELLER_RATIO people.
+      seller = (lastBase0PersonId(eventId) / HOT_SELLER_RATIO) * HOT_SELLER_RATIO;
+    } else {
+      seller = nextBase0PersonId(eventId, random, config);
+    }
+    seller += GeneratorConfig.FIRST_PERSON_ID;
+
+    // The order of the random draws below determines the generated data; keep it stable.
+    long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES);
+    long initialBid = nextPrice(random);
+    long expires = timestamp + nextAuctionLengthMs(eventsCountSoFar, random, timestamp, config);
+    String name = nextString(random, 20);
+    String desc = nextString(random, 100);
+    long reserve = initialBid + nextPrice(random);
+    // Approximate size of the fields so far; passed to nextExtra together with the target
+    // average auction size.
+    int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8;
+    String extra = nextExtra(random, currentSize, config.getAvgAuctionByteSize());
+    return new Auction(id, name, desc, initialBid, reserve, timestamp, expires, seller, category,
+        extra);
+  }
+
+  /**
+   * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if
+   * due to generate an auction. Returns -1 when no auction precedes {@code eventId} (e.g. for
+   * event id 0).
+   */
+  public static long lastBase0AuctionId(long eventId) {
+    long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
+    long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
+    if (offset < GeneratorConfig.PERSON_PROPORTION) {
+      // About to generate a person.
+      // Go back to the last auction in the last epoch.
+      epoch--;
+      offset = GeneratorConfig.AUCTION_PROPORTION - 1;
+    } else if (offset >= GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
+      // About to generate a bid.
+      // Go back to the last auction generated in this epoch.
+      offset = GeneratorConfig.AUCTION_PROPORTION - 1;
+    } else {
+      // About to generate an auction.
+      offset -= GeneratorConfig.PERSON_PROPORTION;
+    }
+    return epoch * GeneratorConfig.AUCTION_PROPORTION + offset;
+  }
+
+  /**
+   * Return a random auction id (base 0), drawn from auctions that are likely still in flight
+   * plus up to {@link #AUCTION_ID_LEAD} ids that have not been generated yet.
+   */
+  public static long nextBase0AuctionId(
+      long nextEventId, Random random, GeneratorConfig config) {
+
+    // Choose a random auction for any of those which are likely to still be in flight,
+    // plus a few 'leads'.
+    // Note that ideally we'd track non-expired auctions exactly, but that state
+    // is difficult to split.
+    long maxAuction = lastBase0AuctionId(nextEventId);
+    long minAuction = Math.max(maxAuction - config.getNumInFlightAuctions(), 0);
+    return minAuction + nextLong(random, maxAuction - minAuction + 1 + AUCTION_ID_LEAD);
+  }
+
+  /**
+   * Return a random time delay, in milliseconds, for length of auctions. The delay is at least 1
+   * and averages roughly the event-time span until {@code numInFlightAuctions} more auctions have
+   * been generated (assuming {@code LongGenerator.nextLong(random, n)} is uniform over [0, n)).
+   */
+  private static long nextAuctionLengthMs(
+      long eventsCountSoFar, Random random, long timestamp, GeneratorConfig config) {
+
+    // What's our current event number?
+    long currentEventNumber = config.nextAdjustedEventNumber(eventsCountSoFar);
+    // How many events till we've generated numInFlightAuctions?
+    long numEventsForAuctions =
+        (config.getNumInFlightAuctions() * GeneratorConfig.PROPORTION_DENOMINATOR)
+            / GeneratorConfig.AUCTION_PROPORTION;
+    // When will the auction numInFlightAuctions beyond now be generated?
+    long futureAuction = config
+        .timestampAndInterEventDelayUsForEvent(currentEventNumber + numEventsForAuctions)
+        .getKey();
+    // Choose a length with average horizonMs.
+    long horizonMs = futureAuction - timestamp;
+    return 1L + nextLong(random, Math.max(horizonMs * 2, 1L));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/BidGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/BidGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/BidGenerator.java
new file mode 100644
index 0000000..cffe380
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/BidGenerator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.generator.model;
+
+import static org.apache.beam.sdk.nexmark.sources.generator.model.AuctionGenerator.lastBase0AuctionId;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.AuctionGenerator.nextBase0AuctionId;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.PersonGenerator.lastBase0PersonId;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.PersonGenerator.nextBase0PersonId;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.StringsGenerator.nextExtra;
+
+import java.util.Random;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
+
+/**
+ * Generates bids.
+ */
+public class BidGenerator {
+
+  /**
+   * Batch sizes used to pick 'hot' auctions and bidders. The hot auction is the first auction of
+   * the most recent batch of {@code HOT_AUCTION_RATIO} auctions. The hot bidder is the second
+   * person of the most recent batch of {@code HOT_BIDDER_RATIO} people. So at most 1 in every
+   * {@code HOT_AUCTION_RATIO} auctions (resp. {@code HOT_BIDDER_RATIO} people) can be 'hot'.
+   */
+  private static final int HOT_AUCTION_RATIO = 100;
+  private static final int HOT_BIDDER_RATIO = 100;
+
+  /**
+   * Generate a random bid for the event with id {@code eventId}.
+   *
+   * @param eventId id of the event being generated; determines which auctions and people exist
+   * @param random source of randomness for every field of the bid
+   * @param timestamp timestamp stored on the returned bid
+   * @param config supplies the hot auction/bidder ratios and the average bid size
+   * @return the generated bid
+   */
+  public static Bid nextBid(
+      long eventId, Random random, long timestamp, GeneratorConfig config) {
+    // All values are drawn from the same Random, so the order of the draws below (auction,
+    // bidder, price, extra) determines the generated bid.
+    long auction = GeneratorConfig.FIRST_AUCTION_ID + pickBase0Auction(eventId, random, config);
+    long bidder = GeneratorConfig.FIRST_PERSON_ID + pickBase0Bidder(eventId, random, config);
+    long price = PriceGenerator.nextPrice(random);
+    // Fixed-size part of a bid: four 8-byte longs (auction, bidder, price, timestamp).
+    final int fixedFieldsBytes = 4 * 8;
+    String extra = nextExtra(random, fixedFieldsBytes, config.getAvgBidByteSize());
+    return new Bid(auction, bidder, price, timestamp, extra);
+  }
+
+  /**
+   * Pick a base-0 auction id. With probability {@code 1 - 1/config.getHotAuctionRatio()} this is
+   * the current hot auction; otherwise the choice is delegated to {@code nextBase0AuctionId}.
+   */
+  private static long pickBase0Auction(long eventId, Random random, GeneratorConfig config) {
+    boolean hot = random.nextInt(config.getHotAuctionRatio()) != 0;
+    if (!hot) {
+      return nextBase0AuctionId(eventId, random, config);
+    }
+    // Round down to the first auction of the current batch of HOT_AUCTION_RATIO auctions.
+    long last = lastBase0AuctionId(eventId);
+    return last - last % HOT_AUCTION_RATIO;
+  }
+
+  /**
+   * Pick a base-0 bidder id. With probability {@code 1 - 1/config.getHotBiddersRatio()} this is
+   * the current hot bidder; otherwise the choice is delegated to {@code nextBase0PersonId}.
+   */
+  private static long pickBase0Bidder(long eventId, Random random, GeneratorConfig config) {
+    boolean hot = random.nextInt(config.getHotBiddersRatio()) != 0;
+    if (!hot) {
+      return nextBase0PersonId(eventId, random, config);
+    }
+    // Use the second person of the current batch of HOT_BIDDER_RATIO people, so hot bidders
+    // and hot sellers don't collide.
+    long last = lastBase0PersonId(eventId);
+    return last - last % HOT_BIDDER_RATIO + 1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/LongGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/LongGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/LongGenerator.java
new file mode 100644
index 0000000..ed9db84
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/LongGenerator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.generator.model;
+
+import java.util.Random;
+
+/**
+ * LongGenerator.
+ */
+public class LongGenerator {
+
+  /** Return a random long from {@code [0, n)}. */
+  public static long nextLong(Random random, long n) {
+    if (n < Integer.MAX_VALUE) {
+      return random.nextInt((int) n);
+    } else {
+      // WARNING: Very skewed distribution! Bad!
+      return Math.abs(random.nextLong() % n);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PersonGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PersonGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PersonGenerator.java
new file mode 100644
index 0000000..9f306ea
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PersonGenerator.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.sources.generator.model;
+
+import static org.apache.beam.sdk.nexmark.sources.generator.model.LongGenerator.nextLong;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.StringsGenerator.nextExtra;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.StringsGenerator.nextString;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import org.apache.beam.sdk.nexmark.model.Person;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
+
+/**
+ * Generates people.
+ */
+public class PersonGenerator {
+  /**
+   * Number of person ids beyond the last generated person that {@link #nextBase0PersonId} may
+   * return, i.e. ids of people that have not been generated yet.
+   */
+  private static final int PERSON_ID_LEAD = 10;
+
+  /**
+   * Keep the number of states small so that the example queries will find results even with
+   * a small batch of events.
+   */
+  private static final List<String> US_STATES = Arrays.asList(("AZ,CA,ID,OR,WA,WY").split(","));
+
+  private static final List<String> US_CITIES =
+      Arrays.asList(
+          ("Phoenix,Los Angeles,San Francisco,Boise,Portland,Bend,Redmond,Seattle,Kent,Cheyenne")
+              .split(","));
+
+  private static final List<String> FIRST_NAMES =
+      Arrays.asList(("Peter,Paul,Luke,John,Saul,Vicky,Kate,Julie,Sarah,Deiter,Walter").split(","));
+
+  private static final List<String> LAST_NAMES =
+      Arrays.asList(("Shultz,Abrams,Spencer,White,Bartels,Walton,Smith,Jones,Noris").split(","));
+
+  /**
+   * Generate a random person whose id is the last valid person id for {@code nextEventId}.
+   *
+   * @param nextEventId id of the event being generated
+   * @param random source of randomness for every field of the person
+   * @param timestamp timestamp stored on the returned person
+   * @param config supplies the average person size used to pad the {@code extra} field
+   * @return the generated person
+   */
+  public static Person nextPerson(
+      long nextEventId, Random random, long timestamp, GeneratorConfig config) {
+
+    long id = lastBase0PersonId(nextEventId) + GeneratorConfig.FIRST_PERSON_ID;
+    // The fields are drawn from the same Random in this order; reordering changes the output.
+    String name = nextPersonName(random);
+    String email = nextEmail(random);
+    String creditCard = nextCreditCard(random);
+    String city = nextUSCity(random);
+    String state = nextUSState(random);
+    // 8 bytes for the id plus the lengths of the string fields.
+    int currentSize =
+        8 + name.length() + email.length() + creditCard.length() + city.length() + state.length();
+    String extra = nextExtra(random, currentSize, config.getAvgPersonByteSize());
+    return new Person(id, name, email, creditCard, city, state, timestamp, extra);
+  }
+
+  /**
+   * Return a random person id (base 0).
+   */
+  public static long nextBase0PersonId(long eventId, Random random, GeneratorConfig config) {
+    // Choose a random person from any of the 'active' people, plus a few 'leads'.
+    // By limiting to 'active' we ensure the density of bids or auctions per person
+    // does not decrease over time for long running jobs.
+    // By choosing a person id ahead of the last valid person id we will make
+    // newPerson and newAuction events appear to have been swapped in time.
+    long numPeople = lastBase0PersonId(eventId) + 1;
+    long activePeople = Math.min(numPeople, config.getNumActivePeople());
+    long n = nextLong(random, activePeople + PERSON_ID_LEAD);
+    return numPeople - activePeople + n;
+  }
+
+  /**
+   * Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if
+   * due to generate a person.
+   */
+  public static long lastBase0PersonId(long eventId) {
+    long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
+    long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
+    if (offset >= GeneratorConfig.PERSON_PROPORTION) {
+      // About to generate an auction or bid.
+      // Go back to the last person generated in this epoch.
+      offset = GeneratorConfig.PERSON_PROPORTION - 1;
+    }
+    // offset now indexes a person within this epoch (the current one if about to generate a
+    // person).
+    return epoch * GeneratorConfig.PERSON_PROPORTION + offset;
+  }
+
+  /** Return a random US state. */
+  private static String nextUSState(Random random) {
+    return US_STATES.get(random.nextInt(US_STATES.size()));
+  }
+
+  /** Return a random US city. */
+  private static String nextUSCity(Random random) {
+    return US_CITIES.get(random.nextInt(US_CITIES.size()));
+  }
+
+  /** Return a random person name. */
+  private static String nextPersonName(Random random) {
+    return FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size())) + " "
+        + LAST_NAMES.get(random.nextInt(LAST_NAMES.size()));
+  }
+
+  /** Return a random email address. */
+  private static String nextEmail(Random random) {
+    return nextString(random, 7) + "@" + nextString(random, 5) + ".com";
+  }
+
+  /**
+   * Return a random credit card number: four space-separated groups of four ASCII digits.
+   */
+  private static String nextCreditCard(Random random) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < 4; i++) {
+      if (i > 0) {
+        sb.append(' ');
+      }
+      // Zero-pad by hand instead of String.format("%04d", ...). Formatter localizes digits using
+      // the default locale, which would make the generated data machine-dependent.
+      String group = Integer.toString(random.nextInt(10000));
+      for (int pad = group.length(); pad < 4; pad++) {
+        sb.append('0');
+      }
+      sb.append(group);
+    }
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PriceGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PriceGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PriceGenerator.java
new file mode 100644
index 0000000..912b16e
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PriceGenerator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.generator.model;
+
+import java.util.Random;
+
+/**
+ * Generates a random price.
+ */
+public class PriceGenerator {
+
+  /** Return a random price. */
+  public static long nextPrice(Random random) {
+    return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/StringsGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/StringsGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/StringsGenerator.java
new file mode 100644
index 0000000..c808560
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/StringsGenerator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.generator.model;
+
+import java.util.Random;
+
+/**
+ * Generates random strings used for the text fields of the other model objects.
+ */
+public class StringsGenerator {
+
+  /** Smallest random string size, before trimming. */
+  private static final int MIN_STRING_LENGTH = 3;
+
+  /**
+   * Return a random string of lowercase letters and spaces. Before trimming, its length is in
+   * {@code [MIN_STRING_LENGTH, maxLength)}. Leading and trailing spaces are then trimmed, so the
+   * result may be shorter than {@code MIN_STRING_LENGTH}, or even empty.
+   *
+   * @throws IllegalArgumentException if {@code maxLength - MIN_STRING_LENGTH} is not positive
+   */
+  public static String nextString(Random random, int maxLength) {
+    int length = MIN_STRING_LENGTH + random.nextInt(maxLength - MIN_STRING_LENGTH);
+    StringBuilder out = new StringBuilder();
+    for (int i = 0; i < length; i++) {
+      // About 1 in 13 characters is a space; the rest are uniform over 'a'..'z'.
+      boolean space = random.nextInt(13) == 0;
+      out.append(space ? ' ' : (char) ('a' + random.nextInt(26)));
+    }
+    return out.toString().trim();
+  }
+
+  /**
+   * Return a random string of exactly {@code length} lowercase letters. Returns the empty string
+   * if {@code length} is not positive.
+   */
+  public static String nextExactString(Random random, int length) {
+    StringBuilder out = new StringBuilder();
+    for (int i = 0; i < length; i++) {
+      out.append((char) ('a' + random.nextInt(26)));
+    }
+    return out.toString();
+  }
+
+  /**
+   * Return a random string of lowercase letters such that {@code currentSize + string.length()}
+   * is, on average, approximately {@code desiredAverageSize}.
+   *
+   * <p>The padding needed is {@code d = desiredAverageSize - currentSize}. This is jittered by up
+   * to 20%: the returned length is drawn from {@code [d - delta, d + delta)}, where
+   * {@code delta = round(0.2 * d)}, and is exactly {@code d} when {@code delta} is 0. Returns the
+   * empty string if {@code currentSize} already exceeds {@code desiredAverageSize}.
+   */
+  public static String nextExtra(Random random, int currentSize, int desiredAverageSize) {
+    if (desiredAverageSize < currentSize) {
+      return "";
+    }
+    int padding = desiredAverageSize - currentSize;
+    int delta = (int) Math.round(padding * 0.2);
+    int jitter = delta == 0 ? 0 : random.nextInt(2 * delta);
+    return nextExactString(random, padding - delta + jitter);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/package-info.java
new file mode 100644
index 0000000..c15b5ed
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Generators for the individual Nexmark model objects (people, auctions and bids) and for the
+ * field values (strings, prices, longs) they are built from.
+ */
+package org.apache.beam.sdk.nexmark.sources.generator.model;

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/package-info.java
new file mode 100644
index 0000000..a7ffd25
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Event generation logic for the Nexmark sources: the {@code Generator} together with its
+ * configuration ({@code GeneratorConfig}) and checkpoint ({@code GeneratorCheckpoint}) types.
+ */
+package org.apache.beam.sdk.nexmark.sources.generator;