You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:49 UTC
[41/55] [abbrv] beam git commit: Move module
beam-integration-java-nexmark to beam-sdks-java-nexmark
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/package-info.java
new file mode 100644
index 0000000..2ca5a1c
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Nexmark Queries.
+ *
+ * <p>Implementations of the queries run by the Nexmark benchmark suite.
+ */
+package org.apache.beam.sdk.nexmark.queries;
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java
new file mode 100644
index 0000000..60124bb
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.sources;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+
+/**
+ * A custom, bounded source of event records.
+ */
+public class BoundedEventSource extends BoundedSource<Event> {
+  /** Configuration we generate events against. */
+  private final GeneratorConfig config;
+
+  /** How many bounded sources to create when {@link #split} is called. */
+  private final int numEventGenerators;
+
+  public BoundedEventSource(GeneratorConfig config, int numEventGenerators) {
+    this.config = config;
+    this.numEventGenerators = numEventGenerators;
+  }
+
+  /** A reader to pull events from the generator. */
+  private static class EventReader extends BoundedReader<Event> {
+    /**
+     * Event source we are purporting to be reading from.
+     * (We can't use Java's capture-outer-class pointer since we must update
+     * this field on calls to splitAtFraction.)
+     */
+    private BoundedEventSource source;
+
+    /** Generator we are reading from. */
+    private final Generator generator;
+
+    /** Whether the "stopped" message has already been logged, so it is logged only once. */
+    private boolean reportedStop;
+
+    /** The most recently read event, or null before the first successful advance. */
+    @Nullable
+    private TimestampedValue<Event> currentEvent;
+
+    public EventReader(BoundedEventSource source, GeneratorConfig config) {
+      this.source = source;
+      generator = new Generator(config);
+      reportedStop = false;
+    }
+
+    @Override
+    public synchronized boolean start() {
+      NexmarkUtils.info("starting bounded generator %s", generator);
+      return advance();
+    }
+
+    @Override
+    public synchronized boolean advance() {
+      if (!generator.hasNext()) {
+        // No more events.
+        if (!reportedStop) {
+          reportedStop = true;
+          NexmarkUtils.info("stopped bounded generator %s", generator);
+        }
+        return false;
+      }
+      currentEvent = generator.next();
+      return true;
+    }
+
+    @Override
+    public synchronized Event getCurrent() throws NoSuchElementException {
+      if (currentEvent == null) {
+        throw new NoSuchElementException();
+      }
+      return currentEvent.getValue();
+    }
+
+    @Override
+    public synchronized Instant getCurrentTimestamp() throws NoSuchElementException {
+      if (currentEvent == null) {
+        throw new NoSuchElementException();
+      }
+      return currentEvent.getTimestamp();
+    }
+
+    @Override
+    public void close() throws IOException {
+      // Nothing to close.
+    }
+
+    @Override
+    public synchronized Double getFractionConsumed() {
+      return generator.getFractionConsumed();
+    }
+
+    @Override
+    public synchronized BoundedSource<Event> getCurrentSource() {
+      return source;
+    }
+
+    /**
+     * Attempt to split the remaining work of this reader at {@code fraction} of the current
+     * event-id range.
+     *
+     * @return a source covering the events this reader will no longer produce, or {@code null}
+     *     if the split point has already been passed or either side of the split would be empty
+     */
+    @Override
+    @Nullable
+    public synchronized BoundedEventSource splitAtFraction(double fraction) {
+      long startId = generator.getCurrentConfig().getStartEventId();
+      long stopId = generator.getCurrentConfig().getStopEventId();
+      long size = stopId - startId;
+      // Compute the offset in long arithmetic: an (int) cast would saturate at
+      // Integer.MAX_VALUE and misplace the split point for sources with > 2^31 events.
+      long splitEventId = startId + Math.min((long) (size * fraction), size);
+      if (splitEventId <= generator.getNextEventId() || splitEventId == stopId) {
+        // Already passed this position or split results in left or right being empty.
+        NexmarkUtils.info("split failed for bounded generator %s at %f", generator, fraction);
+        return null;
+      }
+
+      NexmarkUtils.info("about to split bounded generator %s at %d", generator, splitEventId);
+
+      // Scale back the event space of the current generator, and return a generator config
+      // representing the event space we just 'stole' from the current generator.
+      GeneratorConfig remainingConfig = generator.splitAtEventId(splitEventId);
+
+      NexmarkUtils.info("split bounded generator into %s and %s", generator, remainingConfig);
+
+      // At this point
+      //   generator.events() ++ new Generator(remainingConfig).events()
+      //   == originalGenerator.events()
+
+      // Both resulting sources keep the same generator count as the source being split.
+      int numGenerators = source.numEventGenerators;
+
+      // We need a new source to represent the now smaller key space for this reader, so
+      // that we can maintain the invariant that
+      //   this.getCurrentSource().createReader(...)
+      // will yield the same output as this.
+      source = new BoundedEventSource(generator.getCurrentConfig(), numGenerators);
+
+      // Return a source from which we may read the 'stolen' event space.
+      return new BoundedEventSource(remainingConfig, numGenerators);
+    }
+  }
+
+  /**
+   * Split this source into {@code numEventGenerators} sub-sources, each with a single generator.
+   * {@code desiredBundleSizeBytes} is ignored.
+   */
+  @Override
+  public List<BoundedEventSource> split(
+      long desiredBundleSizeBytes, PipelineOptions options) {
+    NexmarkUtils.info(
+        "splitting bounded source %s into %d sub-sources", config, numEventGenerators);
+    List<BoundedEventSource> results = new ArrayList<>();
+    // Ignore desiredBundleSizeBytes and use numEventGenerators instead.
+    for (GeneratorConfig subConfig : config.split(numEventGenerators)) {
+      results.add(new BoundedEventSource(subConfig, 1));
+    }
+    return results;
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) {
+    return config.getEstimatedSizeBytes();
+  }
+
+  @Override
+  public EventReader createReader(PipelineOptions options) {
+    NexmarkUtils.info("creating initial bounded reader for %s", config);
+    return new EventReader(this, config);
+  }
+
+  @Override
+  public void validate() {
+    // Nothing to validate.
+  }
+
+  @Override
+  public Coder<Event> getDefaultOutputCoder() {
+    return Event.CODER;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
new file mode 100644
index 0000000..c368d72
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
@@ -0,0 +1,609 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.sources;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.Person;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+
+/**
+ * A generator for synthetic events. We try to make the data vaguely reasonable. We also ensure
+ * most primary key/foreign key relations are correct. Eg: a {@link Bid} event will usually have
+ * valid auction and bidder ids which can be joined to already-generated Auction and Person events.
+ *
+ * <p>To help with testing, we generate timestamps relative to a given {@code baseTime}. Each new
+ * event is given a timestamp advanced from the previous timestamp by {@code interEventDelayUs}
+ * (in microseconds). The event stream is thus fully deterministic and does not depend on
+ * wallclock time.
+ *
+ * <p>This class implements {@link org.apache.beam.sdk.io.UnboundedSource.CheckpointMark}
+ * so that we can resume generating events from a saved snapshot.
+ */
+public class Generator implements Iterator<TimestampedValue<Event>>, Serializable {
+ /**
+ * Keep the number of categories small so the example queries will find results even with
+ * a small batch of events.
+ */
+ private static final int NUM_CATEGORIES = 5;
+
+ /** Smallest random string size. */
+ private static final int MIN_STRING_LENGTH = 3;
+
+ /**
+ * Keep the number of states small so that the example queries will find results even with
+ * a small batch of events.
+ */
+ private static final List<String> US_STATES = Arrays.asList(("AZ,CA,ID,OR,WA,WY").split(","));
+
+ private static final List<String> US_CITIES =
+ Arrays.asList(
+ ("Phoenix,Los Angeles,San Francisco,Boise,Portland,Bend,Redmond,Seattle,Kent,Cheyenne")
+ .split(","));
+
+ private static final List<String> FIRST_NAMES =
+ Arrays.asList(("Peter,Paul,Luke,John,Saul,Vicky,Kate,Julie,Sarah,Deiter,Walter").split(","));
+
+ private static final List<String> LAST_NAMES =
+ Arrays.asList(("Shultz,Abrams,Spencer,White,Bartels,Walton,Smith,Jones,Noris").split(","));
+
+ /**
+ * Number of yet-to-be-created people and auction ids allowed.
+ */
+ private static final int PERSON_ID_LEAD = 10;
+ private static final int AUCTION_ID_LEAD = 10;
+
+ /**
+ * Fraction of people/auctions which may be 'hot' sellers/bidders/auctions are 1
+ * over these values.
+ */
+ private static final int HOT_AUCTION_RATIO = 100;
+ private static final int HOT_SELLER_RATIO = 100;
+ private static final int HOT_BIDDER_RATIO = 100;
+
+ /**
+ * Just enough state to be able to restore a generator back to where it was checkpointed.
+ */
+ public static class Checkpoint implements UnboundedSource.CheckpointMark {
+ private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+ /** Coder for this class. */
+ public static final Coder<Checkpoint> CODER_INSTANCE =
+ new CustomCoder<Checkpoint>() {
+ @Override public void encode(Checkpoint value, OutputStream outStream)
+ throws CoderException, IOException {
+ LONG_CODER.encode(value.numEvents, outStream);
+ LONG_CODER.encode(value.wallclockBaseTime, outStream);
+ }
+
+ @Override
+ public Checkpoint decode(InputStream inStream)
+ throws CoderException, IOException {
+ long numEvents = LONG_CODER.decode(inStream);
+ long wallclockBaseTime = LONG_CODER.decode(inStream);
+ return new Checkpoint(numEvents, wallclockBaseTime);
+ }
+ @Override public void verifyDeterministic() throws NonDeterministicException {}
+ };
+
+ private final long numEvents;
+ private final long wallclockBaseTime;
+
+ private Checkpoint(long numEvents, long wallclockBaseTime) {
+ this.numEvents = numEvents;
+ this.wallclockBaseTime = wallclockBaseTime;
+ }
+
+ public Generator toGenerator(GeneratorConfig config) {
+ return new Generator(config, numEvents, wallclockBaseTime);
+ }
+
+ @Override
+ public void finalizeCheckpoint() throws IOException {
+ // Nothing to finalize.
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Generator.Checkpoint{numEvents:%d;wallclockBaseTime:%d}",
+ numEvents, wallclockBaseTime);
+ }
+ }
+
+ /**
+ * The next event and its various timestamps. Ordered by increasing wallclock timestamp, then
+ * (arbitrary but stable) event hash order.
+ */
+ public static class NextEvent implements Comparable<NextEvent> {
+ /** When, in wallclock time, should this event be emitted? */
+ public final long wallclockTimestamp;
+
+ /** When, in event time, should this event be considered to have occured? */
+ public final long eventTimestamp;
+
+ /** The event itself. */
+ public final Event event;
+
+ /** The minimum of this and all future event timestamps. */
+ public final long watermark;
+
+ public NextEvent(long wallclockTimestamp, long eventTimestamp, Event event, long watermark) {
+ this.wallclockTimestamp = wallclockTimestamp;
+ this.eventTimestamp = eventTimestamp;
+ this.event = event;
+ this.watermark = watermark;
+ }
+
+ /**
+ * Return a deep copy of next event with delay added to wallclock timestamp and
+ * event annotate as 'LATE'.
+ */
+ public NextEvent withDelay(long delayMs) {
+ return new NextEvent(
+ wallclockTimestamp + delayMs, eventTimestamp, event.withAnnotation("LATE"), watermark);
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ NextEvent nextEvent = (NextEvent) o;
+
+ return (wallclockTimestamp == nextEvent.wallclockTimestamp
+ && eventTimestamp == nextEvent.eventTimestamp
+ && watermark == nextEvent.watermark
+ && event.equals(nextEvent.event));
+ }
+
+ @Override public int hashCode() {
+ return Objects.hash(wallclockTimestamp, eventTimestamp, watermark, event);
+ }
+
+ @Override
+ public int compareTo(NextEvent other) {
+ int i = Long.compare(wallclockTimestamp, other.wallclockTimestamp);
+ if (i != 0) {
+ return i;
+ }
+ return Integer.compare(event.hashCode(), other.event.hashCode());
+ }
+ }
+
+ /**
+ * Configuration to generate events against. Note that it may be replaced by a call to
+ * {@link #splitAtEventId}.
+ */
+ private GeneratorConfig config;
+
+ /** Number of events generated by this generator. */
+ private long numEvents;
+
+ /**
+ * Wallclock time at which we emitted the first event (ms since epoch). Initially -1.
+ */
+ private long wallclockBaseTime;
+
+ private Generator(GeneratorConfig config, long numEvents, long wallclockBaseTime) {
+ checkNotNull(config);
+ this.config = config;
+ this.numEvents = numEvents;
+ this.wallclockBaseTime = wallclockBaseTime;
+ }
+
+ /**
+ * Create a fresh generator according to {@code config}.
+ */
+ public Generator(GeneratorConfig config) {
+ this(config, 0, -1);
+ }
+
+ /**
+ * Return a checkpoint for the current generator.
+ */
+ public Checkpoint toCheckpoint() {
+ return new Checkpoint(numEvents, wallclockBaseTime);
+ }
+
+ /**
+ * Return a deep copy of this generator.
+ */
+ public Generator copy() {
+ checkNotNull(config);
+ Generator result = new Generator(config, numEvents, wallclockBaseTime);
+ return result;
+ }
+
+ /**
+ * Return the current config for this generator. Note that configs may be replaced by {@link
+ * #splitAtEventId}.
+ */
+ public GeneratorConfig getCurrentConfig() {
+ return config;
+ }
+
+ /**
+ * Mutate this generator so that it will only generate events up to but not including
+ * {@code eventId}. Return a config to represent the events this generator will no longer yield.
+ * The generators will run in on a serial timeline.
+ */
+ public GeneratorConfig splitAtEventId(long eventId) {
+ long newMaxEvents = eventId - (config.firstEventId + config.firstEventNumber);
+ GeneratorConfig remainConfig = config.copyWith(config.firstEventId,
+ config.maxEvents - newMaxEvents, config.firstEventNumber + newMaxEvents);
+ config = config.copyWith(config.firstEventId, newMaxEvents, config.firstEventNumber);
+ return remainConfig;
+ }
+
+ /**
+ * Return the next 'event id'. Though events don't have ids we can simulate them to
+ * help with bookkeeping.
+ */
+ public long getNextEventId() {
+ return config.firstEventId + config.nextAdjustedEventNumber(numEvents);
+ }
+
+ /**
+ * Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if
+ * due to generate a person.
+ */
+ private long lastBase0PersonId() {
+ long eventId = getNextEventId();
+ long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
+ long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
+ if (offset >= GeneratorConfig.PERSON_PROPORTION) {
+ // About to generate an auction or bid.
+ // Go back to the last person generated in this epoch.
+ offset = GeneratorConfig.PERSON_PROPORTION - 1;
+ }
+ // About to generate a person.
+ return epoch * GeneratorConfig.PERSON_PROPORTION + offset;
+ }
+
+ /**
+ * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if
+ * due to generate an auction.
+ */
+ private long lastBase0AuctionId() {
+ long eventId = getNextEventId();
+ long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
+ long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
+ if (offset < GeneratorConfig.PERSON_PROPORTION) {
+ // About to generate a person.
+ // Go back to the last auction in the last epoch.
+ epoch--;
+ offset = GeneratorConfig.AUCTION_PROPORTION - 1;
+ } else if (offset >= GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
+ // About to generate a bid.
+ // Go back to the last auction generated in this epoch.
+ offset = GeneratorConfig.AUCTION_PROPORTION - 1;
+ } else {
+ // About to generate an auction.
+ offset -= GeneratorConfig.PERSON_PROPORTION;
+ }
+ return epoch * GeneratorConfig.AUCTION_PROPORTION + offset;
+ }
+
+ /** return a random US state. */
+ private static String nextUSState(Random random) {
+ return US_STATES.get(random.nextInt(US_STATES.size()));
+ }
+
+ /** Return a random US city. */
+ private static String nextUSCity(Random random) {
+ return US_CITIES.get(random.nextInt(US_CITIES.size()));
+ }
+
+ /** Return a random person name. */
+ private static String nextPersonName(Random random) {
+ return FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size())) + " "
+ + LAST_NAMES.get(random.nextInt(LAST_NAMES.size()));
+ }
+
+ /** Return a random string of up to {@code maxLength}. */
+ private static String nextString(Random random, int maxLength) {
+ int len = MIN_STRING_LENGTH + random.nextInt(maxLength - MIN_STRING_LENGTH);
+ StringBuilder sb = new StringBuilder();
+ while (len-- > 0) {
+ if (random.nextInt(13) == 0) {
+ sb.append(' ');
+ } else {
+ sb.append((char) ('a' + random.nextInt(26)));
+ }
+ }
+ return sb.toString().trim();
+ }
+
+ /** Return a random string of exactly {@code length}. */
+ private static String nextExactString(Random random, int length) {
+ StringBuilder sb = new StringBuilder();
+ while (length-- > 0) {
+ sb.append((char) ('a' + random.nextInt(26)));
+ }
+ return sb.toString();
+ }
+
+ /** Return a random email address. */
+ private static String nextEmail(Random random) {
+ return nextString(random, 7) + "@" + nextString(random, 5) + ".com";
+ }
+
+ /** Return a random credit card number. */
+ private static String nextCreditCard(Random random) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 4; i++) {
+ if (i > 0) {
+ sb.append(' ');
+ }
+ sb.append(String.format("%04d", random.nextInt(10000)));
+ }
+ return sb.toString();
+ }
+
+ /** Return a random price. */
+ private static long nextPrice(Random random) {
+ return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0);
+ }
+
+ /** Return a random time delay, in milliseconds, for length of auctions. */
+ private long nextAuctionLengthMs(Random random, long timestamp) {
+ // What's our current event number?
+ long currentEventNumber = config.nextAdjustedEventNumber(numEvents);
+ // How many events till we've generated numInFlightAuctions?
+ long numEventsForAuctions =
+ (config.configuration.numInFlightAuctions * GeneratorConfig.PROPORTION_DENOMINATOR)
+ / GeneratorConfig.AUCTION_PROPORTION;
+ // When will the auction numInFlightAuctions beyond now be generated?
+ long futureAuction =
+ config.timestampAndInterEventDelayUsForEvent(currentEventNumber + numEventsForAuctions)
+ .getKey();
+ // System.out.printf("*** auction will be for %dms (%d events ahead) ***\n",
+ // futureAuction - timestamp, numEventsForAuctions);
+ // Choose a length with average horizonMs.
+ long horizonMs = futureAuction - timestamp;
+ return 1L + nextLong(random, Math.max(horizonMs * 2, 1L));
+ }
+
+ /**
+ * Return a random {@code string} such that {@code currentSize + string.length()} is on average
+ * {@code averageSize}.
+ */
+ private static String nextExtra(Random random, int currentSize, int desiredAverageSize) {
+ if (currentSize > desiredAverageSize) {
+ return "";
+ }
+ desiredAverageSize -= currentSize;
+ int delta = (int) Math.round(desiredAverageSize * 0.2);
+ int minSize = desiredAverageSize - delta;
+ int desiredSize = minSize + (delta == 0 ? 0 : random.nextInt(2 * delta));
+ return nextExactString(random, desiredSize);
+ }
+
+ /** Return a random long from {@code [0, n)}. */
+ private static long nextLong(Random random, long n) {
+ if (n < Integer.MAX_VALUE) {
+ return random.nextInt((int) n);
+ } else {
+ // WARNING: Very skewed distribution! Bad!
+ return Math.abs(random.nextLong() % n);
+ }
+ }
+
+ /**
+ * Generate and return a random person with next available id.
+ */
+ private Person nextPerson(Random random, long timestamp) {
+ long id = lastBase0PersonId() + GeneratorConfig.FIRST_PERSON_ID;
+ String name = nextPersonName(random);
+ String email = nextEmail(random);
+ String creditCard = nextCreditCard(random);
+ String city = nextUSCity(random);
+ String state = nextUSState(random);
+ int currentSize =
+ 8 + name.length() + email.length() + creditCard.length() + city.length() + state.length();
+ String extra = nextExtra(random, currentSize, config.configuration.avgPersonByteSize);
+ return new Person(id, name, email, creditCard, city, state, timestamp, extra);
+ }
+
+ /**
+ * Return a random person id (base 0).
+ */
+ private long nextBase0PersonId(Random random) {
+ // Choose a random person from any of the 'active' people, plus a few 'leads'.
+ // By limiting to 'active' we ensure the density of bids or auctions per person
+ // does not decrease over time for long running jobs.
+ // By choosing a person id ahead of the last valid person id we will make
+ // newPerson and newAuction events appear to have been swapped in time.
+ long numPeople = lastBase0PersonId() + 1;
+ long activePeople = Math.min(numPeople, config.configuration.numActivePeople);
+ long n = nextLong(random, activePeople + PERSON_ID_LEAD);
+ return numPeople - activePeople + n;
+ }
+
+ /**
+ * Return a random auction id (base 0).
+ */
+ private long nextBase0AuctionId(Random random) {
+ // Choose a random auction for any of those which are likely to still be in flight,
+ // plus a few 'leads'.
+ // Note that ideally we'd track non-expired auctions exactly, but that state
+ // is difficult to split.
+ long minAuction = Math.max(lastBase0AuctionId() - config.configuration.numInFlightAuctions, 0);
+ long maxAuction = lastBase0AuctionId();
+ return minAuction + nextLong(random, maxAuction - minAuction + 1 + AUCTION_ID_LEAD);
+ }
+
+ /**
+ * Generate and return a random auction with next available id.
+ */
+ private Auction nextAuction(Random random, long timestamp) {
+ long id = lastBase0AuctionId() + GeneratorConfig.FIRST_AUCTION_ID;
+
+ long seller;
+ // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio.
+ if (random.nextInt(config.configuration.hotSellersRatio) > 0) {
+ // Choose the first person in the batch of last HOT_SELLER_RATIO people.
+ seller = (lastBase0PersonId() / HOT_SELLER_RATIO) * HOT_SELLER_RATIO;
+ } else {
+ seller = nextBase0PersonId(random);
+ }
+ seller += GeneratorConfig.FIRST_PERSON_ID;
+
+ long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES);
+ long initialBid = nextPrice(random);
+ long expires = timestamp + nextAuctionLengthMs(random, timestamp);
+ String name = nextString(random, 20);
+ String desc = nextString(random, 100);
+ long reserve = initialBid + nextPrice(random);
+ int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8;
+ String extra = nextExtra(random, currentSize, config.configuration.avgAuctionByteSize);
+ return new Auction(id, name, desc, initialBid, reserve, timestamp, expires, seller, category,
+ extra);
+ }
+
+ /**
+ * Generate and return a random bid with next available id.
+ */
+ private Bid nextBid(Random random, long timestamp) {
+ long auction;
+ // Here P(bid will be for a hot auction) = 1 - 1/hotAuctionRatio.
+ if (random.nextInt(config.configuration.hotAuctionRatio) > 0) {
+ // Choose the first auction in the batch of last HOT_AUCTION_RATIO auctions.
+ auction = (lastBase0AuctionId() / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO;
+ } else {
+ auction = nextBase0AuctionId(random);
+ }
+ auction += GeneratorConfig.FIRST_AUCTION_ID;
+
+ long bidder;
+ // Here P(bid will be by a hot bidder) = 1 - 1/hotBiddersRatio
+ if (random.nextInt(config.configuration.hotBiddersRatio) > 0) {
+ // Choose the second person (so hot bidders and hot sellers don't collide) in the batch of
+ // last HOT_BIDDER_RATIO people.
+ bidder = (lastBase0PersonId() / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1;
+ } else {
+ bidder = nextBase0PersonId(random);
+ }
+ bidder += GeneratorConfig.FIRST_PERSON_ID;
+
+ long price = nextPrice(random);
+ int currentSize = 8 + 8 + 8 + 8;
+ String extra = nextExtra(random, currentSize, config.configuration.avgBidByteSize);
+ return new Bid(auction, bidder, price, timestamp, extra);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return numEvents < config.maxEvents;
+ }
+
+ /**
+ * Return the next event. The outer timestamp is in wallclock time and corresponds to
+ * when the event should fire. The inner timestamp is in event-time and represents the
+ * time the event is purported to have taken place in the simulation.
+ */
+ public NextEvent nextEvent() {
+ if (wallclockBaseTime < 0) {
+ wallclockBaseTime = System.currentTimeMillis();
+ }
+ // When, in event time, we should generate the event. Monotonic.
+ long eventTimestamp =
+ config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents)).getKey();
+ // When, in event time, the event should say it was generated. Depending on outOfOrderGroupSize
+ // may have local jitter.
+ long adjustedEventTimestamp =
+ config.timestampAndInterEventDelayUsForEvent(config.nextAdjustedEventNumber(numEvents))
+ .getKey();
+ // The minimum of this and all future adjusted event timestamps. Accounts for jitter in
+ // the event timestamp.
+ long watermark =
+ config.timestampAndInterEventDelayUsForEvent(config.nextEventNumberForWatermark(numEvents))
+ .getKey();
+ // When, in wallclock time, we should emit the event.
+ long wallclockTimestamp = wallclockBaseTime + (eventTimestamp - getCurrentConfig().baseTime);
+
+ // Seed the random number generator with the next 'event id'.
+ Random random = new Random(getNextEventId());
+ long rem = getNextEventId() % GeneratorConfig.PROPORTION_DENOMINATOR;
+
+ Event event;
+ if (rem < GeneratorConfig.PERSON_PROPORTION) {
+ event = new Event(nextPerson(random, adjustedEventTimestamp));
+ } else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
+ event = new Event(nextAuction(random, adjustedEventTimestamp));
+ } else {
+ event = new Event(nextBid(random, adjustedEventTimestamp));
+ }
+
+ numEvents++;
+ return new NextEvent(wallclockTimestamp, adjustedEventTimestamp, event, watermark);
+ }
+
+ @Override
+ public TimestampedValue<Event> next() {
+ NextEvent next = nextEvent();
+ return TimestampedValue.of(next.event, new Instant(next.eventTimestamp));
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Return how many microseconds till we emit the next event.
+ */
+ public long currentInterEventDelayUs() {
+ return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents))
+ .getValue();
+ }
+
+ /**
+ * Return an estimate of fraction of output consumed.
+ */
+ public double getFractionConsumed() {
+ return (double) numEvents / config.maxEvents;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Generator{config:%s; numEvents:%d; wallclockBaseTime:%d}", config,
+ numEvents, wallclockBaseTime);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java
new file mode 100644
index 0000000..42183c6
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.sources;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Parameters controlling how {@link Generator} synthesizes {@link Event} elements.
+ */
+public class GeneratorConfig implements Serializable {
+
+  /**
+   * We start the ids at specific values to help ensure the queries find a match even on
+   * small synthesized dataset sizes.
+   */
+  public static final long FIRST_AUCTION_ID = 1000L;
+  public static final long FIRST_PERSON_ID = 1000L;
+  public static final long FIRST_CATEGORY_ID = 10L;
+
+  /**
+   * Proportions of people/auctions/bids to synthesize. Out of every
+   * {@link #PROPORTION_DENOMINATOR} consecutive event ids, {@link #PERSON_PROPORTION} yield
+   * persons, {@link #AUCTION_PROPORTION} yield auctions and the remainder yield bids.
+   */
+  public static final int PERSON_PROPORTION = 1;
+  public static final int AUCTION_PROPORTION = 3;
+  private static final int BID_PROPORTION = 46;
+  public static final int PROPORTION_DENOMINATOR =
+      PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION;
+
+  /**
+   * Environment options.
+   */
+  public final NexmarkConfiguration configuration;
+
+  /**
+   * Delay between events, in microseconds. If the array has more than one entry then
+   * the rate is changed every {@link #stepLengthSec}, and wraps around.
+   */
+  private final long[] interEventDelayUs;
+
+  /**
+   * Delay before changing the current inter-event delay.
+   */
+  private final long stepLengthSec;
+
+  /**
+   * Time for first event (ms since epoch).
+   */
+  public final long baseTime;
+
+  /**
+   * Event id of first event to be generated. Event ids are unique over all generators, and
+   * are used as a seed to generate each event's data.
+   */
+  public final long firstEventId;
+
+  /**
+   * Maximum number of events to generate.
+   */
+  public final long maxEvents;
+
+  /**
+   * First event number. Generators running in parallel time may share the same event number,
+   * and the event number is used to determine the event timestamp.
+   */
+  public final long firstEventNumber;
+
+  /**
+   * True period of epoch in milliseconds. Derived from above.
+   * (I.e. time to run through the cycle of all interEventDelayUs entries.)
+   */
+  private final long epochPeriodMs;
+
+  /**
+   * Number of events per epoch. Derived from above.
+   * (I.e. number of events to run through the cycle of all interEventDelayUs entries.)
+   */
+  private final long eventsPerEpoch;
+
+  /**
+   * Creates a generator config.
+   *
+   * @param configuration environment options; the rate shape, event rates and average event
+   *     byte sizes are read from it
+   * @param baseTime time for the first event (ms since epoch)
+   * @param firstEventId event id of the first event to be generated
+   * @param maxEventsOrZero maximum number of events to generate, or 0 for the largest count whose
+   *     estimated byte size cannot overflow a {@code long}
+   * @param firstEventNumber first event number; used to derive event timestamps
+   */
+  public GeneratorConfig(
+      NexmarkConfiguration configuration, long baseTime, long firstEventId,
+      long maxEventsOrZero, long firstEventNumber) {
+    this.configuration = configuration;
+    this.interEventDelayUs = configuration.rateShape.interEventDelayUs(
+        configuration.firstEventRate, configuration.nextEventRate,
+        configuration.rateUnit, configuration.numEventGenerators);
+    this.stepLengthSec = configuration.rateShape.stepLengthSec(configuration.ratePeriodSec);
+    this.baseTime = baseTime;
+    this.firstEventId = firstEventId;
+    if (maxEventsOrZero == 0) {
+      // Scale maximum down to avoid overflow in getEstimatedSizeBytes. The divisor is computed
+      // in long arithmetic so a large average byte size cannot overflow int before widening.
+      long maxAvgByteSize =
+          Math.max(
+              Math.max(configuration.avgPersonByteSize, configuration.avgAuctionByteSize),
+              configuration.avgBidByteSize);
+      this.maxEvents = Long.MAX_VALUE / ((long) PROPORTION_DENOMINATOR * maxAvgByteSize);
+    } else {
+      this.maxEvents = maxEventsOrZero;
+    }
+    this.firstEventNumber = firstEventNumber;
+
+    // Epoch bookkeeping is only needed when the rate changes over time (more than one delay).
+    long eventsPerEpoch = 0;
+    long epochPeriodMs = 0;
+    if (interEventDelayUs.length > 1) {
+      for (long interEventDelayU : interEventDelayUs) {
+        long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU;
+        eventsPerEpoch += numEventsForThisCycle;
+        epochPeriodMs += (numEventsForThisCycle * interEventDelayU) / 1000L;
+      }
+    }
+    this.eventsPerEpoch = eventsPerEpoch;
+    this.epochPeriodMs = epochPeriodMs;
+  }
+
+  /**
+   * Return a copy of this config.
+   */
+  public GeneratorConfig copy() {
+    return copyWith(firstEventId, maxEvents, firstEventNumber);
+  }
+
+  /**
+   * Split this config into {@code n} sub-configs with roughly equal number of
+   * possible events, but distinct value spaces. The generators will run on parallel timelines.
+   * This config should no longer be used.
+   *
+   * @param n number of sub-configs; when 1, this config itself is returned
+   * @return {@code n} configs whose event-id ranges are contiguous and together cover
+   *     {@link #maxEvents} events
+   * @throws IllegalArgumentException if {@code n < 1}
+   */
+  public List<GeneratorConfig> split(int n) {
+    if (n < 1) {
+      throw new IllegalArgumentException(
+          "Cannot split GeneratorConfig into " + n + " sub-configs; n must be >= 1");
+    }
+    List<GeneratorConfig> results = new ArrayList<>(n);
+    if (n == 1) {
+      // No split required.
+      results.add(this);
+    } else {
+      long subMaxEvents = maxEvents / n;
+      long subFirstEventId = firstEventId;
+      for (int i = 0; i < n; i++) {
+        if (i == n - 1) {
+          // Don't lose any events to round-down.
+          subMaxEvents = maxEvents - subMaxEvents * (n - 1);
+        }
+        results.add(copyWith(subFirstEventId, subMaxEvents, firstEventNumber));
+        subFirstEventId += subMaxEvents;
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Return copy of this config except with given parameters.
+   */
+  public GeneratorConfig copyWith(long firstEventId, long maxEvents, long firstEventNumber) {
+    return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
+  }
+
+  /**
+   * Return an estimate of the bytes needed by {@code numEvents}. Each per-kind event count is
+   * rounded down before being multiplied by that kind's average byte size.
+   */
+  public long estimatedBytesForEvents(long numEvents) {
+    long numPersons = (numEvents * PERSON_PROPORTION) / PROPORTION_DENOMINATOR;
+    long numAuctions = (numEvents * AUCTION_PROPORTION) / PROPORTION_DENOMINATOR;
+    long numBids = (numEvents * BID_PROPORTION) / PROPORTION_DENOMINATOR;
+    return numPersons * configuration.avgPersonByteSize
+        + numAuctions * configuration.avgAuctionByteSize
+        + numBids * configuration.avgBidByteSize;
+  }
+
+  /**
+   * Return an estimate of the byte-size of all events a generator for this config would yield.
+   */
+  public long getEstimatedSizeBytes() {
+    return estimatedBytesForEvents(maxEvents);
+  }
+
+  /**
+   * Return the first 'event id' which could be generated from this config. Though events don't
+   * have ids we can simulate them to help bookkeeping.
+   */
+  public long getStartEventId() {
+    return firstEventId + firstEventNumber;
+  }
+
+  /**
+   * Return one past the last 'event id' which could be generated from this config.
+   */
+  public long getStopEventId() {
+    return firstEventId + firstEventNumber + maxEvents;
+  }
+
+  /**
+   * Return the next event number for a generator which has so far emitted {@code numEvents}.
+   */
+  public long nextEventNumber(long numEvents) {
+    return firstEventNumber + numEvents;
+  }
+
+  /**
+   * Return the next event number for a generator which has so far emitted {@code numEvents},
+   * but adjusted to account for {@code outOfOrderGroupSize}.
+   *
+   * <p>Event numbers are bucketed into groups of {@code outOfOrderGroupSize}; within a group the
+   * offset is scrambled by multiplying by 953 modulo the group size (a permutation whenever the
+   * group size is not a multiple of the prime 953).
+   */
+  public long nextAdjustedEventNumber(long numEvents) {
+    long n = configuration.outOfOrderGroupSize;
+    long eventNumber = nextEventNumber(numEvents);
+    long base = (eventNumber / n) * n;
+    long offset = (eventNumber * 953) % n;
+    return base + offset;
+  }
+
+  /**
+   * Return the event number whose event time will be a suitable watermark for
+   * a generator which has so far emitted {@code numEvents}. This is the first event number of
+   * the current out-of-order group.
+   */
+  public long nextEventNumberForWatermark(long numEvents) {
+    long n = configuration.outOfOrderGroupSize;
+    long eventNumber = nextEventNumber(numEvents);
+    return (eventNumber / n) * n;
+  }
+
+  /**
+   * What timestamp should the event with {@code eventNumber} have for this generator? And
+   * what inter-event delay (in microseconds) is current?
+   *
+   * @return key is the event timestamp (ms since epoch), value is the current inter-event delay
+   *     (microseconds)
+   * @throws IllegalStateException if the rate shape has multiple steps but no step is long
+   *     enough to hold a single event, or there are no delays at all
+   */
+  public KV<Long, Long> timestampAndInterEventDelayUsForEvent(long eventNumber) {
+    if (interEventDelayUs.length == 1) {
+      long timestamp = baseTime + (eventNumber * interEventDelayUs[0]) / 1000L;
+      return KV.of(timestamp, interEventDelayUs[0]);
+    }
+
+    if (eventsPerEpoch == 0) {
+      // Would otherwise surface as an opaque "/ by zero" ArithmeticException below.
+      throw new IllegalStateException(
+          "No events fit in one epoch; check interEventDelayUs and stepLengthSec: " + this);
+    }
+
+    long epoch = eventNumber / eventsPerEpoch;
+    long n = eventNumber % eventsPerEpoch;
+    long offsetInEpochMs = 0;
+    for (long interEventDelayU : interEventDelayUs) {
+      long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU;
+      if (n < numEventsForThisCycle) {
+        long offsetInCycleUs = n * interEventDelayU;
+        long timestamp =
+            baseTime + epoch * epochPeriodMs + offsetInEpochMs + (offsetInCycleUs / 1000L);
+        return KV.of(timestamp, interEventDelayU);
+      }
+      n -= numEventsForThisCycle;
+      offsetInEpochMs += (numEventsForThisCycle * interEventDelayU) / 1000L;
+    }
+    throw new IllegalStateException("internal eventsPerEpoch incorrect"); // can't reach
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("GeneratorConfig");
+    sb.append("{configuration:");
+    sb.append(configuration.toString());
+    sb.append(";interEventDelayUs=[");
+    for (int i = 0; i < interEventDelayUs.length; i++) {
+      if (i > 0) {
+        sb.append(",");
+      }
+      sb.append(interEventDelayUs[i]);
+    }
+    sb.append("]");
+    sb.append(";stepLengthSec:");
+    sb.append(stepLengthSec);
+    sb.append(";baseTime:");
+    sb.append(baseTime);
+    sb.append(";firstEventId:");
+    sb.append(firstEventId);
+    sb.append(";maxEvents:");
+    sb.append(maxEvents);
+    sb.append(";firstEventNumber:");
+    sb.append(firstEventNumber);
+    sb.append(";epochPeriodMs:");
+    sb.append(epochPeriodMs);
+    sb.append(";eventsPerEpoch:");
+    sb.append(eventsPerEpoch);
+    sb.append("}");
+    return sb.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
new file mode 100644
index 0000000..8f5575c
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.sources;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A custom, unbounded source of event records.
+ *
+ * <p>If {@code isRateLimited} is true, events become available for return from the reader such
+ * that the overall rate respects the {@code interEventDelayUs} period if possible. Otherwise,
+ * events are returned every time the system asks for one.
+ */
+public class UnboundedEventSource extends UnboundedSource<Event, Generator.Checkpoint> {
+  /** Backlog is logged only when more than this much wallclock time passed since last report. */
+  private static final Duration BACKLOG_PERIOD = Duration.standardSeconds(30);
+  private static final Logger LOG = LoggerFactory.getLogger(UnboundedEventSource.class);
+
+  /** Configuration for generator to use when reading synthetic events. May be split. */
+  private final GeneratorConfig config;
+
+  /** How many unbounded sources to create. */
+  private final int numEventGenerators;
+
+  /** How many seconds to hold back the watermark. */
+  private final long watermarkHoldbackSec;
+
+  /** Are we rate limiting the events? */
+  private final boolean isRateLimited;
+
+  /**
+   * Creates an unbounded event source.
+   *
+   * @param config generator configuration, split into {@code numEventGenerators} parts by
+   *     {@link #split}
+   * @param numEventGenerators number of sub-sources {@link #split} produces
+   * @param watermarkHoldbackSec seconds subtracted from each generated watermark
+   * @param isRateLimited whether readers wait until each event's wallclock due time
+   */
+  public UnboundedEventSource(GeneratorConfig config, int numEventGenerators,
+      long watermarkHoldbackSec, boolean isRateLimited) {
+    this.config = config;
+    this.numEventGenerators = numEventGenerators;
+    this.watermarkHoldbackSec = watermarkHoldbackSec;
+    this.isRateLimited = isRateLimited;
+  }
+
+  /** A reader to pull events from the generator. */
+  private class EventReader extends UnboundedReader<Event> {
+    /** Generator we are reading from. */
+    private final Generator generator;
+
+    /**
+     * Current watermark (ms since epoch). Initially set to beginning of time.
+     * Then updated to be the time of the next generated event.
+     * Then, once all events have been generated, set to the end of time.
+     */
+    private long watermark;
+
+    /**
+     * Current backlog (ms), as delay between timestamp of last returned event and the timestamp
+     * we should be up to according to wall-clock time. Used only for logging.
+     */
+    private long backlogDurationMs;
+
+    /**
+     * Current backlog, as estimated number of event bytes we are behind, or null if
+     * unknown. Reported to callers.
+     */
+    @Nullable
+    private Long backlogBytes;
+
+    /**
+     * Wallclock time (ms since epoch) we last reported the backlog, or -1 if never reported.
+     */
+    private long lastReportedBacklogWallclock;
+
+    /**
+     * Event time (ms since epoch) of pending event at last reported backlog, or -1 if unknown
+     * (never calculated, or no event was pending at the last report).
+     */
+    private long timestampAtLastReportedBacklogMs;
+
+    /** Next event to make 'current' when wallclock time has advanced sufficiently. */
+    @Nullable
+    private TimestampedValue<Event> pendingEvent;
+
+    /** Wallclock time when {@link #pendingEvent} is due, or -1 if no pending event. */
+    private long pendingEventWallclockTime;
+
+    /** Current event to return from getCurrent. */
+    @Nullable
+    private TimestampedValue<Event> currentEvent;
+
+    /** Events which have been held back so as to force them to be late. */
+    private final Queue<Generator.NextEvent> heldBackEvents = new PriorityQueue<>();
+
+    public EventReader(Generator generator) {
+      this.generator = generator;
+      watermark = NexmarkUtils.BEGINNING_OF_TIME.getMillis();
+      lastReportedBacklogWallclock = -1;
+      pendingEventWallclockTime = -1;
+      timestampAtLastReportedBacklogMs = -1;
+    }
+
+    public EventReader(GeneratorConfig config) {
+      this(new Generator(config));
+    }
+
+    @Override
+    public boolean start() {
+      LOG.trace("starting unbounded generator {}", generator);
+      return advance();
+    }
+
+    /**
+     * Moves to the next event if one is available (and, when rate limited, due).
+     *
+     * @return true if a new current event is available, false if the caller should try later or
+     *     no events remain
+     */
+    @Override
+    public boolean advance() {
+      // Single wallclock reading for the whole call, so every decision and backlog update below
+      // agrees on what "now" is.
+      long now = System.currentTimeMillis();
+
+      while (pendingEvent == null) {
+        if (!generator.hasNext() && heldBackEvents.isEmpty()) {
+          // No more events, EVER.
+          if (isRateLimited) {
+            updateBacklog(now, 0);
+          }
+          if (watermark < BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+            watermark = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+            LOG.trace("stopped unbounded generator {}", generator);
+          }
+          return false;
+        }
+
+        Generator.NextEvent next = heldBackEvents.peek();
+        if (next != null && next.wallclockTimestamp <= now) {
+          // Time to use the held-back event.
+          heldBackEvents.poll();
+          LOG.debug("replaying held-back event {}ms behind watermark",
+              watermark - next.eventTimestamp);
+        } else if (generator.hasNext()) {
+          next = generator.nextEvent();
+          if (isRateLimited && config.configuration.probDelayedEvent > 0.0
+              && config.configuration.occasionalDelaySec > 0
+              && ThreadLocalRandom.current().nextDouble() < config.configuration.probDelayedEvent) {
+            // We'll hold back this event and go around again.
+            long delayMs =
+                ThreadLocalRandom.current().nextLong(config.configuration.occasionalDelaySec * 1000)
+                    + 1L;
+            LOG.debug("delaying event by {}ms", delayMs);
+            heldBackEvents.add(next.withDelay(delayMs));
+            continue;
+          }
+        } else {
+          // Waiting for held-back event to fire.
+          if (isRateLimited) {
+            updateBacklog(now, 0);
+          }
+          return false;
+        }
+
+        pendingEventWallclockTime = next.wallclockTimestamp;
+        pendingEvent = TimestampedValue.of(next.event, new Instant(next.eventTimestamp));
+        long newWatermark =
+            next.watermark - Duration.standardSeconds(watermarkHoldbackSec).getMillis();
+        // The watermark never moves backwards.
+        if (newWatermark > watermark) {
+          watermark = newWatermark;
+        }
+      }
+
+      if (isRateLimited) {
+        if (pendingEventWallclockTime > now) {
+          // We want this event to fire in the future. Try again later.
+          updateBacklog(now, 0);
+          return false;
+        }
+        updateBacklog(now, now - pendingEventWallclockTime);
+      }
+
+      // This event is ready to fire.
+      currentEvent = pendingEvent;
+      pendingEvent = null;
+      return true;
+    }
+
+    /**
+     * Records the current backlog and, when the first report is due or more than
+     * {@link #BACKLOG_PERIOD} has passed since the last one, logs it.
+     *
+     * @param now current wallclock time (ms since epoch)
+     * @param newBacklogDurationMs how far (ms) the reader is behind its wallclock schedule
+     */
+    private void updateBacklog(long now, long newBacklogDurationMs) {
+      backlogDurationMs = newBacklogDurationMs;
+      long interEventDelayUs = generator.currentInterEventDelayUs();
+      if (interEventDelayUs != 0) {
+        // Ceiling division of the backlog (in us) by the inter-event delay.
+        long backlogEvents = (backlogDurationMs * 1000 + interEventDelayUs - 1) / interEventDelayUs;
+        backlogBytes = generator.getCurrentConfig().estimatedBytesForEvents(backlogEvents);
+      }
+      if (lastReportedBacklogWallclock < 0
+          || now - lastReportedBacklogWallclock > BACKLOG_PERIOD.getMillis()) {
+        double timeDilation = Double.NaN;
+        if (pendingEvent != null
+            && lastReportedBacklogWallclock >= 0
+            && timestampAtLastReportedBacklogMs >= 0) {
+          long wallclockProgressionMs = now - lastReportedBacklogWallclock;
+          long eventTimeProgressionMs =
+              pendingEvent.getTimestamp().getMillis() - timestampAtLastReportedBacklogMs;
+          timeDilation = (double) eventTimeProgressionMs / (double) wallclockProgressionMs;
+        }
+        LOG.debug(
+            "unbounded generator backlog now {}ms ({} bytes) at {}us interEventDelay "
+                + "with {} time dilation",
+            backlogDurationMs, backlogBytes, interEventDelayUs, timeDilation);
+        lastReportedBacklogWallclock = now;
+        // Forget the event-time anchor when nothing is pending; keeping the old one would pair a
+        // stale event time with this fresh wallclock time and mis-estimate the next dilation.
+        timestampAtLastReportedBacklogMs =
+            pendingEvent != null ? pendingEvent.getTimestamp().getMillis() : -1;
+      }
+    }
+
+    @Override
+    public Event getCurrent() {
+      if (currentEvent == null) {
+        throw new NoSuchElementException();
+      }
+      return currentEvent.getValue();
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() {
+      if (currentEvent == null) {
+        throw new NoSuchElementException();
+      }
+      return currentEvent.getTimestamp();
+    }
+
+    @Override
+    public void close() {
+      // Nothing to close.
+    }
+
+    @Override
+    public UnboundedEventSource getCurrentSource() {
+      return UnboundedEventSource.this;
+    }
+
+    @Override
+    public Instant getWatermark() {
+      return new Instant(watermark);
+    }
+
+    @Override
+    public Generator.Checkpoint getCheckpointMark() {
+      return generator.toCheckpoint();
+    }
+
+    @Override
+    public long getSplitBacklogBytes() {
+      return backlogBytes == null ? BACKLOG_UNKNOWN : backlogBytes;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("EventReader(%d, %d, %d)",
+          generator.getCurrentConfig().getStartEventId(), generator.getNextEventId(),
+          generator.getCurrentConfig().getStopEventId());
+    }
+  }
+
+  @Override
+  public Coder<Generator.Checkpoint> getCheckpointMarkCoder() {
+    return Generator.Checkpoint.CODER_INSTANCE;
+  }
+
+  @Override
+  public List<UnboundedEventSource> split(
+      int desiredNumSplits, PipelineOptions options) {
+    LOG.trace("splitting unbounded source into {} sub-sources", numEventGenerators);
+    List<UnboundedEventSource> results = new ArrayList<>();
+    // Ignore desiredNumSplits and use numEventGenerators instead.
+    for (GeneratorConfig subConfig : config.split(numEventGenerators)) {
+      results.add(new UnboundedEventSource(subConfig, 1, watermarkHoldbackSec, isRateLimited));
+    }
+    return results;
+  }
+
+  @Override
+  public EventReader createReader(
+      PipelineOptions options, @Nullable Generator.Checkpoint checkpoint) {
+    if (checkpoint == null) {
+      LOG.trace("creating initial unbounded reader for {}", config);
+      return new EventReader(config);
+    } else {
+      LOG.trace("resuming unbounded reader from {}", checkpoint);
+      return new EventReader(checkpoint.toGenerator(config));
+    }
+  }
+
+  @Override
+  public void validate() {
+    // Nothing to validate.
+  }
+
+  @Override
+  public Coder<Event> getDefaultOutputCoder() {
+    return Event.CODER;
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "UnboundedEventSource(%d, %d)", config.getStartEventId(), config.getStopEventId());
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/package-info.java
new file mode 100644
index 0000000..266af10
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Nexmark Synthetic Sources.
+ */
+package org.apache.beam.sdk.nexmark.sources;
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/resources/log4j.properties b/sdks/java/nexmark/src/main/resources/log4j.properties
new file mode 100644
index 0000000..7dd57b5
--- /dev/null
+++ b/sdks/java/nexmark/src/main/resources/log4j.properties
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the console
+log4j.rootCategory=DEBUG, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n
+
+# General Beam loggers
+log4j.logger.org.apache.beam.runners.direct=WARN
+log4j.logger.org.apache.beam.sdk=WARN
+
+# Nexmark specific (the module now lives in org.apache.beam.sdk.nexmark, formerly
+# org.apache.beam.integration.nexmark)
+log4j.logger.org.apache.beam.sdk.nexmark=WARN
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.spark_project.jetty=WARN
+log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
+
+# Setting to quiet spark logs, Beam logs should stand out
+log4j.logger.org.apache.beam.runners.spark=INFO
+log4j.logger.org.apache.spark=WARN
+log4j.logger.org.spark-project=WARN
+log4j.logger.io.netty=INFO
+
+# Settings to quiet flink logs
+log4j.logger.org.apache.flink=WARN
+
+# Settings to quiet apex logs
+log4j.logger.org.apache.beam.runners.apex=INFO
+log4j.logger.com.datatorrent=ERROR
+log4j.logger.org.apache.hadoop.metrics2=WARN
+log4j.logger.org.apache.commons=WARN
+log4j.logger.org.apache.hadoop.security=WARN
+log4j.logger.org.apache.hadoop.util=WARN
+
+# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
+log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
+log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java
new file mode 100644
index 0000000..d8ac057
--- /dev/null
+++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesStatefulParDo;
+import org.apache.beam.sdk.testing.UsesTimersInParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test the various NEXMark queries yield results coherent with their models. */
+@RunWith(JUnit4.class)
+public class QueryTest {
+  private static final NexmarkConfiguration CONFIG = NexmarkConfiguration.DEFAULT.copy();
+
+  static {
+    // careful, results of tests are linked to numEventGenerators because of timestamp generation
+    CONFIG.numEventGenerators = 1;
+    CONFIG.numEvents = 1000;
+  }
+
+  @Rule public TestPipeline p = TestPipeline.create();
+
+  /**
+   * Runs {@code query} over the synthetic events described by {@code CONFIG} and asserts that the
+   * query output satisfies {@code model}.
+   *
+   * @param name prefix for the name of the source transform
+   * @param streamingMode read from the unbounded source when true, the bounded one otherwise
+   */
+  private void queryMatchesModel(
+      String name, NexmarkQuery query, NexmarkQueryModel model, boolean streamingMode) {
+    NexmarkUtils.setupPipeline(NexmarkUtils.CoderStrategy.HAND, p);
+    PCollection<TimestampedValue<KnownSize>> results =
+        streamingMode
+            ? p.apply(name + ".ReadUnBounded", NexmarkUtils.streamEventsSource(CONFIG))
+                .apply(query)
+            : p.apply(name + ".ReadBounded", NexmarkUtils.batchEventsSource(CONFIG))
+                .apply(query);
+    PAssert.that(results).satisfies(model.assertionFor());
+    PipelineResult pipelineResult = p.run();
+    pipelineResult.waitUntilFinish();
+  }
+
+  /** Checks {@code query} against {@code model} when reading from the bounded source. */
+  private void checkBatch(String queryName, NexmarkQuery query, NexmarkQueryModel model) {
+    queryMatchesModel(queryName + "TestBatch", query, model, false);
+  }
+
+  /** Checks {@code query} against {@code model} when reading from the unbounded source. */
+  private void checkStreaming(String queryName, NexmarkQuery query, NexmarkQueryModel model) {
+    queryMatchesModel(queryName + "TestStreaming", query, model, true);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query0MatchesModelBatch() {
+    checkBatch("Query0", new Query0(CONFIG), new Query0Model(CONFIG));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query0MatchesModelStreaming() {
+    checkStreaming("Query0", new Query0(CONFIG), new Query0Model(CONFIG));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query1MatchesModelBatch() {
+    checkBatch("Query1", new Query1(CONFIG), new Query1Model(CONFIG));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query1MatchesModelStreaming() {
+    checkStreaming("Query1", new Query1(CONFIG), new Query1Model(CONFIG));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query2MatchesModelBatch() {
+    checkBatch("Query2", new Query2(CONFIG), new Query2Model(CONFIG));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query2MatchesModelStreaming() {
+    checkStreaming("Query2", new Query2(CONFIG), new Query2Model(CONFIG));
+  }
+
+  @Test
+  @Category({NeedsRunner.class, UsesStatefulParDo.class, UsesTimersInParDo.class})
+  public void query3MatchesModelBatch() {
+    checkBatch("Query3", new Query3(CONFIG), new Query3Model(CONFIG));
+  }
+
+  @Test
+  @Category({NeedsRunner.class, UsesStatefulParDo.class, UsesTimersInParDo.class})
+  public void query3MatchesModelStreaming() {
+    checkStreaming("Query3", new Query3(CONFIG), new Query3Model(CONFIG));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query4MatchesModelBatch() {
+    checkBatch("Query4", new Query4(CONFIG), new Query4Model(CONFIG));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query4MatchesModelStreaming() {
+    checkStreaming("Query4", new Query4(CONFIG), new Query4Model(CONFIG));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query5MatchesModelBatch() {
+    checkBatch("Query5", new Query5(CONFIG), new Query5Model(CONFIG));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query5MatchesModelStreaming() {
+    checkStreaming("Query5", new Query5(CONFIG), new Query5Model(CONFIG));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query6MatchesModelBatch() {
+    checkBatch("Query6", new Query6(CONFIG), new Query6Model(CONFIG));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query6MatchesModelStreaming() {
+    checkStreaming("Query6", new Query6(CONFIG), new Query6Model(CONFIG));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query7MatchesModelBatch() {
+    checkBatch("Query7", new Query7(CONFIG), new Query7Model(CONFIG));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query7MatchesModelStreaming() {
+    checkStreaming("Query7", new Query7(CONFIG), new Query7Model(CONFIG));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query8MatchesModelBatch() {
+    checkBatch("Query8", new Query8(CONFIG), new Query8Model(CONFIG));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query8MatchesModelStreaming() {
+    checkStreaming("Query8", new Query8(CONFIG), new Query8Model(CONFIG));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query9MatchesModelBatch() {
+    checkBatch("Query9", new Query9(CONFIG), new Query9Model(CONFIG));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void query9MatchesModelStreaming() {
+    checkStreaming("Query9", new Query9(CONFIG), new Query9Model(CONFIG));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java
new file mode 100644
index 0000000..3590d64
--- /dev/null
+++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.sources;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link BoundedEventSource}: reading, splitting at a fraction mid-read, and splitting
+ * up front into bundles must all satisfy the {@link SourceTestUtils} contract checks.
+ */
+@RunWith(JUnit4.class)
+public class BoundedEventSourceTest {
+  /**
+   * Builds a {@link GeneratorConfig} from the default Nexmark configuration, passing the current
+   * wall-clock time and {@code n} to its constructor.
+   */
+  private GeneratorConfig makeConfig(long n) {
+    long now = System.currentTimeMillis();
+    return new GeneratorConfig(NexmarkConfiguration.DEFAULT, now, 0, n, 0);
+  }
+
+  /**
+   * Wraps {@link #makeConfig(long)} in a {@link BoundedEventSource}. The constructor's second
+   * argument is 1 (presumably the number of event generators; confirm against the source).
+   */
+  private BoundedEventSource newSource(long n) {
+    return new BoundedEventSource(makeConfig(n), 1);
+  }
+
+  /** Returns a freshly created {@link NexmarkOptions} instance. */
+  private static NexmarkOptions newOptions() {
+    return PipelineOptionsFactory.as(NexmarkOptions.class);
+  }
+
+  @Test
+  public void sourceAndReadersWork() throws Exception {
+    NexmarkOptions options = newOptions();
+    BoundedEventSource source = newSource(200L);
+
+    SourceTestUtils.assertUnstartedReaderReadsSameAsItsSource(
+        source.createReader(options), options);
+  }
+
+  @Test
+  public void splitAtFractionRespectsContract() throws Exception {
+    NexmarkOptions options = newOptions();
+    BoundedEventSource source = newSource(20L);
+
+    // Splitting at 30% must fail once 10 items have already been consumed.
+    SourceTestUtils.assertSplitAtFractionFails(source, 10, 0.3, options);
+
+    // After consuming only 5 items, the same split must succeed and stay consistent.
+    SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent(source, 5, 0.3, options);
+
+    // Finally, check splitting exhaustively across start positions and fractions.
+    SourceTestUtils.assertSplitAtFractionExhaustive(source, options);
+  }
+
+  @Test
+  public void splitIntoBundlesRespectsContract() throws Exception {
+    NexmarkOptions options = newOptions();
+    BoundedEventSource source = newSource(200L);
+
+    // The first argument to split() is the desired bundle size.
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, source.split(10, options), options);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java
new file mode 100644
index 0000000..9553d22
--- /dev/null
+++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.sources;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link Generator}: splitting a generator, either mid-stream at an event id or up
+ * front into sub-configs, must neither gain nor lose events.
+ */
+@RunWith(JUnit4.class)
+public class GeneratorTest {
+  /**
+   * Builds a {@link GeneratorConfig} from the default Nexmark configuration, passing the current
+   * wall-clock time and {@code n} to its constructor.
+   */
+  private GeneratorConfig makeConfig(long n) {
+    long now = System.currentTimeMillis();
+    return new GeneratorConfig(NexmarkConfiguration.DEFAULT, now, 0, n, 0);
+  }
+
+  /**
+   * Advances {@code itr} exactly {@code n} times, asserting before each step that an element is
+   * available.
+   *
+   * @return {@code n}
+   */
+  private <T> long consume(long n, Iterator<T> itr) {
+    long remaining = n;
+    while (remaining-- > 0) {
+      assertTrue(itr.hasNext());
+      itr.next();
+    }
+    return n;
+  }
+
+  /**
+   * Drains {@code itr} completely.
+   *
+   * @return the number of elements it yielded
+   */
+  private <T> long consume(Iterator<T> itr) {
+    long count = 0;
+    for (; itr.hasNext(); count++) {
+      itr.next();
+    }
+    return count;
+  }
+
+  @Test
+  public void splitAtFractionPreservesOverallEventCount() {
+    GeneratorConfig initialConfig = makeConfig(55729L);
+    long expected = initialConfig.getStopEventId() - initialConfig.getStartEventId();
+    long consumed = 0;
+
+    Generator first = new Generator(initialConfig);
+
+    // Read part-way through the first generator before splitting it.
+    consumed += consume(5000, first);
+
+    // Split at event id 9000; the returned config seeds a second generator.
+    Generator second = new Generator(first.splitAtEventId(9000L));
+
+    // Keep reading from both.
+    consumed += consume(2000, first);
+    consumed += consume(3000, second);
+
+    // Split the second generator again, at event id 30000.
+    Generator third = new Generator(second.splitAtEventId(30000L));
+
+    // Drain whatever each generator has left.
+    consumed += consume(first);
+    consumed += consume(second);
+    consumed += consume(third);
+
+    assertEquals(expected, consumed);
+  }
+
+  @Test
+  public void splitPreservesOverallEventCount() {
+    GeneratorConfig initialConfig = makeConfig(51237L);
+    long expected = initialConfig.getStopEventId() - initialConfig.getStartEventId();
+
+    // Build every sub-generator first, then drain them one after another.
+    List<Generator> generators = new ArrayList<>();
+    for (GeneratorConfig part : initialConfig.split(20)) {
+      generators.add(new Generator(part));
+    }
+
+    long consumed = 0;
+    for (Generator generator : generators) {
+      consumed += consume(generator);
+    }
+
+    assertEquals(expected, consumed);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
new file mode 100644
index 0000000..3853ede
--- /dev/null
+++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.sources;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link UnboundedEventSource}.
+ */
+@RunWith(JUnit4.class)
+public class UnboundedEventSourceTest {
+  /**
+   * Builds a {@link GeneratorConfig} from the default Nexmark configuration, passing the current
+   * wall-clock time and {@code n} to its constructor.
+   */
+  private GeneratorConfig makeConfig(long n) {
+    return new GeneratorConfig(
+        NexmarkConfiguration.DEFAULT, System.currentTimeMillis(), 0, n, 0);
+  }
+
+  /**
+   * Helper for tracking which ids we've seen (so we can detect duplicates) and confirming that
+   * the events read match the model's events.
+   */
+  private static class EventIdChecker {
+    private final Set<Long> seenPersonIds = new HashSet<>();
+    private final Set<Long> seenAuctionIds = new HashSet<>();
+
+    /**
+     * Records the id of a new auction or new person, failing if that id was already seen.
+     * Events carrying neither are ignored.
+     */
+    public void add(Event event) {
+      if (event.newAuction != null) {
+        assertTrue(seenAuctionIds.add(event.newAuction.id));
+      } else if (event.newPerson != null) {
+        assertTrue(seenPersonIds.add(event.newPerson.id));
+      }
+    }
+
+    /**
+     * Advances {@code reader} {@code n} times. Each event read must equal (by {@code toString})
+     * the next event from {@code modelGenerator`, and is then passed to {@link #add(Event)}.
+     */
+    public void add(int n, UnboundedReader<Event> reader, Generator modelGenerator)
+        throws IOException {
+      for (int i = 0; i < n; i++) {
+        assertTrue(modelGenerator.hasNext());
+        Event modelEvent = modelGenerator.next().getValue();
+        assertTrue(reader.advance());
+        Event actualEvent = reader.getCurrent();
+        assertEquals(modelEvent.toString(), actualEvent.toString());
+        add(actualEvent);
+      }
+    }
+  }
+
+  /**
+   * Checks that aggressively checkpointing and resuming a reader gives exactly the same event
+   * stream as reading directly from a model generator.
+   */
+  @Test
+  public void resumeFromCheckpoint() throws IOException {
+    Random random = new Random(297);
+    int n = 47293;
+    GeneratorConfig config = makeConfig(n);
+    Generator modelGenerator = new Generator(config);
+
+    EventIdChecker checker = new EventIdChecker();
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    UnboundedEventSource source = new UnboundedEventSource(config, 1, 0, false);
+    UnboundedReader<Event> reader = source.createReader(options, null);
+    try {
+      while (n > 0) {
+        int m = Math.min(459 + random.nextInt(455), n);
+        System.out.printf("reading %d...%n", m);
+        checker.add(m, reader, modelGenerator);
+        n -= m;
+        System.out.printf("splitting with %d remaining...%n", n);
+        CheckpointMark checkpointMark = reader.getCheckpointMark();
+        // Create the resumed reader before closing the old one. If createReader throws, the old
+        // reader is still the one the finally block closes, so nothing is closed twice.
+        UnboundedReader<Event> resumed =
+            source.createReader(options, (Generator.Checkpoint) checkpointMark);
+        reader.close();
+        reader = resumed;
+      }
+
+      assertFalse(reader.advance());
+    } finally {
+      // Readers are AutoCloseable; release the last one even if an assertion failed.
+      reader.close();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 3144193..f0cf8d9 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -43,6 +43,7 @@
<!-- javadoc runs directly from the root parent as the last module
in the build to be able to capture runner-specific javadoc.
<module>javadoc</module> -->
+ <module>nexmark</module>
</modules>
<profiles>