You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:55 UTC

[47/55] [abbrv] beam git commit: Move module beam-integration-java-nexmark to beam-sdks-java-nexmark

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java
deleted file mode 100644
index 9624a9d..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import javax.annotation.Nullable;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.AuctionBid;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-
-/**
- * A simulator of the {@code WinningBids} query.
- *
- * <p>Each call to {@link #run()} either emits one winning bid, consumes one input event, or
- * signals completion. Auctions are retired once the latest seen auction/bid timestamp reaches
- * their expiry; a retired auction with at least one valid bid yields an {@link AuctionBid}.
- */
-public class WinningBidsSimulator extends AbstractSimulator<Event, AuctionBid> {
-  /** Auctions currently still open, indexed by auction id. */
-  private final Map<Long, Auction> openAuctions;
-
-  /**
-   * The ids of auctions known to be closed. Populated when an auction is retired, so that late
-   * bids for it are ignored instead of being parked in {@link #bidsWithoutAuctions} forever.
-   */
-  private final Set<Long> closedAuctions;
-
-  /** Current best valid bids for open auctions, indexed by auction id. */
-  private final Map<Long, Bid> bestBids;
-
-  /** Bids for auctions we haven't seen yet. */
-  private final List<Bid> bidsWithoutAuctions;
-
-  /**
-   * Timestamp of last new auction or bid event (ms since epoch).
-   */
-  private long lastTimestamp;
-
-  public WinningBidsSimulator(NexmarkConfiguration configuration) {
-    super(NexmarkUtils.standardEventIterator(configuration));
-    openAuctions = new TreeMap<>();
-    closedAuctions = new TreeSet<>();
-    bestBids = new TreeMap<>();
-    bidsWithoutAuctions = new ArrayList<>();
-    lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
-  }
-
-  /**
-   * Try to account for {@code bid} in state. Return true if bid has now been
-   * accounted for by {@code bestBids}.
-   *
-   * <p>A bid counts as accounted for when its auction is known to be closed, when it is below
-   * the auction's reserve, or when it has been compared against the current best bid. It is
-   * NOT accounted for (returns false) only when its auction has not been seen yet.
-   *
-   * @param bid the bid to consider
-   * @param shouldLog whether to log the outcome for this bid
-   */
-  private boolean captureBestBid(Bid bid, boolean shouldLog) {
-    if (closedAuctions.contains(bid.auction)) {
-      // Ignore bids for known, closed auctions.
-      if (shouldLog) {
-        NexmarkUtils.info("closed auction: %s", bid);
-      }
-      return true;
-    }
-    Auction auction = openAuctions.get(bid.auction);
-    if (auction == null) {
-      // We don't have an auction for this bid yet, so can't determine if it is
-      // winning or not.
-      if (shouldLog) {
-        NexmarkUtils.info("pending auction: %s", bid);
-      }
-      return false;
-    }
-    if (bid.price < auction.reserve) {
-      // Bid price is too low.
-      if (shouldLog) {
-        NexmarkUtils.info("below reserve: %s", bid);
-      }
-      return true;
-    }
-    Bid existingBid = bestBids.get(bid.auction);
-    // The new bid wins if it orders strictly after the existing best bid under
-    // PRICE_THEN_DESCENDING_TIME.
-    if (existingBid == null || Bid.PRICE_THEN_DESCENDING_TIME.compare(existingBid, bid) < 0) {
-      // We've found a (new) best bid for a known auction.
-      bestBids.put(bid.auction, bid);
-      if (shouldLog) {
-        NexmarkUtils.info("new winning bid: %s", bid);
-      }
-    } else {
-      if (shouldLog) {
-        NexmarkUtils.info("ignoring low bid: %s", bid);
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Try to match bids without auctions to auctions. Bids that are now accounted for are removed
-   * from {@link #bidsWithoutAuctions}.
-   */
-  private void flushBidsWithoutAuctions() {
-    Iterator<Bid> itr = bidsWithoutAuctions.iterator();
-    while (itr.hasNext()) {
-      Bid bid = itr.next();
-      if (captureBestBid(bid, false)) {
-        NexmarkUtils.info("bid now accounted for: %s", bid);
-        itr.remove();
-      }
-    }
-  }
-
-  /**
-   * Return the next winning bid for an expired auction relative to {@code timestamp}.
-   * Return null if no more winning bids, in which case all expired auctions will
-   * have been removed from our state. Retire auctions in order of expire time.
-   *
-   * <p>Every auction retired here is recorded in {@link #closedAuctions}, and its best bid (if
-   * any) is dropped from {@link #bestBids}, so state for finished auctions does not accumulate.
-   */
-  @Nullable
-  private TimestampedValue<AuctionBid> nextWinningBid(long timestamp) {
-    // Group expired auction ids by expiry time (TreeMap => ascending) so they retire in order.
-    Map<Long, List<Long>> toBeRetired = new TreeMap<>();
-    for (Map.Entry<Long, Auction> entry : openAuctions.entrySet()) {
-      if (entry.getValue().expires <= timestamp) {
-        List<Long> idsAtTime = toBeRetired.get(entry.getValue().expires);
-        if (idsAtTime == null) {
-          idsAtTime = new ArrayList<>();
-          toBeRetired.put(entry.getValue().expires, idsAtTime);
-        }
-        idsAtTime.add(entry.getKey());
-      }
-    }
-    for (Map.Entry<Long, List<Long>> entry : toBeRetired.entrySet()) {
-      for (long id : entry.getValue()) {
-        Auction auction = openAuctions.remove(id);
-        NexmarkUtils.info("retiring auction: %s", auction);
-        // Remember the auction as closed so later bids for it are ignored by captureBestBid
-        // rather than accumulating (and being rescanned) in bidsWithoutAuctions.
-        closedAuctions.add(id);
-        Bid bestBid = bestBids.remove(id);
-        if (bestBid != null) {
-          TimestampedValue<AuctionBid> result =
-              TimestampedValue.of(new AuctionBid(auction, bestBid), new Instant(auction.expires));
-          NexmarkUtils.info("winning: %s", result);
-          return result;
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Advance the simulation by one step: emit a pending winning bid if one is due, otherwise
-   * consume the next input event; once input is exhausted, flush all remaining auctions and
-   * then signal completion.
-   */
-  @Override
-  protected void run() {
-    if (lastTimestamp > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) {
-      // We may have finally seen the auction a bid was intended for.
-      flushBidsWithoutAuctions();
-      TimestampedValue<AuctionBid> result = nextWinningBid(lastTimestamp);
-      if (result != null) {
-        addResult(result);
-        return;
-      }
-    }
-
-    TimestampedValue<Event> timestampedEvent = nextInput();
-    if (timestampedEvent == null) {
-      // No more events. Flush any still open auctions.
-      TimestampedValue<AuctionBid> result =
-          nextWinningBid(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
-      if (result == null) {
-        // We are done.
-        allDone();
-        return;
-      }
-      addResult(result);
-      return;
-    }
-
-    Event event = timestampedEvent.getValue();
-    if (event.newPerson != null) {
-      // Ignore new person events.
-      return;
-    }
-
-    lastTimestamp = timestampedEvent.getTimestamp().getMillis();
-    if (event.newAuction != null) {
-      // Add this new open auction to our state.
-      openAuctions.put(event.newAuction.id, event.newAuction);
-    } else {
-      if (!captureBestBid(event.bid, true)) {
-        // We don't know what to do with this bid yet.
-        NexmarkUtils.info("bid not yet accounted for: %s", event.bid);
-        bidsWithoutAuctions.add(event.bid);
-      }
-    }
-    // Keep looking for winning bids.
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/package-info.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/package-info.java
deleted file mode 100644
index 7a56733..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Nexmark Queries.
- */
-package org.apache.beam.integration.nexmark.queries;

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java
deleted file mode 100644
index 43d6690..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.sources;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-
-/**
- * A custom, bounded source of event records.
- */
-public class BoundedEventSource extends BoundedSource<Event> {
-  /** Configuration we generate events against. */
-  private final GeneratorConfig config;
-
-  /** How many sub-sources {@link #split} divides this source into. */
-  private final int numEventGenerators;
-
-  public BoundedEventSource(GeneratorConfig config, int numEventGenerators) {
-    this.config = config;
-    this.numEventGenerators = numEventGenerators;
-  }
-
-  /** A reader to pull events from the generator. */
-  private static class EventReader extends BoundedReader<Event> {
-    /**
-     * Event source we are purporting to be reading from.
-     * (We can't use Java's capture-outer-class pointer since we must update
-     * this field on calls to splitAtFraction.)
-     */
-    private BoundedEventSource source;
-
-    /** Generator we are reading from. */
-    private final Generator generator;
-
-    /** Whether the end-of-input message has already been logged. */
-    private boolean reportedStop;
-
-    /** The most recently generated event; null until the first successful advance. */
-    @Nullable
-    private TimestampedValue<Event> currentEvent;
-
-    public EventReader(BoundedEventSource source, GeneratorConfig config) {
-      this.source = source;
-      generator = new Generator(config);
-      reportedStop = false;
-    }
-
-    @Override
-    public synchronized boolean start() {
-      NexmarkUtils.info("starting bounded generator %s", generator);
-      return advance();
-    }
-
-    @Override
-    public synchronized boolean advance() {
-      if (!generator.hasNext()) {
-        // No more events. Log the stop only once, however often advance() is called.
-        if (!reportedStop) {
-          reportedStop = true;
-          NexmarkUtils.info("stopped bounded generator %s", generator);
-        }
-        return false;
-      }
-      currentEvent = generator.next();
-      return true;
-    }
-
-    @Override
-    public synchronized Event getCurrent() throws NoSuchElementException {
-      if (currentEvent == null) {
-        throw new NoSuchElementException();
-      }
-      return currentEvent.getValue();
-    }
-
-    @Override
-    public synchronized Instant getCurrentTimestamp() throws NoSuchElementException {
-      if (currentEvent == null) {
-        throw new NoSuchElementException();
-      }
-      return currentEvent.getTimestamp();
-    }
-
-    @Override
-    public void close() throws IOException {
-      // Nothing to close.
-    }
-
-    @Override
-    public synchronized Double getFractionConsumed() {
-      return generator.getFractionConsumed();
-    }
-
-    @Override
-    public synchronized BoundedSource<Event> getCurrentSource() {
-      return source;
-    }
-
-    /**
-     * Split off the events from {@code fraction} of this reader's range onward into a new source,
-     * narrowing this reader's current source to match. Returns null when the split point has
-     * already been passed or the split would leave either side empty.
-     */
-    @Override
-    @Nullable
-    public synchronized BoundedEventSource splitAtFraction(double fraction) {
-      long startId = generator.getCurrentConfig().getStartEventId();
-      long stopId = generator.getCurrentConfig().getStopEventId();
-      long size = stopId - startId;
-      // Compute the offset in long arithmetic: casting to int would overflow (and could wrap
-      // negative) once the generator's event range exceeds Integer.MAX_VALUE.
-      long splitEventId = startId + Math.min((long) (size * fraction), size);
-      if (splitEventId <= generator.getNextEventId() || splitEventId == stopId) {
-        // Already passed this position or split results in left or right being empty.
-        NexmarkUtils.info("split failed for bounded generator %s at %f", generator, fraction);
-        return null;
-      }
-
-      NexmarkUtils.info("about to split bounded generator %s at %d", generator, splitEventId);
-
-      // Scale back the event space of the current generator, and return a generator config
-      // representing the event space we just 'stole' from the current generator.
-      GeneratorConfig remainingConfig = generator.splitAtEventId(splitEventId);
-
-      NexmarkUtils.info("split bounded generator into %s and %s", generator, remainingConfig);
-
-      // At this point
-      //   generator.events() ++ new Generator(remainingConfig).events()
-      //   == originalGenerator.events()
-
-      // We need a new source to represent the now smaller key space for this reader, so
-      // that we can maintain the invariant that
-      //   this.getCurrentSource().createReader(...)
-      // will yield the same output as this.
-      source = new BoundedEventSource(generator.getCurrentConfig(), source.numEventGenerators);
-
-      // Return a source from which we may read the 'stolen' event space.
-      return new BoundedEventSource(remainingConfig, source.numEventGenerators);
-    }
-  }
-
-  /**
-   * Split into {@code numEventGenerators} sub-sources, each of which will not split further via
-   * this method (numEventGenerators of 1). {@code desiredBundleSizeBytes} is ignored.
-   */
-  @Override
-  public List<BoundedEventSource> split(
-      long desiredBundleSizeBytes, PipelineOptions options) {
-    NexmarkUtils.info("splitting bounded source %s into %d sub-sources", config, numEventGenerators);
-    List<BoundedEventSource> results = new ArrayList<>();
-    // Ignore desiredBundleSizeBytes and use numEventGenerators instead.
-    for (GeneratorConfig subConfig : config.split(numEventGenerators)) {
-      results.add(new BoundedEventSource(subConfig, 1));
-    }
-    return results;
-  }
-
-  @Override
-  public long getEstimatedSizeBytes(PipelineOptions options) {
-    return config.getEstimatedSizeBytes();
-  }
-
-  @Override
-  public EventReader createReader(PipelineOptions options) {
-    NexmarkUtils.info("creating initial bounded reader for %s", config);
-    return new EventReader(this, config);
-  }
-
-  @Override
-  public void validate() {
-    // Nothing to validate.
-  }
-
-  @Override
-  public Coder<Event> getDefaultOutputCoder() {
-    return Event.CODER;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
deleted file mode 100644
index f6deceb..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
+++ /dev/null
@@ -1,609 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.sources;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Objects;
-import java.util.Random;
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.Person;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-
-/**
- * A generator for synthetic events. We try to make the data vaguely reasonable. We also ensure
- * most primary key/foreign key relations are correct. Eg: a {@link Bid} event will usually have
- * valid auction and bidder ids which can be joined to already-generated Auction and Person events.
- *
- * <p>To help with testing, we generate timestamps relative to a given {@code baseTime}. Each new
- * event is given a timestamp advanced from the previous timestamp by {@code interEventDelayUs}
- * (in microseconds). The event stream is thus fully deterministic and does not depend on
- * wallclock time.
- *
- * <p>The nested {@link Checkpoint} class implements
- * {@link org.apache.beam.sdk.io.UnboundedSource.CheckpointMark} so that we can resume
- * generating events from a saved snapshot.
- */
-public class Generator implements Iterator<TimestampedValue<Event>>, Serializable {
-  /**
-   * Number of auction categories. Kept small so the example queries will find results even with
-   * a small batch of events.
-   */
-  private static final int NUM_CATEGORIES = 5;
-
-  /** Smallest random string size produced by {@code nextString} (before trimming). */
-  private static final int MIN_STRING_LENGTH = 3;
-
-  /**
-   * Pool of US states that {@code nextUSState} draws from. Kept small so that the example
-   * queries will find results even with a small batch of events.
-   */
-  private static final List<String> US_STATES = Arrays.asList(("AZ,CA,ID,OR,WA,WY").split(","));
-
-  /** Pool of US cities that {@code nextUSCity} draws from. */
-  private static final List<String> US_CITIES =
-      Arrays.asList(
-          ("Phoenix,Los Angeles,San Francisco,Boise,Portland,Bend,Redmond,Seattle,Kent,Cheyenne")
-              .split(","));
-
-  /** Pool of first names that {@code nextPersonName} draws from. */
-  private static final List<String> FIRST_NAMES =
-      Arrays.asList(("Peter,Paul,Luke,John,Saul,Vicky,Kate,Julie,Sarah,Deiter,Walter").split(","));
-
-  /** Pool of last names that {@code nextPersonName} draws from. */
-  private static final List<String> LAST_NAMES =
-      Arrays.asList(("Shultz,Abrams,Spencer,White,Bartels,Walton,Smith,Jones,Noris").split(","));
-
-  /**
-   * Number of yet-to-be-created person and auction ids allowed.
-   * NOTE(review): usage is outside this view; presumably bounds how far ahead of the last
-   * generated id a referenced id may be — confirm.
-   */
-  private static final int PERSON_ID_LEAD = 10;
-  private static final int AUCTION_ID_LEAD = 10;
-
-  /**
-   * The fraction of auctions/sellers/bidders that may be 'hot' is 1 over the corresponding
-   * value below.
-   */
-  private static final int HOT_AUCTION_RATIO = 100;
-  private static final int HOT_SELLER_RATIO = 100;
-  private static final int HOT_BIDDER_RATIO = 100;
-
-  /**
-   * Minimal snapshot of a {@link Generator}: how many events it has produced and its wallclock
-   * base time. That is enough to rebuild a generator at the checkpointed position.
-   */
-  public static class Checkpoint implements UnboundedSource.CheckpointMark {
-    private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-
-    /** Coder for this class: {@code numEvents} then {@code wallclockBaseTime}, as var-longs. */
-    public static final Coder<Checkpoint> CODER_INSTANCE =
-        new CustomCoder<Checkpoint>() {
-          @Override
-          public void encode(Checkpoint checkpoint, OutputStream out)
-              throws CoderException, IOException {
-            // Field order here must match decode().
-            LONG_CODER.encode(checkpoint.numEvents, out);
-            LONG_CODER.encode(checkpoint.wallclockBaseTime, out);
-          }
-
-          @Override
-          public Checkpoint decode(InputStream in) throws CoderException, IOException {
-            long decodedNumEvents = LONG_CODER.decode(in);
-            long decodedBaseTime = LONG_CODER.decode(in);
-            return new Checkpoint(decodedNumEvents, decodedBaseTime);
-          }
-
-          @Override
-          public void verifyDeterministic() throws NonDeterministicException {
-            // Always deterministic: the encoding is a fixed sequence of two var-longs.
-          }
-        };
-
-    /** Events already generated at checkpoint time. */
-    private final long numEvents;
-
-    /** Wallclock base time of the checkpointed generator. */
-    private final long wallclockBaseTime;
-
-    private Checkpoint(long numEvents, long wallclockBaseTime) {
-      this.wallclockBaseTime = wallclockBaseTime;
-      this.numEvents = numEvents;
-    }
-
-    /** Rebuild a generator over {@code config} positioned at this checkpoint. */
-    public Generator toGenerator(GeneratorConfig config) {
-      return new Generator(config, numEvents, wallclockBaseTime);
-    }
-
-    @Override
-    public void finalizeCheckpoint() throws IOException {
-      // No external resources are held, so there is nothing to finalize.
-    }
-
-    @Override
-    public String toString() {
-      return String.format(
-          "Generator.Checkpoint{numEvents:%d;wallclockBaseTime:%d}", numEvents, wallclockBaseTime);
-    }
-  }
-
-  /**
-   * An event together with its wallclock emission time, event time and watermark. Instances sort
-   * by wallclock timestamp, breaking ties by the event's hash code (arbitrary but stable).
-   *
-   * <p>Note: {@link #compareTo} is not consistent with {@link #equals}; two unequal instances with
-   * the same wallclock timestamp and event hash code compare as 0.
-   */
-  public static class NextEvent implements Comparable<NextEvent> {
-    /** When, in wallclock time, should this event be emitted? */
-    public final long wallclockTimestamp;
-
-    /** When, in event time, should this event be considered to have occurred? */
-    public final long eventTimestamp;
-
-    /** The event itself. */
-    public final Event event;
-
-    /** The minimum of this and all future event timestamps. */
-    public final long watermark;
-
-    public NextEvent(long wallclockTimestamp, long eventTimestamp, Event event, long watermark) {
-      this.event = event;
-      this.wallclockTimestamp = wallclockTimestamp;
-      this.eventTimestamp = eventTimestamp;
-      this.watermark = watermark;
-    }
-
-    /**
-     * Return a copy of this next event whose wallclock timestamp is pushed back by
-     * {@code delayMs} and whose event is annotated as 'LATE'. Event time and watermark are kept.
-     */
-    public NextEvent withDelay(long delayMs) {
-      long delayedWallclock = wallclockTimestamp + delayMs;
-      Event lateEvent = event.withAnnotation("LATE");
-      return new NextEvent(delayedWallclock, eventTimestamp, lateEvent, watermark);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o == this) {
-        return true;
-      }
-      if (o == null || o.getClass() != getClass()) {
-        return false;
-      }
-      NextEvent that = (NextEvent) o;
-      // Cheap primitive comparisons first; the event comparison runs only if they all match.
-      return wallclockTimestamp == that.wallclockTimestamp
-          && eventTimestamp == that.eventTimestamp
-          && watermark == that.watermark
-          && event.equals(that.event);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(wallclockTimestamp, eventTimestamp, watermark, event);
-    }
-
-    @Override
-    public int compareTo(NextEvent other) {
-      int byWallclock = Long.compare(wallclockTimestamp, other.wallclockTimestamp);
-      return byWallclock != 0
-          ? byWallclock
-          : Integer.compare(event.hashCode(), other.event.hashCode());
-    }
-  }
-
-  /**
-   * Configuration to generate events against. Deliberately not final: {@link #splitAtEventId}
-   * replaces it with a narrower config.
-   */
-  private GeneratorConfig config;
-
-  /** Number of events generated by this generator. */
-  private long numEvents;
-
-  /** Wallclock time at which we emitted the first event (ms since epoch). Initially -1. */
-  private long wallclockBaseTime;
-
-  /**
-   * Create a generator at an arbitrary position; used by {@link #copy} and
-   * {@link Checkpoint#toGenerator}.
-   *
-   * @throws NullPointerException if {@code config} is null
-   */
-  private Generator(GeneratorConfig config, long numEvents, long wallclockBaseTime) {
-    this.config = checkNotNull(config);
-    this.numEvents = numEvents;
-    this.wallclockBaseTime = wallclockBaseTime;
-  }
-
-  /**
-   * Create a fresh generator according to {@code config}: no events generated yet and a
-   * wallclock base time of -1.
-   */
-  public Generator(GeneratorConfig config) {
-    this(config, 0L, -1L);
-  }
-
-  /** Capture this generator's position (events generated and wallclock base time). */
-  public Checkpoint toCheckpoint() {
-    return new Checkpoint(numEvents, wallclockBaseTime);
-  }
-
-  /**
-   * Return a copy of this generator at the same position. The copy shares the same
-   * {@code config} reference but tracks its own event count.
-   */
-  public Generator copy() {
-    return new Generator(checkNotNull(config), numEvents, wallclockBaseTime);
-  }
-
-  /**
-   * Return the config this generator currently generates against. It may be superseded by a
-   * later call to {@link #splitAtEventId}.
-   */
-  public GeneratorConfig getCurrentConfig() {
-    return this.config;
-  }
-
-  /**
-   * Mutate this generator so that it will only generate events up to but not including
-   * {@code eventId}. Return a config representing the events this generator will no longer
-   * yield. The two run on a serial timeline: the returned config starts where this one now ends.
-   */
-  public GeneratorConfig splitAtEventId(long eventId) {
-    GeneratorConfig original = config;
-    long keptEvents = eventId - (original.firstEventId + original.firstEventNumber);
-    long stolenEvents = original.maxEvents - keptEvents;
-    // Build the remainder before narrowing our own config; both derive from the original.
-    GeneratorConfig remainder = original.copyWith(
-        original.firstEventId, stolenEvents, original.firstEventNumber + keptEvents);
-    config = original.copyWith(original.firstEventId, keptEvents, original.firstEventNumber);
-    return remainder;
-  }
-
-  /**
-   * Return the next 'event id'. Though events don't have ids we can simulate them to
-   * help with bookkeeping.
-   */
-  public long getNextEventId() {
-    return config.firstEventId + config.nextAdjustedEventNumber(numEvents);
-  }
-
-  /**
-   * Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if
-   * due to generate a person.
-   */
-  private long lastBase0PersonId() {
-    long eventId = getNextEventId();
-    long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
-    // Within an epoch, person events come first. Any offset beyond the person range (i.e. an
-    // auction or bid is due) clamps back to the last person generated in this epoch.
-    long offset = Math.min(
-        eventId % GeneratorConfig.PROPORTION_DENOMINATOR,
-        GeneratorConfig.PERSON_PROPORTION - 1);
-    return epoch * GeneratorConfig.PERSON_PROPORTION + offset;
-  }
-
-  /**
-   * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if
-   * due to generate an auction.
-   *
-   * <p>Each epoch of {@code PROPORTION_DENOMINATOR} event ids is laid out as
-   * {@code PERSON_PROPORTION} persons, then {@code AUCTION_PROPORTION} auctions, then bids.
-   * NOTE(review): for event ids inside epoch 0's person range this returns -1 (no auction has
-   * been generated yet) — confirm callers tolerate that.
-   */
-  private long lastBase0AuctionId() {
-    long eventId = getNextEventId();
-    long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
-    long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
-    if (offset < GeneratorConfig.PERSON_PROPORTION) {
-      // About to generate a person.
-      // Go back to the last auction in the last epoch.
-      epoch--;
-      offset = GeneratorConfig.AUCTION_PROPORTION - 1;
-    } else if (offset >= GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
-      // About to generate a bid.
-      // Go back to the last auction generated in this epoch.
-      offset = GeneratorConfig.AUCTION_PROPORTION - 1;
-    } else {
-      // About to generate an auction.
-      // Rebase the offset so it counts from the first auction slot of the epoch.
-      offset -= GeneratorConfig.PERSON_PROPORTION;
-    }
-    return epoch * GeneratorConfig.AUCTION_PROPORTION + offset;
-  }
-
-  /** Return a US state drawn uniformly from {@code US_STATES}. */
-  private static String nextUSState(Random random) {
-    int index = random.nextInt(US_STATES.size());
-    return US_STATES.get(index);
-  }
-
-  /** Return a US city drawn uniformly from {@code US_CITIES}. */
-  private static String nextUSCity(Random random) {
-    int index = random.nextInt(US_CITIES.size());
-    return US_CITIES.get(index);
-  }
-
-  /** Return a "First Last" person name; the first name is drawn before the last name. */
-  private static String nextPersonName(Random random) {
-    String first = FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size()));
-    String last = LAST_NAMES.get(random.nextInt(LAST_NAMES.size()));
-    return first + " " + last;
-  }
-
-  /**
-   * Return a random lowercase string whose pre-trim length lies in
-   * {@code [MIN_STRING_LENGTH, maxLength)}. Each character is a space with probability 1/13;
-   * leading and trailing spaces are then trimmed, so the result may be shorter.
-   */
-  private static String nextString(Random random, int maxLength) {
-    int length = MIN_STRING_LENGTH + random.nextInt(maxLength - MIN_STRING_LENGTH);
-    StringBuilder sb = new StringBuilder();
-    for (int i = 0; i < length; i++) {
-      // Decide on a space first; only draw a letter when this position is not a space.
-      boolean space = random.nextInt(13) == 0;
-      sb.append(space ? ' ' : (char) ('a' + random.nextInt(26)));
-    }
-    return sb.toString().trim();
-  }
-
-  /** Return a random string of exactly {@code length} lowercase letters (empty if length <= 0). */
-  private static String nextExactString(Random random, int length) {
-    StringBuilder sb = new StringBuilder();
-    for (int i = 0; i < length; i++) {
-      sb.append((char) ('a' + random.nextInt(26)));
-    }
-    return sb.toString();
-  }
-
-  /** Return a random email address of the form {@code user@domain.com}. */
-  private static String nextEmail(Random random) {
-    String user = nextString(random, 7);
-    String domain = nextString(random, 5);
-    return user + "@" + domain + ".com";
-  }
-
-  /** Return a random credit card number: four space-separated groups of four digits. */
-  private static String nextCreditCard(Random random) {
-    String[] groups = new String[4];
-    for (int i = 0; i < groups.length; i++) {
-      groups[i] = String.format("%04d", random.nextInt(10000));
-    }
-    return String.join(" ", groups);
-  }
-
-  /** Return a random price, log-uniformly distributed over {@code [100, 10^8)} before rounding. */
-  private static long nextPrice(Random random) {
-    double exponent = random.nextDouble() * 6.0;
-    return Math.round(Math.pow(10.0, exponent) * 100.0);
-  }
-
-  /**
-   * Return a random time delay, in milliseconds, for the length of an auction. The result is at
-   * least 1 and averages roughly the time until the auction {@code numInFlightAuctions} ahead of
-   * the current event will be generated.
-   *
-   * @param random source of randomness
-   * @param timestamp timestamp of the auction being generated, in the same units as the config's
-   *     event timestamps (presumably ms since epoch — confirm)
-   * @return auction length, {@code >= 1}
-   */
-  private long nextAuctionLengthMs(Random random, long timestamp) {
-    // What's our current event number?
-    long currentEventNumber = config.nextAdjustedEventNumber(numEvents);
-    // How many events till we've generated numInFlightAuctions? Widen to long before
-    // multiplying so the product cannot overflow int arithmetic for large configurations.
-    long numEventsForAuctions =
-        ((long) config.configuration.numInFlightAuctions * GeneratorConfig.PROPORTION_DENOMINATOR)
-        / GeneratorConfig.AUCTION_PROPORTION;
-    // When will the auction numInFlightAuctions beyond now be generated?
-    long futureAuction =
-        config.timestampAndInterEventDelayUsForEvent(currentEventNumber + numEventsForAuctions)
-            .getKey();
-    // Choose a length with average horizonMs: roughly uniform over [1, 2 * horizonMs], with the
-    // bound clamped so nextLong always receives a positive argument.
-    long horizonMs = futureAuction - timestamp;
-    return 1L + nextLong(random, Math.max(horizonMs * 2, 1L));
-  }
-
-  /**
-   * Return a random padding string. On average, {@code currentSize} plus the padding length is
-   * roughly {@code desiredAverageSize}.
-   *
-   * <p>Returns {@code ""} when {@code currentSize} already exceeds {@code desiredAverageSize}.
-   * Otherwise let {@code target = desiredAverageSize - currentSize} and
-   * {@code delta = round(0.2 * target)}. The padding length is then uniform over
-   * {@code [target - delta, target + delta)}, or exactly {@code target} when {@code delta} is 0.
-   *
-   * @param random source of randomness
-   * @param currentSize bytes already accounted for by the record's other fields
-   * @param desiredAverageSize desired average total size of the record
-   * @return padding made of random lowercase letters
-   */
-  private static String nextExtra(Random random, int currentSize, int desiredAverageSize) {
-    if (currentSize > desiredAverageSize) {
-      return "";
-    }
-    int targetSize = desiredAverageSize - currentSize;
-    int delta = (int) Math.round(targetSize * 0.2);
-    int minSize = targetSize - delta;
-    // Random.nextInt needs a positive bound, so only draw when there is a range to draw from.
-    int desiredSize = minSize + (delta == 0 ? 0 : random.nextInt(2 * delta));
-    return nextExactString(random, desiredSize);
-  }
-
-  /**
-   * Return a random long from {@code [0, n)}; {@code n} must be positive.
-   *
-   * <p>Bounds below {@link Integer#MAX_VALUE} delegate to {@link Random#nextInt(int)}, which is
-   * exactly uniform. Larger bounds reduce a 64-bit draw modulo {@code n} into {@code [0, n)}.
-   * This keeps only the small modulo bias inherent when 2^64 is not a multiple of {@code n}.
-   * It avoids {@code Math.abs(x % n)}, which folds negative residues onto positive ones and so
-   * makes 0 half as likely as every other value.
-   */
-  private static long nextLong(Random random, long n) {
-    if (n < Integer.MAX_VALUE) {
-      return random.nextInt((int) n);
-    }
-    long r = random.nextLong() % n;
-    // Java's % keeps the dividend's sign; shift negative residues into [0, n).
-    return r < 0 ? r + n : r;
-  }
-
-  /**
-   * Generate and return a random person with next available id.
-   */
-  private Person nextPerson(Random random, long timestamp) {
-    long id = lastBase0PersonId() + GeneratorConfig.FIRST_PERSON_ID;
-    String name = nextPersonName(random);
-    String email = nextEmail(random);
-    String creditCard = nextCreditCard(random);
-    String city = nextUSCity(random);
-    String state = nextUSState(random);
-    int currentSize =
-        8 + name.length() + email.length() + creditCard.length() + city.length() + state.length();
-    String extra = nextExtra(random, currentSize, config.configuration.avgPersonByteSize);
-    return new Person(id, name, email, creditCard, city, state, timestamp, extra);
-  }
-
-  /**
-   * Picks a base-0 person id from the most recent {@code numActivePeople} people, plus up to
-   * {@code PERSON_ID_LEAD} ids beyond the last one created.
-   *
-   * <p>Restricting the choice to recent people keeps the number of bids and auctions per person
-   * roughly steady on long runs. Ids past the last created person make some "new person" and
-   * "new auction" events look as if they arrived out of order.
-   */
-  private long nextBase0PersonId(Random random) {
-    long peopleSoFar = 1 + lastBase0PersonId();
-    long activeCount = Math.min(peopleSoFar, config.configuration.numActivePeople);
-    long firstActive = peopleSoFar - activeCount;
-    return firstActive + nextLong(random, activeCount + PERSON_ID_LEAD);
-  }
-
-  /**
-   * Picks a base-0 auction id from those likely still open, plus up to {@code AUCTION_ID_LEAD}
-   * ids beyond the last one created.
-   *
-   * <p>"Likely still open" means the last {@code numInFlightAuctions} ids. Tracking truly
-   * unexpired auctions would need state that cannot easily be split across generators.
-   */
-  private long nextBase0AuctionId(Random random) {
-    long newest = lastBase0AuctionId();
-    long oldest = Math.max(newest - config.configuration.numInFlightAuctions, 0);
-    long candidates = newest - oldest + 1 + AUCTION_ID_LEAD;
-    return oldest + nextLong(random, candidates);
-  }
-
-  /**
-   * Generate and return a random auction with next available id.
-   *
-   * <p>All random draws come from {@code random} in a fixed order. Changing the order of the
-   * statements below changes the generated data for a given seed.
-   *
-   * @param random per-event source of randomness
-   * @param timestamp event-time the auction is created at; also the base for its expiry
-   * @return the new auction, with id {@code lastBase0AuctionId() + FIRST_AUCTION_ID}
-   */
-  private Auction nextAuction(Random random, long timestamp) {
-    long id = lastBase0AuctionId() + GeneratorConfig.FIRST_AUCTION_ID;
-
-    long seller;
-    // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio.
-    if (random.nextInt(config.configuration.hotSellersRatio) > 0) {
-      // Choose the first person in the batch of last HOT_SELLER_RATIO people.
-      // NOTE(review): the probability uses the configurable hotSellersRatio while the batch size
-      // uses the HOT_SELLER_RATIO constant — confirm the two are meant to be independent.
-      seller = (lastBase0PersonId() / HOT_SELLER_RATIO) * HOT_SELLER_RATIO;
-    } else {
-      seller = nextBase0PersonId(random);
-    }
-    seller += GeneratorConfig.FIRST_PERSON_ID;
-
-    long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES);
-    long initialBid = nextPrice(random);
-    long expires = timestamp + nextAuctionLengthMs(random, timestamp);
-    String name = nextString(random, 20);
-    String desc = nextString(random, 100);
-    // The reserve is always strictly above the initial bid (nextPrice returns at least 100).
-    long reserve = initialBid + nextPrice(random);
-    // NOTE(review): counts six 8-byte fields although the Auction is built from seven numeric
-    // values (id, initialBid, reserve, timestamp, expires, seller, category) — confirm intended.
-    int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8;
-    String extra = nextExtra(random, currentSize, config.configuration.avgAuctionByteSize);
-    return new Auction(id, name, desc, initialBid, reserve, timestamp, expires, seller, category,
-        extra);
-  }
-
-  /**
-   * Generate and return a random bid with next available id.
-   *
-   * <p>All random draws come from {@code random} in a fixed order. Changing the order of the
-   * statements below changes the generated data for a given seed.
-   *
-   * @param random per-event source of randomness
-   * @param timestamp event-time the bid is placed at
-   * @return the new bid, for either a "hot" or a randomly chosen recent auction and bidder
-   */
-  private Bid nextBid(Random random, long timestamp) {
-    long auction;
-    // Here P(bid will be for a hot auction) = 1 - 1/hotAuctionRatio.
-    if (random.nextInt(config.configuration.hotAuctionRatio) > 0) {
-      // Choose the first auction in the batch of last HOT_AUCTION_RATIO auctions.
-      // NOTE(review): probability uses the configurable hotAuctionRatio, batch size uses the
-      // HOT_AUCTION_RATIO constant — confirm the two are meant to be independent.
-      auction = (lastBase0AuctionId() / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO;
-    } else {
-      auction = nextBase0AuctionId(random);
-    }
-    auction += GeneratorConfig.FIRST_AUCTION_ID;
-
-    long bidder;
-    // Here P(bid will be by a hot bidder) = 1 - 1/hotBiddersRatio
-    if (random.nextInt(config.configuration.hotBiddersRatio) > 0) {
-      // Choose the second person (so hot bidders and hot sellers don't collide) in the batch of
-      // last HOT_BIDDER_RATIO people.
-      bidder = (lastBase0PersonId() / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1;
-    } else {
-      bidder = nextBase0PersonId(random);
-    }
-    bidder += GeneratorConfig.FIRST_PERSON_ID;
-
-    long price = nextPrice(random);
-    // Four 8-byte numeric fields: auction, bidder, price, timestamp.
-    int currentSize = 8 + 8 + 8 + 8;
-    String extra = nextExtra(random, currentSize, config.configuration.avgBidByteSize);
-    return new Bid(auction, bidder, price, timestamp, extra);
-  }
-
-  @Override
-  public boolean hasNext() {
-    return numEvents < config.maxEvents;
-  }
-
-  /**
-   * Return the next event. The outer timestamp is in wallclock time and corresponds to
-   * when the event should fire. The inner timestamp is in event-time and represents the
-   * time the event is purported to have taken place in the simulation.
-   *
-   * <p>The random draws for the event use a {@link Random} seeded with the next event id, so
-   * they are reproducible. The event kind is chosen from the event id modulo
-   * {@code GeneratorConfig.PROPORTION_DENOMINATOR}: person, then auction, otherwise bid, in the
-   * configured proportions. Each call advances {@code numEvents} by one.
-   */
-  public NextEvent nextEvent() {
-    if (wallclockBaseTime < 0) {
-      // Anchor wallclock time on first use; a negative value marks it as not yet set.
-      wallclockBaseTime = System.currentTimeMillis();
-    }
-    // When, in event time, we should generate the event. Monotonic.
-    long eventTimestamp =
-        config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents)).getKey();
-    // When, in event time, the event should say it was generated. Depending on outOfOrderGroupSize
-    // may have local jitter.
-    long adjustedEventTimestamp =
-        config.timestampAndInterEventDelayUsForEvent(config.nextAdjustedEventNumber(numEvents))
-            .getKey();
-    // The minimum of this and all future adjusted event timestamps. Accounts for jitter in
-    // the event timestamp.
-    long watermark =
-        config.timestampAndInterEventDelayUsForEvent(config.nextEventNumberForWatermark(numEvents))
-            .getKey();
-    // When, in wallclock time, we should emit the event.
-    long wallclockTimestamp = wallclockBaseTime + (eventTimestamp - getCurrentConfig().baseTime);
-
-    // Seed the random number generator with the next 'event id'.
-    Random random = new Random(getNextEventId());
-    long rem = getNextEventId() % GeneratorConfig.PROPORTION_DENOMINATOR;
-
-    Event event;
-    if (rem < GeneratorConfig.PERSON_PROPORTION) {
-      event = new Event(nextPerson(random, adjustedEventTimestamp));
-    } else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
-      event = new Event(nextAuction(random, adjustedEventTimestamp));
-    } else {
-      event = new Event(nextBid(random, adjustedEventTimestamp));
-    }
-
-    numEvents++;
-    return new NextEvent(wallclockTimestamp, adjustedEventTimestamp, event, watermark);
-  }
-
-  /** Produces the next event stamped with its event time; the wallclock fire time is dropped. */
-  @Override
-  public TimestampedValue<Event> next() {
-    NextEvent upcoming = nextEvent();
-    Instant eventTime = new Instant(upcoming.eventTimestamp);
-    return TimestampedValue.of(upcoming.event, eventTime);
-  }
-
-  /** Not supported: generated events cannot be removed. Always throws. */
-  @Override
-  public void remove() {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Return how many microseconds till we emit the next event.
-   */
-  public long currentInterEventDelayUs() {
-    return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents))
-        .getValue();
-  }
-
-  /**
-   * Return an estimate of fraction of output consumed.
-   */
-  public double getFractionConsumed() {
-    return (double) numEvents / config.maxEvents;
-  }
-
-  @Override
-  public String toString() {
-    return String.format("Generator{config:%s; numEvents:%d; wallclockBaseTime:%d}", config,
-        numEvents, wallclockBaseTime);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java
deleted file mode 100644
index 95c276b..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.sources;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.sdk.values.KV;
-
-/**
- * Parameters controlling how {@link Generator} synthesizes {@link Event} elements.
- *
- * <p>All fields are final. {@link #split}, {@link #copy} and {@link #copyWith} return new
- * configs rather than modifying this one.
- */
-public class GeneratorConfig implements Serializable {
-
-  /**
-   * We start the ids at specific values to help ensure the queries find a match even on
-   * small synthesized dataset sizes.
-   */
-  public static final long FIRST_AUCTION_ID = 1000L;
-  public static final long FIRST_PERSON_ID = 1000L;
-  public static final long FIRST_CATEGORY_ID = 10L;
-
-  /**
-   * Proportions of people/auctions/bids to synthesize, out of every
-   * {@link #PROPORTION_DENOMINATOR} events.
-   */
-  public static final int PERSON_PROPORTION = 1;
-  public static final int AUCTION_PROPORTION = 3;
-  private static final int BID_PROPORTION = 46;
-  public static final int PROPORTION_DENOMINATOR =
-      PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION;
-
-  /** Environment options. */
-  public final NexmarkConfiguration configuration;
-
-  /**
-   * Delay between events, in microseconds. If the array has more than one entry then
-   * the rate is changed every {@link #stepLengthSec}, and wraps around.
-   */
-  private final long[] interEventDelayUs;
-
-  /** Delay, in seconds, before changing the current inter-event delay. */
-  private final long stepLengthSec;
-
-  /** Time for first event (ms since epoch). */
-  public final long baseTime;
-
-  /**
-   * Event id of first event to be generated. Event ids are unique over all generators, and
-   * are used as a seed to generate each event's data.
-   */
-  public final long firstEventId;
-
-  /** Maximum number of events to generate. */
-  public final long maxEvents;
-
-  /**
-   * First event number. Generators running in parallel time may share the same event number,
-   * and the event number is used to determine the event timestamp.
-   */
-  public final long firstEventNumber;
-
-  /**
-   * True period of epoch in milliseconds, i.e. the time to run through one cycle of all
-   * interEventDelayUs entries. Only computed when there is more than one entry; otherwise 0.
-   */
-  private final long epochPeriodMs;
-
-  /**
-   * Number of events per epoch, i.e. the number of events in one cycle of all
-   * interEventDelayUs entries. Only computed when there is more than one entry; otherwise 0.
-   */
-  private final long eventsPerEpoch;
-
-  /**
-   * Creates a generator config.
-   *
-   * @param configuration Nexmark options that rates and record sizes are read from
-   * @param baseTime event-time timestamp (ms since epoch) of event number 0
-   * @param firstEventId first event id (also the per-event random seed) to generate
-   * @param maxEventsOrZero number of events to generate, or 0 to pick the largest count whose
-   *     estimated byte size cannot overflow a {@code long}
-   * @param firstEventNumber event number of the first generated event
-   */
-  public GeneratorConfig(
-      NexmarkConfiguration configuration, long baseTime, long firstEventId,
-      long maxEventsOrZero, long firstEventNumber) {
-    this.configuration = checkNotNull(configuration, "configuration");
-    this.interEventDelayUs = configuration.rateShape.interEventDelayUs(
-        configuration.firstEventRate, configuration.nextEventRate,
-        configuration.rateUnit, configuration.numEventGenerators);
-    this.stepLengthSec = configuration.rateShape.stepLengthSec(configuration.ratePeriodSec);
-    this.baseTime = baseTime;
-    this.firstEventId = firstEventId;
-    if (maxEventsOrZero == 0) {
-      // Scale maximum down to avoid overflow in getEstimatedSizeBytes. The product is computed
-      // in long arithmetic (int could overflow for large sizes), and the sizes are clamped to at
-      // least 1 so an all-zero configuration cannot divide by zero.
-      long maxAvgEventByteSize = Math.max(1L, Math.max(
-          Math.max(configuration.avgPersonByteSize, configuration.avgAuctionByteSize),
-          configuration.avgBidByteSize));
-      this.maxEvents = Long.MAX_VALUE / (PROPORTION_DENOMINATOR * maxAvgEventByteSize);
-    } else {
-      this.maxEvents = maxEventsOrZero;
-    }
-    this.firstEventNumber = firstEventNumber;
-
-    // Epoch bookkeeping is only needed when the rate actually varies.
-    long eventsPerEpoch = 0;
-    long epochPeriodMs = 0;
-    if (interEventDelayUs.length > 1) {
-      for (long interEventDelayU : interEventDelayUs) {
-        long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU;
-        eventsPerEpoch += numEventsForThisCycle;
-        epochPeriodMs += (numEventsForThisCycle * interEventDelayU) / 1000L;
-      }
-    }
-    this.eventsPerEpoch = eventsPerEpoch;
-    this.epochPeriodMs = epochPeriodMs;
-  }
-
-  /** Return a copy of this config. */
-  public GeneratorConfig copy() {
-    return copyWith(firstEventId, maxEvents, firstEventNumber);
-  }
-
-  /**
-   * Split this config into at most {@code n} sub-configs with roughly equal numbers of
-   * possible events but distinct value spaces. The generators will run on parallel timelines.
-   * This config should no longer be used.
-   *
-   * <p>When {@code maxEvents < n}, some shares would be empty. A sub-config cannot represent
-   * zero events (a {@code maxEvents} of 0 means "use the default maximum"), so empty shares are
-   * omitted and fewer than {@code n} configs are returned.
-   *
-   * @throws IllegalArgumentException if {@code n < 1}
-   */
-  public List<GeneratorConfig> split(int n) {
-    if (n < 1) {
-      throw new IllegalArgumentException("Cannot split into " + n + " sub-configs; need n >= 1");
-    }
-    List<GeneratorConfig> results = new ArrayList<>();
-    if (n == 1) {
-      // No split required.
-      results.add(this);
-      return results;
-    }
-    long subMaxEvents = maxEvents / n;
-    long subFirstEventId = firstEventId;
-    for (int i = 0; i < n; i++) {
-      if (i == n - 1) {
-        // Give the last sub-config the remainder so no events are lost to round-down.
-        subMaxEvents = maxEvents - subMaxEvents * (n - 1);
-      }
-      if (subMaxEvents > 0) {
-        results.add(copyWith(subFirstEventId, subMaxEvents, firstEventNumber));
-        subFirstEventId += subMaxEvents;
-      }
-    }
-    return results;
-  }
-
-  /**
-   * Return copy of this config except with given parameters. As in the constructor, a
-   * {@code maxEvents} of 0 selects the default maximum.
-   */
-  public GeneratorConfig copyWith(long firstEventId, long maxEvents, long firstEventNumber) {
-    return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
-  }
-
-  /**
-   * Return an estimate of the bytes needed by {@code numEvents}, using the configured event mix
-   * and average per-kind byte sizes.
-   */
-  public long estimatedBytesForEvents(long numEvents) {
-    long numPersons = (numEvents * PERSON_PROPORTION) / PROPORTION_DENOMINATOR;
-    long numAuctions = (numEvents * AUCTION_PROPORTION) / PROPORTION_DENOMINATOR;
-    long numBids = (numEvents * BID_PROPORTION) / PROPORTION_DENOMINATOR;
-    return numPersons * configuration.avgPersonByteSize
-        + numAuctions * configuration.avgAuctionByteSize
-        + numBids * configuration.avgBidByteSize;
-  }
-
-  /**
-   * Return an estimate of the byte-size of all events a generator for this config would yield.
-   */
-  public long getEstimatedSizeBytes() {
-    return estimatedBytesForEvents(maxEvents);
-  }
-
-  /**
-   * Return the first 'event id' which could be generated from this config. Though events don't
-   * have ids we can simulate them to help bookkeeping.
-   */
-  public long getStartEventId() {
-    return firstEventId + firstEventNumber;
-  }
-
-  /** Return one past the last 'event id' which could be generated from this config. */
-  public long getStopEventId() {
-    return firstEventId + firstEventNumber + maxEvents;
-  }
-
-  /** Return the next event number for a generator which has so far emitted {@code numEvents}. */
-  public long nextEventNumber(long numEvents) {
-    return firstEventNumber + numEvents;
-  }
-
-  /**
-   * Return the next event number for a generator which has so far emitted {@code numEvents},
-   * but adjusted to account for {@code outOfOrderGroupSize}. The number is mapped to another
-   * number within the same block of {@code outOfOrderGroupSize} consecutive event numbers.
-   */
-  public long nextAdjustedEventNumber(long numEvents) {
-    long n = configuration.outOfOrderGroupSize;
-    long eventNumber = nextEventNumber(numEvents);
-    long base = (eventNumber / n) * n;
-    long offset = (eventNumber * 953) % n;
-    return base + offset;
-  }
-
-  /**
-   * Return the event number whose event time will be a suitable watermark for
-   * a generator which has so far emitted {@code numEvents}: the first event number of the
-   * current block of {@code outOfOrderGroupSize} consecutive event numbers.
-   */
-  public long nextEventNumberForWatermark(long numEvents) {
-    long n = configuration.outOfOrderGroupSize;
-    long eventNumber = nextEventNumber(numEvents);
-    return (eventNumber / n) * n;
-  }
-
-  /**
-   * What timestamp should the event with {@code eventNumber} have for this generator? And
-   * what inter-event delay (in microseconds) is current?
-   *
-   * @return a pair of (event timestamp in ms since epoch, current inter-event delay in us)
-   * @throws IllegalStateException if the rate shape yields no events per epoch
-   */
-  public KV<Long, Long> timestampAndInterEventDelayUsForEvent(long eventNumber) {
-    if (interEventDelayUs.length == 1) {
-      long timestamp = baseTime + (eventNumber * interEventDelayUs[0]) / 1000L;
-      return KV.of(timestamp, interEventDelayUs[0]);
-    }
-    if (eventsPerEpoch <= 0) {
-      // Empty rate shape, or every step too short to contain a single event; the division
-      // below would otherwise fail with an uninformative ArithmeticException.
-      throw new IllegalStateException(
-          "Cannot compute event timestamps: no events per epoch in " + this);
-    }
-
-    long epoch = eventNumber / eventsPerEpoch;
-    long n = eventNumber % eventsPerEpoch;
-    long offsetInEpochMs = 0;
-    for (long interEventDelayU : interEventDelayUs) {
-      long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU;
-      if (n < numEventsForThisCycle) {
-        long offsetInCycleUs = n * interEventDelayU;
-        long timestamp =
-            baseTime + epoch * epochPeriodMs + offsetInEpochMs + (offsetInCycleUs / 1000L);
-        return KV.of(timestamp, interEventDelayU);
-      }
-      n -= numEventsForThisCycle;
-      offsetInEpochMs += (numEventsForThisCycle * interEventDelayU) / 1000L;
-    }
-    throw new IllegalStateException("internal eventsPerEpoch incorrect"); // can't reach
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("GeneratorConfig");
-    sb.append("{configuration:");
-    sb.append(configuration.toString());
-    sb.append(";interEventDelayUs=[");
-    for (int i = 0; i < interEventDelayUs.length; i++) {
-      if (i > 0) {
-        sb.append(",");
-      }
-      sb.append(interEventDelayUs[i]);
-    }
-    sb.append("]");
-    sb.append(";stepLengthSec:");
-    sb.append(stepLengthSec);
-    sb.append(";baseTime:");
-    sb.append(baseTime);
-    sb.append(";firstEventId:");
-    sb.append(firstEventId);
-    sb.append(";maxEvents:");
-    sb.append(maxEvents);
-    sb.append(";firstEventNumber:");
-    sb.append(firstEventNumber);
-    sb.append(";epochPeriodMs:");
-    sb.append(epochPeriodMs);
-    sb.append(";eventsPerEpoch:");
-    sb.append(eventsPerEpoch);
-    sb.append("}");
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java
deleted file mode 100644
index 09d945d..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.sources;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.concurrent.ThreadLocalRandom;
-import javax.annotation.Nullable;
-
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A custom, unbounded source of event records.
- *
- * <p>If {@code isRateLimited} is true, events become available for return from the reader such
- * that the overall rate respect the {@code interEventDelayUs} period if possible. Otherwise,
- * events are returned every time the system asks for one.
- */
-public class UnboundedEventSource extends UnboundedSource<Event, Generator.Checkpoint> {
-  private static final Duration BACKLOG_PERIOD = Duration.standardSeconds(30);
-  private static final Logger LOG = LoggerFactory.getLogger(UnboundedEventSource.class);
-
-  /** Configuration for generator to use when reading synthetic events. May be split. */
-  private final GeneratorConfig config;
-
-  /** How many unbounded sources to create. */
-  private final int numEventGenerators;
-
-  /** How many seconds to hold back the watermark. */
-  private final long watermarkHoldbackSec;
-
-  /** Are we rate limiting the events? */
-  private final boolean isRateLimited;
-
-  public UnboundedEventSource(GeneratorConfig config, int numEventGenerators,
-      long watermarkHoldbackSec, boolean isRateLimited) {
-    this.config = config;
-    this.numEventGenerators = numEventGenerators;
-    this.watermarkHoldbackSec = watermarkHoldbackSec;
-    this.isRateLimited = isRateLimited;
-  }
-
-  /** A reader to pull events from the generator. */
-  private class EventReader extends UnboundedReader<Event> {
-    /** Generator we are reading from. */
-    private final Generator generator;
-
-    /**
-     * Current watermark (ms since epoch). Initially set to beginning of time.
-     * Then updated to be the time of the next generated event.
-     * Then, once all events have been generated, set to the end of time.
-     */
-    private long watermark;
-
-    /**
-     * Current backlog (ms), as delay between timestamp of last returned event and the timestamp
-     * we should be up to according to wall-clock time. Used only for logging.
-     */
-    private long backlogDurationMs;
-
-    /**
-     * Current backlog, as estimated number of event bytes we are behind, or null if
-     * unknown. Reported to callers.
-     */
-    @Nullable
-    private Long backlogBytes;
-
-    /**
-     * Wallclock time (ms since epoch) we last reported the backlog, or -1 if never reported.
-     */
-    private long lastReportedBacklogWallclock;
-
-    /**
-     * Event time (ms since epoch) of pending event at last reported backlog, or -1 if never
-     * calculated.
-     */
-    private long timestampAtLastReportedBacklogMs;
-
-    /** Next event to make 'current' when wallclock time has advanced sufficiently. */
-    @Nullable
-    private TimestampedValue<Event> pendingEvent;
-
-    /** Wallclock time when {@link #pendingEvent} is due, or -1 if no pending event. */
-    private long pendingEventWallclockTime;
-
-    /** Current event to return from getCurrent. */
-    @Nullable
-    private TimestampedValue<Event> currentEvent;
-
-    /** Events which have been held back so as to force them to be late. */
-    private final Queue<Generator.NextEvent> heldBackEvents = new PriorityQueue<>();
-
-    public EventReader(Generator generator) {
-      this.generator = generator;
-      watermark = NexmarkUtils.BEGINNING_OF_TIME.getMillis();
-      lastReportedBacklogWallclock = -1;
-      pendingEventWallclockTime = -1;
-      timestampAtLastReportedBacklogMs = -1;
-    }
-
-    public EventReader(GeneratorConfig config) {
-      this(new Generator(config));
-    }
-
-    @Override
-    public boolean start() {
-      LOG.trace("starting unbounded generator {}", generator);
-      return advance();
-    }
-
-
-    @Override
-    public boolean advance() {
-      long now = System.currentTimeMillis();
-
-      while (pendingEvent == null) {
-        if (!generator.hasNext() && heldBackEvents.isEmpty()) {
-          // No more events, EVER.
-          if (isRateLimited) {
-            updateBacklog(System.currentTimeMillis(), 0);
-          }
-          if (watermark < BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
-            watermark = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
-            LOG.trace("stopped unbounded generator {}", generator);
-          }
-          return false;
-        }
-
-        Generator.NextEvent next = heldBackEvents.peek();
-        if (next != null && next.wallclockTimestamp <= now) {
-          // Time to use the held-back event.
-          heldBackEvents.poll();
-          LOG.debug("replaying held-back event {}ms behind watermark",
-                             watermark - next.eventTimestamp);
-        } else if (generator.hasNext()) {
-          next = generator.nextEvent();
-          if (isRateLimited && config.configuration.probDelayedEvent > 0.0
-              && config.configuration.occasionalDelaySec > 0
-              && ThreadLocalRandom.current().nextDouble() < config.configuration.probDelayedEvent) {
-            // We'll hold back this event and go around again.
-            long delayMs =
-                ThreadLocalRandom.current().nextLong(config.configuration.occasionalDelaySec * 1000)
-                + 1L;
-            LOG.debug("delaying event by {}ms", delayMs);
-            heldBackEvents.add(next.withDelay(delayMs));
-            continue;
-          }
-        } else {
-          // Waiting for held-back event to fire.
-          if (isRateLimited) {
-            updateBacklog(now, 0);
-          }
-          return false;
-        }
-
-        pendingEventWallclockTime = next.wallclockTimestamp;
-        pendingEvent = TimestampedValue.of(next.event, new Instant(next.eventTimestamp));
-        long newWatermark =
-            next.watermark - Duration.standardSeconds(watermarkHoldbackSec).getMillis();
-        if (newWatermark > watermark) {
-          watermark = newWatermark;
-        }
-      }
-
-      if (isRateLimited) {
-        if (pendingEventWallclockTime > now) {
-          // We want this event to fire in the future. Try again later.
-          updateBacklog(now, 0);
-          return false;
-        }
-        updateBacklog(now, now - pendingEventWallclockTime);
-      }
-
-      // This event is ready to fire.
-      currentEvent = pendingEvent;
-      pendingEvent = null;
-      return true;
-    }
-
-    private void updateBacklog(long now, long newBacklogDurationMs) {
-      backlogDurationMs = newBacklogDurationMs;
-      long interEventDelayUs = generator.currentInterEventDelayUs();
-      if (interEventDelayUs != 0) {
-        long backlogEvents = (backlogDurationMs * 1000 + interEventDelayUs - 1) / interEventDelayUs;
-        backlogBytes = generator.getCurrentConfig().estimatedBytesForEvents(backlogEvents);
-      }
-      if (lastReportedBacklogWallclock < 0
-          || now - lastReportedBacklogWallclock > BACKLOG_PERIOD.getMillis()) {
-        double timeDialation = Double.NaN;
-        if (pendingEvent != null
-            && lastReportedBacklogWallclock >= 0
-            && timestampAtLastReportedBacklogMs >= 0) {
-          long wallclockProgressionMs = now - lastReportedBacklogWallclock;
-          long eventTimeProgressionMs =
-              pendingEvent.getTimestamp().getMillis() - timestampAtLastReportedBacklogMs;
-          timeDialation = (double) eventTimeProgressionMs / (double) wallclockProgressionMs;
-        }
-        LOG.debug(
-            "unbounded generator backlog now {}ms ({} bytes) at {}us interEventDelay "
-            + "with {} time dilation",
-            backlogDurationMs, backlogBytes, interEventDelayUs, timeDialation);
-        lastReportedBacklogWallclock = now;
-        if (pendingEvent != null) {
-          timestampAtLastReportedBacklogMs = pendingEvent.getTimestamp().getMillis();
-        }
-      }
-    }
-
-    @Override
-    public Event getCurrent() {
-      if (currentEvent == null) {
-        throw new NoSuchElementException();
-      }
-      return currentEvent.getValue();
-    }
-
-    @Override
-    public Instant getCurrentTimestamp() {
-      if (currentEvent == null) {
-        throw new NoSuchElementException();
-      }
-      return currentEvent.getTimestamp();
-    }
-
-    @Override
-    public void close() {
-      // Nothing to close.
-    }
-
-    @Override
-    public UnboundedEventSource getCurrentSource() {
-      return UnboundedEventSource.this;
-    }
-
-    @Override
-    public Instant getWatermark() {
-      return new Instant(watermark);
-    }
-
-    @Override
-    public Generator.Checkpoint getCheckpointMark() {
-      return generator.toCheckpoint();
-    }
-
-    @Override
-    public long getSplitBacklogBytes() {
-      return backlogBytes == null ? BACKLOG_UNKNOWN : backlogBytes;
-    }
-
-    @Override
-    public String toString() {
-      return String.format("EventReader(%d, %d, %d)",
-          generator.getCurrentConfig().getStartEventId(), generator.getNextEventId(),
-          generator.getCurrentConfig().getStopEventId());
-    }
-  }
-
-  @Override
-  public Coder<Generator.Checkpoint> getCheckpointMarkCoder() {
-    return Generator.Checkpoint.CODER_INSTANCE;
-  }
-
-  @Override
-  public List<UnboundedEventSource> split(
-      int desiredNumSplits, PipelineOptions options) {
-    LOG.trace("splitting unbounded source into {} sub-sources", numEventGenerators);
-    List<UnboundedEventSource> results = new ArrayList<>();
-    // Ignore desiredNumSplits and use numEventGenerators instead.
-    for (GeneratorConfig subConfig : config.split(numEventGenerators)) {
-      results.add(new UnboundedEventSource(subConfig, 1, watermarkHoldbackSec, isRateLimited));
-    }
-    return results;
-  }
-
-  @Override
-  public EventReader createReader(
-      PipelineOptions options, @Nullable Generator.Checkpoint checkpoint) {
-    if (checkpoint == null) {
-      LOG.trace("creating initial unbounded reader for {}", config);
-      return new EventReader(config);
-    } else {
-      LOG.trace("resuming unbounded reader from {}", checkpoint);
-      return new EventReader(checkpoint.toGenerator(config));
-    }
-  }
-
-  @Override
-  public void validate() {
-    // Nothing to validate.
-  }
-
-  @Override
-  public Coder<Event> getDefaultOutputCoder() {
-    return Event.CODER;
-  }
-
-  @Override
-  public String toString() {
-    return String.format(
-        "UnboundedEventSource(%d, %d)", config.getStartEventId(), config.getStopEventId());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/package-info.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/package-info.java
deleted file mode 100644
index ceaec9d..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Nexmark Synthetic Sources.
- */
-package org.apache.beam.integration.nexmark.sources;

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/resources/log4j.properties b/integration/java/nexmark/src/main/resources/log4j.properties
deleted file mode 100644
index 7dd57b5..0000000
--- a/integration/java/nexmark/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,55 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the console
-log4j.rootCategory=DEBUG, console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n
-
-# General Beam loggers
-log4j.logger.org.apache.beam.runners.direct=WARN
-log4j.logger.org.apache.beam.sdk=WARN
-
-# Nexmark specific
-log4j.logger.org.apache.beam.integration.nexmark=WARN
-
-# Settings to quiet third party logs that are too verbose
-log4j.logger.org.spark_project.jetty=WARN
-log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
-
-# Settings to quiet Spark logs so that Beam logs stand out
-log4j.logger.org.apache.beam.runners.spark=INFO
-log4j.logger.org.apache.spark=WARN
-log4j.logger.org.spark-project=WARN
-log4j.logger.io.netty=INFO
-
-# Settings to quiet flink logs
-log4j.logger.org.apache.flink=WARN
-
-# Settings to quiet apex logs
-log4j.logger.org.apache.beam.runners.apex=INFO
-log4j.logger.com.datatorrent=ERROR
-log4j.logger.org.apache.hadoop.metrics2=WARN
-log4j.logger.org.apache.commons=WARN
-log4j.logger.org.apache.hadoop.security=WARN
-log4j.logger.org.apache.hadoop.util=WARN
-
-# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
-log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
-log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
deleted file mode 100644
index 64a8e4f..0000000
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.UsesStatefulParDo;
-import org.apache.beam.sdk.testing.UsesTimersInParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Test the various NEXMark queries yield results coherent with their models.
- *
- * <p>Each query is run twice, once over the batch events source and once over the streaming
- * events source, and its output is checked against the assertion of the matching model.
- */
-@RunWith(JUnit4.class)
-public class QueryTest {
-  private static final NexmarkConfiguration CONFIG = createConfig();
-
-  @Rule public TestPipeline p = TestPipeline.create();
-
-  /** Builds the configuration shared by every test in this class. */
-  private static NexmarkConfiguration createConfig() {
-    NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT.copy();
-    // careful, results of tests are linked to numEventGenerators because of timestamp generation
-    configuration.numEventGenerators = 1;
-    configuration.numEvents = 1000;
-    return configuration;
-  }
-
-  /**
-   * Runs {@code query} over generated events and asserts its output satisfies {@code model}.
-   *
-   * @param name prefix used for the name of the read transform
-   * @param query the query under test
-   * @param model the model whose assertion the query output must satisfy
-   * @param streamingMode {@code true} to read from the streaming source, {@code false} for batch
-   */
-  private void queryMatchesModel(
-      String name, NexmarkQuery query, NexmarkQueryModel model, boolean streamingMode) {
-    NexmarkUtils.setupPipeline(NexmarkUtils.CoderStrategy.HAND, p);
-    PCollection<TimestampedValue<KnownSize>> output =
-        streamingMode
-            ? p.apply(name + ".ReadUnBounded", NexmarkUtils.streamEventsSource(CONFIG)).apply(query)
-            : p.apply(name + ".ReadBounded", NexmarkUtils.batchEventsSource(CONFIG)).apply(query);
-    PAssert.that(output).satisfies(model.assertionFor());
-    PipelineResult pipelineResult = p.run();
-    pipelineResult.waitUntilFinish();
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void query0MatchesModelBatch() {
-    NexmarkQuery query = new Query0(CONFIG);
-    NexmarkQueryModel model = new Query0Model(CONFIG);
-    queryMatchesModel("Query0TestBatch", query, model, /* streamingMode= */ false);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void query0MatchesModelStreaming() {
-    NexmarkQuery query = new Query0(CONFIG);
-    NexmarkQueryModel model = new Query0Model(CONFIG);
-    queryMatchesModel("Query0TestStreaming", query, model, /* streamingMode= */ true);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void query1MatchesModelBatch() {
-    NexmarkQuery query = new Query1(CONFIG);
-    NexmarkQueryModel model = new Query1Model(CONFIG);
-    queryMatchesModel("Query1TestBatch", query, model, /* streamingMode= */ false);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void query1MatchesModelStreaming() {
-    NexmarkQuery query = new Query1(CONFIG);
-    NexmarkQueryModel model = new Query1Model(CONFIG);
-    queryMatchesModel("Query1TestStreaming", query, model, /* streamingMode= */ true);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void query2MatchesModelBatch() {
-    NexmarkQuery query = new Query2(CONFIG);
-    NexmarkQueryModel model = new Query2Model(CONFIG);
-    queryMatchesModel("Query2TestBatch", query, model, /* streamingMode= */ false);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void query2MatchesModelStreaming() {
-    NexmarkQuery query = new Query2(CONFIG);
-    NexmarkQueryModel model = new Query2Model(CONFIG);
-    queryMatchesModel("Query2TestStreaming", query, model, /* streamingMode= */ true);
-  }
-
-  @Test
-  @Category({NeedsRunner.class, UsesStatefulParDo.class, UsesTimersInParDo.class})
-  public void query3MatchesModelBatch() {
-    NexmarkQuery query = new Query3(CONFIG);
-    NexmarkQueryModel model = new Query3Model(CONFIG);
-    queryMatchesModel("Query3TestBatch", query, model, /* streamingMode= */ false);
-  }
-
-  @Test
-  @Category({NeedsRunner.class, UsesStatefulParDo.class, UsesTimersInParDo.class})
-  public void query3MatchesModelStreaming() {
-    NexmarkQuery query = new Query3(CONFIG);
-    NexmarkQueryModel model = new Query3Model(CONFIG);
-    queryMatchesModel("Query3TestStreaming", query, model, /* streamingMode= */ true);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void query4MatchesModelBatch() {
-    NexmarkQuery query = new Query4(CONFIG);
-    NexmarkQueryModel model = new Query4Model(CONFIG);
-    queryMatchesModel("Query4TestBatch", query, model, /* streamingMode= */ false);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void query4MatchesModelStreaming() {
-    NexmarkQuery query = new Query4(CONFIG);
-    NexmarkQueryModel model = new Query4Model(CONFIG);
-    queryMatchesModel("Query4TestStreaming", query, model, /* streamingMode= */ true);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void query5MatchesModelBatch() {
-    NexmarkQuery query = new Query5(CONFIG);
-    NexmarkQueryModel model = new Query5Model(CONFIG);
-    queryMatchesModel("Query5TestBatch", query, model, /* streamingMode= */ false);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void query5MatchesModelStreaming() {
-    NexmarkQuery query = new Query5(CONFIG);
-    NexmarkQueryModel model = new Query5Model(CONFIG);
-    queryMatchesModel("Query5TestStreaming", query, model, /* streamingMode= */ true);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void query6MatchesModelBatch() {
-    NexmarkQuery query = new Query6(CONFIG);
-    NexmarkQueryModel model = new Query6Model(CONFIG);
-    queryMatchesModel("Query6TestBatch", query, model, /* streamingMode= */ false);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void query6MatchesModelStreaming() {
-    NexmarkQuery query = new Query6(CONFIG);
-    NexmarkQueryModel model = new Query6Model(CONFIG);
-    queryMatchesModel("Query6TestStreaming", query, model, /* streamingMode= */ true);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void query7MatchesModelBatch() {
-    NexmarkQuery query = new Query7(CONFIG);
-    NexmarkQueryModel model = new Query7Model(CONFIG);
-    queryMatchesModel("Query7TestBatch", query, model, /* streamingMode= */ false);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void query7MatchesModelStreaming() {
-    NexmarkQuery query = new Query7(CONFIG);
-    NexmarkQueryModel model = new Query7Model(CONFIG);
-    queryMatchesModel("Query7TestStreaming", query, model, /* streamingMode= */ true);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void query8MatchesModelBatch() {
-    NexmarkQuery query = new Query8(CONFIG);
-    NexmarkQueryModel model = new Query8Model(CONFIG);
-    queryMatchesModel("Query8TestBatch", query, model, /* streamingMode= */ false);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void query8MatchesModelStreaming() {
-    NexmarkQuery query = new Query8(CONFIG);
-    NexmarkQueryModel model = new Query8Model(CONFIG);
-    queryMatchesModel("Query8TestStreaming", query, model, /* streamingMode= */ true);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void query9MatchesModelBatch() {
-    NexmarkQuery query = new Query9(CONFIG);
-    NexmarkQueryModel model = new Query9Model(CONFIG);
-    queryMatchesModel("Query9TestBatch", query, model, /* streamingMode= */ false);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void query9MatchesModelStreaming() {
-    NexmarkQuery query = new Query9(CONFIG);
-    NexmarkQueryModel model = new Query9Model(CONFIG);
-    queryMatchesModel("Query9TestStreaming", query, model, /* streamingMode= */ true);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java
deleted file mode 100644
index d95461a..0000000
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.sources;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.SourceTestUtils;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Test {@link BoundedEventSource}: reading, splitting at a fraction, and splitting into
- * bundles, each checked with {@link SourceTestUtils}.
- */
-@RunWith(JUnit4.class)
-public class BoundedEventSourceTest {
-  /**
-   * Builds a generator config from the default Nexmark configuration and the current
-   * wall-clock time, passing {@code n} as the fourth constructor argument (presumably the
-   * number of events; confirm against {@link GeneratorConfig}).
-   */
-  private GeneratorConfig makeConfig(long n) {
-    return new GeneratorConfig(
-        NexmarkConfiguration.DEFAULT, System.currentTimeMillis(), 0, n, 0);
-  }
-
-  /** Creates a fresh set of pipeline options for one test. */
-  private static NexmarkOptions newOptions() {
-    return PipelineOptionsFactory.as(NexmarkOptions.class);
-  }
-
-  @Test
-  public void sourceAndReadersWork() throws Exception {
-    final NexmarkOptions options = newOptions();
-    final BoundedEventSource source = new BoundedEventSource(makeConfig(200L), 1);
-    // An unstarted reader must read the same elements as its source.
-    SourceTestUtils.assertUnstartedReaderReadsSameAsItsSource(
-        source.createReader(options), options);
-  }
-
-  @Test
-  public void splitAtFractionRespectsContract() throws Exception {
-    final NexmarkOptions options = newOptions();
-    final BoundedEventSource source = new BoundedEventSource(makeConfig(20L), 1);
-
-    // Splitting at 0.3 must fail once 10 items have already been read...
-    SourceTestUtils.assertSplitAtFractionFails(source, 10, 0.3, options);
-    // ...but succeed, consistently with the unsplit source, after only 5 items.
-    SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent(source, 5, 0.3, options);
-    // Finally probe split points exhaustively.
-    SourceTestUtils.assertSplitAtFractionExhaustive(source, options);
-  }
-
-  @Test
-  public void splitIntoBundlesRespectsContract() throws Exception {
-    final NexmarkOptions options = newOptions();
-    final BoundedEventSource source = new BoundedEventSource(makeConfig(200L), 1);
-    // The union of the bundles must equal the original (reference) source.
-    SourceTestUtils.assertSourcesEqualReferenceSource(
-        source, source.split(10, options), options);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/GeneratorTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/GeneratorTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/GeneratorTest.java
deleted file mode 100644
index b0dff2f..0000000
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/GeneratorTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.sources;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Test {@link Generator}: splitting a generator, either mid-stream or up front, must preserve
- * the total number of events produced.
- */
-@RunWith(JUnit4.class)
-public class GeneratorTest {
-  /** Default-configuration generator config; {@code n} is passed as the fourth argument. */
-  private static GeneratorConfig makeConfig(long n) {
-    return new GeneratorConfig(
-        NexmarkConfiguration.DEFAULT, System.currentTimeMillis(), 0, n, 0);
-  }
-
-  /** Pulls exactly {@code n} elements from {@code itr}, failing if it runs dry; returns n. */
-  private static <T> long consume(long n, Iterator<T> itr) {
-    for (long remaining = n; remaining > 0; remaining--) {
-      assertTrue(itr.hasNext());
-      itr.next();
-    }
-    return n;
-  }
-
-  /** Drains {@code itr} completely and returns how many elements it produced. */
-  private static <T> long consume(Iterator<T> itr) {
-    long count = 0;
-    for (; itr.hasNext(); count++) {
-      itr.next();
-    }
-    return count;
-  }
-
-  @Test
-  public void splitAtFractionPreservesOverallEventCount() {
-    GeneratorConfig initialConfig = makeConfig(55729L);
-    long expected = initialConfig.getStopEventId() - initialConfig.getStartEventId();
-    Generator initialGenerator = new Generator(initialConfig);
-
-    // Read part of the stream before the first split.
-    long actual = consume(5000, initialGenerator);
-
-    // Split at event id 9000 and drive the returned remainder config with its own generator.
-    Generator remainGenerator1 = new Generator(initialGenerator.splitAtEventId(9000L));
-
-    // Read a little from each generator.
-    actual += consume(2000, initialGenerator);
-    actual += consume(3000, remainGenerator1);
-
-    // Split the second generator again, at event id 30000.
-    Generator remainGenerator2 = new Generator(remainGenerator1.splitAtEventId(30000L));
-
-    // Drain all three generators (left to right) and compare with the unsplit event count.
-    actual += consume(initialGenerator) + consume(remainGenerator1) + consume(remainGenerator2);
-
-    assertEquals(expected, actual);
-  }
-
-  @Test
-  public void splitPreservesOverallEventCount() {
-    GeneratorConfig initialConfig = makeConfig(51237L);
-    long expected = initialConfig.getStopEventId() - initialConfig.getStartEventId();
-
-    // Build every sub-generator first, then drain them one by one.
-    List<Generator> subGenerators = new ArrayList<>();
-    for (GeneratorConfig subConfig : initialConfig.split(20)) {
-      subGenerators.add(new Generator(subConfig));
-    }
-
-    long actual = 0;
-    for (int index = 0; index < subGenerators.size(); index++) {
-      actual += consume(subGenerators.get(index));
-    }
-
-    assertEquals(expected, actual);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
deleted file mode 100644
index 1ecc33e..0000000
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.sources;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
-import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.testing.TestPipeline;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Test {@link UnboundedEventSource}: repeatedly checkpointing and resuming a reader must
- * reproduce the event stream of an uninterrupted generator.
- */
-@RunWith(JUnit4.class)
-public class UnboundedEventSourceTest {
-  /** Default-configuration generator config; {@code n} is passed as the fourth argument. */
-  private GeneratorConfig makeConfig(long n) {
-    return new GeneratorConfig(
-        NexmarkConfiguration.DEFAULT, System.currentTimeMillis(), 0, n, 0);
-  }
-
-  /**
-   * Helper for tracking which ids we've seen (so we can detect dups) and
-   * confirming reading events match the model events.
-   */
-  private static class EventIdChecker {
-    private final Set<Long> seenPersonIds = new HashSet<>();
-    private final Set<Long> seenAuctionIds = new HashSet<>();
-
-    /** Records the id of a new-auction or new-person event, failing if it was already seen. */
-    public void add(Event event) {
-      if (event.newAuction != null) {
-        assertTrue(seenAuctionIds.add(event.newAuction.id));
-      } else if (event.newPerson != null) {
-        assertTrue(seenPersonIds.add(event.newPerson.id));
-      }
-    }
-
-    /**
-     * Reads {@code n} events from {@code reader} and checks each against the next event of
-     * {@code modelGenerator}.
-     *
-     * <p>{@code reader} must be freshly created and not yet started: the first record is fetched
-     * with {@code start()} and the rest with {@code advance()}, because the reader contract
-     * requires {@code start()} to be called exactly once before any {@code advance()}.
-     */
-    public void add(int n, UnboundedReader<Event> reader, Generator modelGenerator)
-        throws IOException {
-      for (int i = 0; i < n; i++) {
-        assertTrue(modelGenerator.hasNext());
-        Event modelEvent = modelGenerator.next().getValue();
-        boolean hasEvent = (i == 0) ? reader.start() : reader.advance();
-        assertTrue(hasEvent);
-        Event actualEvent = reader.getCurrent();
-        assertEquals(modelEvent.toString(), actualEvent.toString());
-        add(actualEvent);
-      }
-    }
-  }
-
-  /**
-   * Check aggressively checkpointing and resuming a reader gives us exactly the
-   * same event stream as reading directly.
-   */
-  @Test
-  public void resumeFromCheckpoint() throws IOException {
-    Random random = new Random(297);
-    int n = 47293;
-    GeneratorConfig config = makeConfig(n);
-    Generator modelGenerator = new Generator(config);
-
-    EventIdChecker checker = new EventIdChecker();
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    UnboundedEventSource source = new UnboundedEventSource(config, 1, 0, false);
-    UnboundedReader<Event> reader = source.createReader(options, null);
-
-    try {
-      while (n > 0) {
-        // Read a randomly sized chunk: at least one event, never past the end.
-        int m = Math.min(459 + random.nextInt(455), n);
-        checker.add(m, reader, modelGenerator);
-        n -= m;
-        // Resume in a brand-new reader from the checkpoint, then release the old reader.
-        // The swap happens before close() so the finally block never closes a reader twice.
-        CheckpointMark checkpointMark = reader.getCheckpointMark();
-        UnboundedReader<Event> previous = reader;
-        reader = source.createReader(options, (Generator.Checkpoint) checkpointMark);
-        previous.close();
-      }
-      // A reader resumed after the final event must have nothing left to read.
-      assertFalse(reader.start());
-    } finally {
-      reader.close();
-    }
-  }
-}