You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/17 20:51:48 UTC

[4/6] beam git commit: [Nexmark] Extract GeneratorCheckpoint into a separate class. Move getNextEvent() call to the top of the stack.

[Nexmark] Extract GeneratorCheckpoint into a separate class. Move getNextEvent() call to the top of the stack.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4fce6401
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4fce6401
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4fce6401

Branch: refs/heads/master
Commit: 4fce6401fa1abb91c0e493791c29fa0f2cefe0d3
Parents: 1b3f1c1
Author: Anton Kedin <ke...@google.com>
Authored: Mon Nov 6 14:47:38 2017 -0800
Committer: Anton Kedin <ke...@google.com>
Committed: Wed Nov 15 13:48:37 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/nexmark/sources/Generator.java     | 145 ++++++-------------
 .../nexmark/sources/GeneratorCheckpoint.java    |  78 ++++++++++
 .../sdk/nexmark/sources/GeneratorConfig.java    |  41 ++++++
 .../nexmark/sources/UnboundedEventSource.java   |  20 +--
 .../sources/UnboundedEventSourceTest.java       |   3 +-
 5 files changed, 179 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4fce6401/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
index c368d72..630d0b5 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
@@ -19,20 +19,13 @@ package org.apache.beam.sdk.nexmark.sources;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Random;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
+
 import org.apache.beam.sdk.nexmark.model.Auction;
 import org.apache.beam.sdk.nexmark.model.Bid;
 import org.apache.beam.sdk.nexmark.model.Event;
@@ -95,55 +88,6 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
   private static final int HOT_BIDDER_RATIO = 100;
 
   /**
-   * Just enough state to be able to restore a generator back to where it was checkpointed.
-   */
-  public static class Checkpoint implements UnboundedSource.CheckpointMark {
-    private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-
-    /** Coder for this class. */
-    public static final Coder<Checkpoint> CODER_INSTANCE =
-        new CustomCoder<Checkpoint>() {
-          @Override public void encode(Checkpoint value, OutputStream outStream)
-          throws CoderException, IOException {
-            LONG_CODER.encode(value.numEvents, outStream);
-            LONG_CODER.encode(value.wallclockBaseTime, outStream);
-          }
-
-          @Override
-          public Checkpoint decode(InputStream inStream)
-              throws CoderException, IOException {
-            long numEvents = LONG_CODER.decode(inStream);
-            long wallclockBaseTime = LONG_CODER.decode(inStream);
-            return new Checkpoint(numEvents, wallclockBaseTime);
-          }
-          @Override public void verifyDeterministic() throws NonDeterministicException {}
-        };
-
-    private final long numEvents;
-    private final long wallclockBaseTime;
-
-    private Checkpoint(long numEvents, long wallclockBaseTime) {
-      this.numEvents = numEvents;
-      this.wallclockBaseTime = wallclockBaseTime;
-    }
-
-    public Generator toGenerator(GeneratorConfig config) {
-      return new Generator(config, numEvents, wallclockBaseTime);
-    }
-
-    @Override
-    public void finalizeCheckpoint() throws IOException {
-      // Nothing to finalize.
-    }
-
-    @Override
-    public String toString() {
-      return String.format("Generator.Checkpoint{numEvents:%d;wallclockBaseTime:%d}",
-          numEvents, wallclockBaseTime);
-    }
-  }
-
-  /**
    * The next event and its various timestamps. Ordered by increasing wallclock timestamp, then
    * (arbitrary but stable) event hash order.
    */
@@ -213,17 +157,17 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
   private GeneratorConfig config;
 
   /** Number of events generated by this generator. */
-  private long numEvents;
+  private long eventsCountSoFar;
 
   /**
    * Wallclock time at which we emitted the first event (ms since epoch). Initially -1.
    */
   private long wallclockBaseTime;
 
-  private Generator(GeneratorConfig config, long numEvents, long wallclockBaseTime) {
+  Generator(GeneratorConfig config, long eventsCountSoFar, long wallclockBaseTime) {
     checkNotNull(config);
     this.config = config;
-    this.numEvents = numEvents;
+    this.eventsCountSoFar = eventsCountSoFar;
     this.wallclockBaseTime = wallclockBaseTime;
   }
 
@@ -237,8 +181,8 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
   /**
    * Return a checkpoint for the current generator.
    */
-  public Checkpoint toCheckpoint() {
-    return new Checkpoint(numEvents, wallclockBaseTime);
+  public GeneratorCheckpoint toCheckpoint() {
+    return new GeneratorCheckpoint(eventsCountSoFar, wallclockBaseTime);
   }
 
   /**
@@ -246,7 +190,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
    */
   public Generator copy() {
     checkNotNull(config);
-    Generator result = new Generator(config, numEvents, wallclockBaseTime);
+    Generator result = new Generator(config, eventsCountSoFar, wallclockBaseTime);
     return result;
   }
 
@@ -276,15 +220,14 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
    * help with bookkeeping.
    */
   public long getNextEventId() {
-    return config.firstEventId + config.nextAdjustedEventNumber(numEvents);
+    return config.firstEventId + config.nextAdjustedEventNumber(eventsCountSoFar);
   }
 
   /**
    * Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if
    * due to generate a person.
    */
-  private long lastBase0PersonId() {
-    long eventId = getNextEventId();
+  private long lastBase0PersonId(long eventId) {
     long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
     long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
     if (offset >= GeneratorConfig.PERSON_PROPORTION) {
@@ -300,8 +243,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
    * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if
    * due to generate an auction.
    */
-  private long lastBase0AuctionId() {
-    long eventId = getNextEventId();
+  private long lastBase0AuctionId(long eventId) {
     long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
     long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
     if (offset < GeneratorConfig.PERSON_PROPORTION) {
@@ -384,7 +326,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
   /** Return a random time delay, in milliseconds, for length of auctions. */
   private long nextAuctionLengthMs(Random random, long timestamp) {
     // What's our current event number?
-    long currentEventNumber = config.nextAdjustedEventNumber(numEvents);
+    long currentEventNumber = config.nextAdjustedEventNumber(eventsCountSoFar);
     // How many events till we've generated numInFlightAuctions?
     long numEventsForAuctions =
         (config.configuration.numInFlightAuctions * GeneratorConfig.PROPORTION_DENOMINATOR)
@@ -428,8 +370,8 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
   /**
    * Generate and return a random person with next available id.
    */
-  private Person nextPerson(Random random, long timestamp) {
-    long id = lastBase0PersonId() + GeneratorConfig.FIRST_PERSON_ID;
+  private Person nextPerson(long nextEventId, Random random, long timestamp) {
+    long id = lastBase0PersonId(nextEventId) + GeneratorConfig.FIRST_PERSON_ID;
     String name = nextPersonName(random);
     String email = nextEmail(random);
     String creditCard = nextCreditCard(random);
@@ -444,13 +386,13 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
   /**
    * Return a random person id (base 0).
    */
-  private long nextBase0PersonId(Random random) {
+  private long nextBase0PersonId(long eventId, Random random) {
     // Choose a random person from any of the 'active' people, plus a few 'leads'.
     // By limiting to 'active' we ensure the density of bids or auctions per person
     // does not decrease over time for long running jobs.
     // By choosing a person id ahead of the last valid person id we will make
     // newPerson and newAuction events appear to have been swapped in time.
-    long numPeople = lastBase0PersonId() + 1;
+    long numPeople = lastBase0PersonId(eventId) + 1;
     long activePeople = Math.min(numPeople, config.configuration.numActivePeople);
     long n = nextLong(random, activePeople + PERSON_ID_LEAD);
     return numPeople - activePeople + n;
@@ -459,29 +401,30 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
   /**
    * Return a random auction id (base 0).
    */
-  private long nextBase0AuctionId(Random random) {
+  private long nextBase0AuctionId(long nextEventId, Random random) {
     // Choose a random auction for any of those which are likely to still be in flight,
     // plus a few 'leads'.
     // Note that ideally we'd track non-expired auctions exactly, but that state
     // is difficult to split.
-    long minAuction = Math.max(lastBase0AuctionId() - config.configuration.numInFlightAuctions, 0);
-    long maxAuction = lastBase0AuctionId();
+    long minAuction = Math.max(
+        lastBase0AuctionId(nextEventId) - config.configuration.numInFlightAuctions, 0);
+    long maxAuction = lastBase0AuctionId(nextEventId);
     return minAuction + nextLong(random, maxAuction - minAuction + 1 + AUCTION_ID_LEAD);
   }
 
   /**
    * Generate and return a random auction with next available id.
    */
-  private Auction nextAuction(Random random, long timestamp) {
-    long id = lastBase0AuctionId() + GeneratorConfig.FIRST_AUCTION_ID;
+  private Auction nextAuction(long eventId, Random random, long timestamp) {
+    long id = lastBase0AuctionId(eventId) + GeneratorConfig.FIRST_AUCTION_ID;
 
     long seller;
     // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio.
     if (random.nextInt(config.configuration.hotSellersRatio) > 0) {
       // Choose the first person in the batch of last HOT_SELLER_RATIO people.
-      seller = (lastBase0PersonId() / HOT_SELLER_RATIO) * HOT_SELLER_RATIO;
+      seller = (lastBase0PersonId(eventId) / HOT_SELLER_RATIO) * HOT_SELLER_RATIO;
     } else {
-      seller = nextBase0PersonId(random);
+      seller = nextBase0PersonId(eventId, random);
     }
     seller += GeneratorConfig.FIRST_PERSON_ID;
 
@@ -500,14 +443,14 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
   /**
    * Generate and return a random bid with next available id.
    */
-  private Bid nextBid(Random random, long timestamp) {
+  private Bid nextBid(long eventId, Random random, long timestamp) {
     long auction;
     // Here P(bid will be for a hot auction) = 1 - 1/hotAuctionRatio.
     if (random.nextInt(config.configuration.hotAuctionRatio) > 0) {
       // Choose the first auction in the batch of last HOT_AUCTION_RATIO auctions.
-      auction = (lastBase0AuctionId() / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO;
+      auction = (lastBase0AuctionId(eventId) / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO;
     } else {
-      auction = nextBase0AuctionId(random);
+      auction = nextBase0AuctionId(eventId, random);
     }
     auction += GeneratorConfig.FIRST_AUCTION_ID;
 
@@ -516,9 +459,9 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
     if (random.nextInt(config.configuration.hotBiddersRatio) > 0) {
       // Choose the second person (so hot bidders and hot sellers don't collide) in the batch of
       // last HOT_BIDDER_RATIO people.
-      bidder = (lastBase0PersonId() / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1;
+      bidder = (lastBase0PersonId(getNextEventId()) / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1;
     } else {
-      bidder = nextBase0PersonId(random);
+      bidder = nextBase0PersonId(eventId, random);
     }
     bidder += GeneratorConfig.FIRST_PERSON_ID;
 
@@ -530,7 +473,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
 
   @Override
   public boolean hasNext() {
-    return numEvents < config.maxEvents;
+    return eventsCountSoFar < config.maxEvents;
   }
 
   /**
@@ -544,34 +487,40 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
     }
     // When, in event time, we should generate the event. Monotonic.
     long eventTimestamp =
-        config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents)).getKey();
+        config.timestampAndInterEventDelayUsForEvent(
+            config.nextEventNumber(eventsCountSoFar)).getKey();
     // When, in event time, the event should say it was generated. Depending on outOfOrderGroupSize
     // may have local jitter.
     long adjustedEventTimestamp =
-        config.timestampAndInterEventDelayUsForEvent(config.nextAdjustedEventNumber(numEvents))
+        config.timestampAndInterEventDelayUsForEvent(
+            config.nextAdjustedEventNumber(eventsCountSoFar))
             .getKey();
     // The minimum of this and all future adjusted event timestamps. Accounts for jitter in
     // the event timestamp.
     long watermark =
-        config.timestampAndInterEventDelayUsForEvent(config.nextEventNumberForWatermark(numEvents))
+        config.timestampAndInterEventDelayUsForEvent(
+            config.nextEventNumberForWatermark(eventsCountSoFar))
             .getKey();
     // When, in wallclock time, we should emit the event.
     long wallclockTimestamp = wallclockBaseTime + (eventTimestamp - getCurrentConfig().baseTime);
 
     // Seed the random number generator with the next 'event id'.
     Random random = new Random(getNextEventId());
-    long rem = getNextEventId() % GeneratorConfig.PROPORTION_DENOMINATOR;
+
+
+    long newEventId = getNextEventId();
+    long rem = newEventId % GeneratorConfig.PROPORTION_DENOMINATOR;
 
     Event event;
     if (rem < GeneratorConfig.PERSON_PROPORTION) {
-      event = new Event(nextPerson(random, adjustedEventTimestamp));
+      event = new Event(nextPerson(newEventId, random, adjustedEventTimestamp));
     } else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
-      event = new Event(nextAuction(random, adjustedEventTimestamp));
+      event = new Event(nextAuction(newEventId, random, adjustedEventTimestamp));
     } else {
-      event = new Event(nextBid(random, adjustedEventTimestamp));
+      event = new Event(nextBid(newEventId, random, adjustedEventTimestamp));
     }
 
-    numEvents++;
+    eventsCountSoFar++;
     return new NextEvent(wallclockTimestamp, adjustedEventTimestamp, event, watermark);
   }
 
@@ -590,7 +539,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
    * Return how many microseconds till we emit the next event.
    */
   public long currentInterEventDelayUs() {
-    return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents))
+    return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(eventsCountSoFar))
         .getValue();
   }
 
@@ -598,12 +547,12 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
    * Return an estimate of fraction of output consumed.
    */
   public double getFractionConsumed() {
-    return (double) numEvents / config.maxEvents;
+    return (double) eventsCountSoFar / config.maxEvents;
   }
 
   @Override
   public String toString() {
-    return String.format("Generator{config:%s; numEvents:%d; wallclockBaseTime:%d}", config,
-        numEvents, wallclockBaseTime);
+    return String.format("Generator{config:%s; eventsCountSoFar:%d; wallclockBaseTime:%d}", config,
+        eventsCountSoFar, wallclockBaseTime);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4fce6401/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java
new file mode 100644
index 0000000..dfc135d
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+
+/**
+ * Just enough state to be able to restore a generator back to where it was checkpointed.
+ */
+public class GeneratorCheckpoint implements UnboundedSource.CheckpointMark {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+  /** Coder for this class. */
+  public static final Coder<GeneratorCheckpoint> CODER_INSTANCE =
+      new CustomCoder<GeneratorCheckpoint>() {
+        @Override public void encode(GeneratorCheckpoint value, OutputStream outStream)
+            throws CoderException, IOException {
+          LONG_CODER.encode(value.numEvents, outStream);
+          LONG_CODER.encode(value.wallclockBaseTime, outStream);
+        }
+
+        @Override
+        public GeneratorCheckpoint decode(InputStream inStream)
+            throws CoderException, IOException {
+          long numEvents = LONG_CODER.decode(inStream);
+          long wallclockBaseTime = LONG_CODER.decode(inStream);
+          return new GeneratorCheckpoint(numEvents, wallclockBaseTime);
+        }
+        @Override public void verifyDeterministic() throws NonDeterministicException {}
+      };
+
+  private final long numEvents;
+  private final long wallclockBaseTime;
+
+  GeneratorCheckpoint(long numEvents, long wallclockBaseTime) {
+    this.numEvents = numEvents;
+    this.wallclockBaseTime = wallclockBaseTime;
+  }
+
+  public Generator toGenerator(GeneratorConfig config) {
+    return new Generator(config, numEvents, wallclockBaseTime);
+  }
+
+  @Override
+  public void finalizeCheckpoint() throws IOException {
+    // Nothing to finalize.
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Generator.GeneratorCheckpoint{numEvents:%d;wallclockBaseTime:%d}",
+        numEvents, wallclockBaseTime);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/4fce6401/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java
index 42183c6..8e0a899 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.nexmark.sources;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
 import org.apache.beam.sdk.nexmark.model.Event;
 import org.apache.beam.sdk.values.KV;
@@ -186,6 +187,46 @@ public class GeneratorConfig implements Serializable {
            + numBids * configuration.avgBidByteSize;
   }
 
+  public int getAvgPersonByteSize() {
+    return configuration.avgPersonByteSize;
+  }
+
+  public int getNumActivePeople() {
+    return configuration.numActivePeople;
+  }
+
+  public int getHotSellersRatio() {
+    return configuration.hotSellersRatio;
+  }
+
+  public int getNumInFlightAuctions() {
+    return configuration.numInFlightAuctions;
+  }
+
+  public int getHotAuctionRatio() {
+    return configuration.hotAuctionRatio;
+  }
+
+  public int getHotBiddersRatio() {
+    return configuration.hotBiddersRatio;
+  }
+
+  public int getAvgBidByteSize() {
+    return configuration.avgBidByteSize;
+  }
+
+  public int getAvgAuctionByteSize() {
+    return configuration.avgAuctionByteSize;
+  }
+
+  public double getProbDelayedEvent() {
+    return configuration.probDelayedEvent;
+  }
+
+  public long getOccasionalDelaySec() {
+    return configuration.occasionalDelaySec;
+  }
+
   /**
    * Return an estimate of the byte-size of all events a generator for this config would yield.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/4fce6401/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
index 8f5575c..74eb061 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
@@ -23,7 +23,9 @@ import java.util.NoSuchElementException;
 import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.concurrent.ThreadLocalRandom;
+
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.nexmark.NexmarkUtils;
@@ -43,7 +45,7 @@ import org.slf4j.LoggerFactory;
  * that the overall rate respect the {@code interEventDelayUs} period if possible. Otherwise,
  * events are returned every time the system asks for one.
  */
-public class UnboundedEventSource extends UnboundedSource<Event, Generator.Checkpoint> {
+public class UnboundedEventSource extends UnboundedSource<Event, GeneratorCheckpoint> {
   private static final Duration BACKLOG_PERIOD = Duration.standardSeconds(30);
   private static final Logger LOG = LoggerFactory.getLogger(UnboundedEventSource.class);
 
@@ -161,12 +163,12 @@ public class UnboundedEventSource extends UnboundedSource<Event, Generator.Check
                              watermark - next.eventTimestamp);
         } else if (generator.hasNext()) {
           next = generator.nextEvent();
-          if (isRateLimited && config.configuration.probDelayedEvent > 0.0
-              && config.configuration.occasionalDelaySec > 0
-              && ThreadLocalRandom.current().nextDouble() < config.configuration.probDelayedEvent) {
+          if (isRateLimited && config.getProbDelayedEvent() > 0.0
+              && config.getOccasionalDelaySec() > 0
+              && ThreadLocalRandom.current().nextDouble() < config.getProbDelayedEvent()) {
             // We'll hold back this event and go around again.
             long delayMs =
-                ThreadLocalRandom.current().nextLong(config.configuration.occasionalDelaySec * 1000)
+                ThreadLocalRandom.current().nextLong(config.getOccasionalDelaySec() * 1000)
                 + 1L;
             LOG.debug("delaying event by {}ms", delayMs);
             heldBackEvents.add(next.withDelay(delayMs));
@@ -265,7 +267,7 @@ public class UnboundedEventSource extends UnboundedSource<Event, Generator.Check
     }
 
     @Override
-    public Generator.Checkpoint getCheckpointMark() {
+    public GeneratorCheckpoint getCheckpointMark() {
       return generator.toCheckpoint();
     }
 
@@ -283,8 +285,8 @@ public class UnboundedEventSource extends UnboundedSource<Event, Generator.Check
   }
 
   @Override
-  public Coder<Generator.Checkpoint> getCheckpointMarkCoder() {
-    return Generator.Checkpoint.CODER_INSTANCE;
+  public Coder<GeneratorCheckpoint> getCheckpointMarkCoder() {
+    return GeneratorCheckpoint.CODER_INSTANCE;
   }
 
   @Override
@@ -301,7 +303,7 @@ public class UnboundedEventSource extends UnboundedSource<Event, Generator.Check
 
   @Override
   public EventReader createReader(
-      PipelineOptions options, @Nullable Generator.Checkpoint checkpoint) {
+      PipelineOptions options, @Nullable GeneratorCheckpoint checkpoint) {
     if (checkpoint == null) {
       LOG.trace("creating initial unbounded reader for {}", config);
       return new EventReader(config);

http://git-wip-us.apache.org/repos/asf/beam/blob/4fce6401/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
index 3853ede..c00d1a3 100644
--- a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
+++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.Random;
 import java.util.Set;
+
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
@@ -97,7 +98,7 @@ public class UnboundedEventSourceTest {
       n -= m;
       System.out.printf("splitting with %d remaining...%n", n);
       CheckpointMark checkpointMark = reader.getCheckpointMark();
-      reader = source.createReader(options, (Generator.Checkpoint) checkpointMark);
+      reader = source.createReader(options, (GeneratorCheckpoint) checkpointMark);
     }
 
     assertFalse(reader.advance());