You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/17 20:51:46 UTC
[2/6] beam git commit: [Nexmark] Extract BidGenerator from Generator
[Nexmark] Extract BidGenerator from Generator
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8a6fad9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8a6fad9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8a6fad9
Branch: refs/heads/master
Commit: d8a6fad9ed4b65504911fa9d5dadf5c8d4a7a0e6
Parents: e895fc8
Author: Anton Kedin <ke...@google.com>
Authored: Mon Nov 6 15:19:39 2017 -0800
Committer: Anton Kedin <ke...@google.com>
Committed: Wed Nov 15 13:48:37 2017 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/nexmark/NexmarkUtils.java | 4 +-
.../beam/sdk/nexmark/queries/WinningBids.java | 2 +-
.../sdk/nexmark/sources/BoundedEventSource.java | 2 +
.../beam/sdk/nexmark/sources/Generator.java | 316 -----------------
.../nexmark/sources/GeneratorCheckpoint.java | 78 -----
.../sdk/nexmark/sources/GeneratorConfig.java | 339 -------------------
.../nexmark/sources/UnboundedEventSource.java | 3 +
.../nexmark/sources/generator/Generator.java | 271 +++++++++++++++
.../sources/generator/GeneratorCheckpoint.java | 82 +++++
.../sources/generator/GeneratorConfig.java | 339 +++++++++++++++++++
.../generator/model/AuctionGenerator.java | 142 ++++++++
.../sources/generator/model/BidGenerator.java | 76 +++++
.../sources/generator/model/LongGenerator.java | 37 ++
.../generator/model/PersonGenerator.java | 139 ++++++++
.../sources/generator/model/PriceGenerator.java | 32 ++
.../generator/model/StringsGenerator.java | 68 ++++
.../sources/generator/model/package-info.java | 22 ++
.../nexmark/sources/generator/package-info.java | 26 ++
.../nexmark/sources/utils/AuctionGenerator.java | 145 --------
.../nexmark/sources/utils/LongGenerator.java | 37 --
.../nexmark/sources/utils/PersonGenerator.java | 140 --------
.../nexmark/sources/utils/PriceGenerator.java | 32 --
.../nexmark/sources/utils/StringsGenerator.java | 68 ----
.../sdk/nexmark/sources/utils/package-info.java | 22 --
.../nexmark/sources/BoundedEventSourceTest.java | 1 +
.../beam/sdk/nexmark/sources/GeneratorTest.java | 2 +
.../sources/UnboundedEventSourceTest.java | 3 +
27 files changed, 1248 insertions(+), 1180 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
index fa1ef16..fc0ab9f 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
@@ -50,9 +50,9 @@ import org.apache.beam.sdk.nexmark.model.NameCityStateId;
import org.apache.beam.sdk.nexmark.model.Person;
import org.apache.beam.sdk.nexmark.model.SellerPrice;
import org.apache.beam.sdk.nexmark.sources.BoundedEventSource;
-import org.apache.beam.sdk.nexmark.sources.Generator;
-import org.apache.beam.sdk.nexmark.sources.GeneratorConfig;
import org.apache.beam.sdk.nexmark.sources.UnboundedEventSource;
+import org.apache.beam.sdk.nexmark.sources.generator.Generator;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
index bc553c9..3ee4f3a 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
@@ -44,7 +44,7 @@ import org.apache.beam.sdk.nexmark.model.Auction;
import org.apache.beam.sdk.nexmark.model.AuctionBid;
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.nexmark.sources.GeneratorConfig;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java
index 60124bb..cc32007 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java
@@ -26,6 +26,8 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.sources.generator.Generator;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
deleted file mode 100644
index 68e6748..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.nexmark.sources;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.beam.sdk.nexmark.sources.utils.AuctionGenerator.lastBase0AuctionId;
-import static org.apache.beam.sdk.nexmark.sources.utils.AuctionGenerator.nextAuction;
-import static org.apache.beam.sdk.nexmark.sources.utils.AuctionGenerator.nextBase0AuctionId;
-import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.lastBase0PersonId;
-import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextBase0PersonId;
-import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextPerson;
-import static org.apache.beam.sdk.nexmark.sources.utils.PriceGenerator.nextPrice;
-import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextExtra;
-
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.Objects;
-import java.util.Random;
-
-import org.apache.beam.sdk.nexmark.model.Bid;
-import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-
-/**
- * A generator for synthetic events. We try to make the data vaguely reasonable. We also ensure
- * most primary key/foreign key relations are correct. Eg: a {@link Bid} event will usually have
- * valid auction and bidder ids which can be joined to already-generated Auction and Person events.
- *
- * <p>To help with testing, we generate timestamps relative to a given {@code baseTime}. Each new
- * event is given a timestamp advanced from the previous timestamp by {@code interEventDelayUs}
- * (in microseconds). The event stream is thus fully deterministic and does not depend on
- * wallclock time.
- *
- * <p>This class implements {@link org.apache.beam.sdk.io.UnboundedSource.CheckpointMark}
- * so that we can resume generating events from a saved snapshot.
- */
-public class Generator implements Iterator<TimestampedValue<Event>>, Serializable {
-
- /**
- * Fraction of people/auctions which may be 'hot' sellers/bidders/auctions are 1
- * over these values.
- */
- private static final int HOT_AUCTION_RATIO = 100;
- private static final int HOT_BIDDER_RATIO = 100;
-
- /**
- * The next event and its various timestamps. Ordered by increasing wallclock timestamp, then
- * (arbitrary but stable) event hash order.
- */
- public static class NextEvent implements Comparable<NextEvent> {
- /** When, in wallclock time, should this event be emitted? */
- public final long wallclockTimestamp;
-
- /** When, in event time, should this event be considered to have occured? */
- public final long eventTimestamp;
-
- /** The event itself. */
- public final Event event;
-
- /** The minimum of this and all future event timestamps. */
- public final long watermark;
-
- public NextEvent(long wallclockTimestamp, long eventTimestamp, Event event, long watermark) {
- this.wallclockTimestamp = wallclockTimestamp;
- this.eventTimestamp = eventTimestamp;
- this.event = event;
- this.watermark = watermark;
- }
-
- /**
- * Return a deep copy of next event with delay added to wallclock timestamp and
- * event annotate as 'LATE'.
- */
- public NextEvent withDelay(long delayMs) {
- return new NextEvent(
- wallclockTimestamp + delayMs, eventTimestamp, event.withAnnotation("LATE"), watermark);
- }
-
- @Override public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- NextEvent nextEvent = (NextEvent) o;
-
- return (wallclockTimestamp == nextEvent.wallclockTimestamp
- && eventTimestamp == nextEvent.eventTimestamp
- && watermark == nextEvent.watermark
- && event.equals(nextEvent.event));
- }
-
- @Override public int hashCode() {
- return Objects.hash(wallclockTimestamp, eventTimestamp, watermark, event);
- }
-
- @Override
- public int compareTo(NextEvent other) {
- int i = Long.compare(wallclockTimestamp, other.wallclockTimestamp);
- if (i != 0) {
- return i;
- }
- return Integer.compare(event.hashCode(), other.event.hashCode());
- }
- }
-
- /**
- * Configuration to generate events against. Note that it may be replaced by a call to
- * {@link #splitAtEventId}.
- */
- private GeneratorConfig config;
-
- /** Number of events generated by this generator. */
- private long eventsCountSoFar;
-
- /**
- * Wallclock time at which we emitted the first event (ms since epoch). Initially -1.
- */
- private long wallclockBaseTime;
-
- Generator(GeneratorConfig config, long eventsCountSoFar, long wallclockBaseTime) {
- checkNotNull(config);
- this.config = config;
- this.eventsCountSoFar = eventsCountSoFar;
- this.wallclockBaseTime = wallclockBaseTime;
- }
-
- /**
- * Create a fresh generator according to {@code config}.
- */
- public Generator(GeneratorConfig config) {
- this(config, 0, -1);
- }
-
- /**
- * Return a checkpoint for the current generator.
- */
- public GeneratorCheckpoint toCheckpoint() {
- return new GeneratorCheckpoint(eventsCountSoFar, wallclockBaseTime);
- }
-
- /**
- * Return a deep copy of this generator.
- */
- public Generator copy() {
- checkNotNull(config);
- Generator result = new Generator(config, eventsCountSoFar, wallclockBaseTime);
- return result;
- }
-
- /**
- * Return the current config for this generator. Note that configs may be replaced by {@link
- * #splitAtEventId}.
- */
- public GeneratorConfig getCurrentConfig() {
- return config;
- }
-
- /**
- * Mutate this generator so that it will only generate events up to but not including
- * {@code eventId}. Return a config to represent the events this generator will no longer yield.
- * The generators will run in on a serial timeline.
- */
- public GeneratorConfig splitAtEventId(long eventId) {
- long newMaxEvents = eventId - (config.firstEventId + config.firstEventNumber);
- GeneratorConfig remainConfig = config.copyWith(config.firstEventId,
- config.maxEvents - newMaxEvents, config.firstEventNumber + newMaxEvents);
- config = config.copyWith(config.firstEventId, newMaxEvents, config.firstEventNumber);
- return remainConfig;
- }
-
- /**
- * Return the next 'event id'. Though events don't have ids we can simulate them to
- * help with bookkeeping.
- */
- public long getNextEventId() {
- return config.firstEventId + config.nextAdjustedEventNumber(eventsCountSoFar);
- }
-
-
-
- /**
- * Generate and return a random bid with next available id.
- */
- private Bid nextBid(long eventId, Random random, long timestamp) {
- long auction;
- // Here P(bid will be for a hot auction) = 1 - 1/hotAuctionRatio.
- if (random.nextInt(config.configuration.hotAuctionRatio) > 0) {
- // Choose the first auction in the batch of last HOT_AUCTION_RATIO auctions.
- auction = (lastBase0AuctionId(eventId) / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO;
- } else {
- auction = nextBase0AuctionId(eventId, random, config);
- }
- auction += GeneratorConfig.FIRST_AUCTION_ID;
-
- long bidder;
- // Here P(bid will be by a hot bidder) = 1 - 1/hotBiddersRatio
- if (random.nextInt(config.configuration.hotBiddersRatio) > 0) {
- // Choose the second person (so hot bidders and hot sellers don't collide) in the batch of
- // last HOT_BIDDER_RATIO people.
- bidder = (lastBase0PersonId(getNextEventId()) / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1;
- } else {
- bidder = nextBase0PersonId(eventId, random, config);
- }
- bidder += GeneratorConfig.FIRST_PERSON_ID;
-
- long price = nextPrice(random);
- int currentSize = 8 + 8 + 8 + 8;
- String extra = nextExtra(random, currentSize, config.configuration.avgBidByteSize);
- return new Bid(auction, bidder, price, timestamp, extra);
- }
-
- @Override
- public boolean hasNext() {
- return eventsCountSoFar < config.maxEvents;
- }
-
- /**
- * Return the next event. The outer timestamp is in wallclock time and corresponds to
- * when the event should fire. The inner timestamp is in event-time and represents the
- * time the event is purported to have taken place in the simulation.
- */
- public NextEvent nextEvent() {
- if (wallclockBaseTime < 0) {
- wallclockBaseTime = System.currentTimeMillis();
- }
- // When, in event time, we should generate the event. Monotonic.
- long eventTimestamp =
- config.timestampAndInterEventDelayUsForEvent(
- config.nextEventNumber(eventsCountSoFar)).getKey();
- // When, in event time, the event should say it was generated. Depending on outOfOrderGroupSize
- // may have local jitter.
- long adjustedEventTimestamp =
- config.timestampAndInterEventDelayUsForEvent(
- config.nextAdjustedEventNumber(eventsCountSoFar))
- .getKey();
- // The minimum of this and all future adjusted event timestamps. Accounts for jitter in
- // the event timestamp.
- long watermark =
- config.timestampAndInterEventDelayUsForEvent(
- config.nextEventNumberForWatermark(eventsCountSoFar))
- .getKey();
- // When, in wallclock time, we should emit the event.
- long wallclockTimestamp = wallclockBaseTime + (eventTimestamp - getCurrentConfig().baseTime);
-
- // Seed the random number generator with the next 'event id'.
- Random random = new Random(getNextEventId());
-
-
- long newEventId = getNextEventId();
- long rem = newEventId % GeneratorConfig.PROPORTION_DENOMINATOR;
-
- Event event;
- if (rem < GeneratorConfig.PERSON_PROPORTION) {
- event = new Event(nextPerson(newEventId, random, adjustedEventTimestamp, config));
- } else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
- event = new Event(
- nextAuction(eventsCountSoFar, newEventId, random, adjustedEventTimestamp, config));
- } else {
- event = new Event(nextBid(newEventId, random, adjustedEventTimestamp));
- }
-
- eventsCountSoFar++;
- return new NextEvent(wallclockTimestamp, adjustedEventTimestamp, event, watermark);
- }
-
- @Override
- public TimestampedValue<Event> next() {
- NextEvent next = nextEvent();
- return TimestampedValue.of(next.event, new Instant(next.eventTimestamp));
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Return how many microseconds till we emit the next event.
- */
- public long currentInterEventDelayUs() {
- return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(eventsCountSoFar))
- .getValue();
- }
-
- /**
- * Return an estimate of fraction of output consumed.
- */
- public double getFractionConsumed() {
- return (double) eventsCountSoFar / config.maxEvents;
- }
-
- @Override
- public String toString() {
- return String.format("Generator{config:%s; eventsCountSoFar:%d; wallclockBaseTime:%d}", config,
- eventsCountSoFar, wallclockBaseTime);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java
deleted file mode 100644
index dfc135d..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.nexmark.sources;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-
-/**
- * Just enough state to be able to restore a generator back to where it was checkpointed.
- */
-public class GeneratorCheckpoint implements UnboundedSource.CheckpointMark {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-
- /** Coder for this class. */
- public static final Coder<GeneratorCheckpoint> CODER_INSTANCE =
- new CustomCoder<GeneratorCheckpoint>() {
- @Override public void encode(GeneratorCheckpoint value, OutputStream outStream)
- throws CoderException, IOException {
- LONG_CODER.encode(value.numEvents, outStream);
- LONG_CODER.encode(value.wallclockBaseTime, outStream);
- }
-
- @Override
- public GeneratorCheckpoint decode(InputStream inStream)
- throws CoderException, IOException {
- long numEvents = LONG_CODER.decode(inStream);
- long wallclockBaseTime = LONG_CODER.decode(inStream);
- return new GeneratorCheckpoint(numEvents, wallclockBaseTime);
- }
- @Override public void verifyDeterministic() throws NonDeterministicException {}
- };
-
- private final long numEvents;
- private final long wallclockBaseTime;
-
- GeneratorCheckpoint(long numEvents, long wallclockBaseTime) {
- this.numEvents = numEvents;
- this.wallclockBaseTime = wallclockBaseTime;
- }
-
- public Generator toGenerator(GeneratorConfig config) {
- return new Generator(config, numEvents, wallclockBaseTime);
- }
-
- @Override
- public void finalizeCheckpoint() throws IOException {
- // Nothing to finalize.
- }
-
- @Override
- public String toString() {
- return String.format("Generator.GeneratorCheckpoint{numEvents:%d;wallclockBaseTime:%d}",
- numEvents, wallclockBaseTime);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java
deleted file mode 100644
index 8e0a899..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.nexmark.sources;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
-import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.values.KV;
-
-/**
- * Parameters controlling how {@link Generator} synthesizes {@link Event} elements.
- */
-public class GeneratorConfig implements Serializable {
-
- /**
- * We start the ids at specific values to help ensure the queries find a match even on
- * small synthesized dataset sizes.
- */
- public static final long FIRST_AUCTION_ID = 1000L;
- public static final long FIRST_PERSON_ID = 1000L;
- public static final long FIRST_CATEGORY_ID = 10L;
-
- /**
- * Proportions of people/auctions/bids to synthesize.
- */
- public static final int PERSON_PROPORTION = 1;
- public static final int AUCTION_PROPORTION = 3;
- private static final int BID_PROPORTION = 46;
- public static final int PROPORTION_DENOMINATOR =
- PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION;
-
- /**
- * Environment options.
- */
- public final NexmarkConfiguration configuration;
-
- /**
- * Delay between events, in microseconds. If the array has more than one entry then
- * the rate is changed every {@link #stepLengthSec}, and wraps around.
- */
- private final long[] interEventDelayUs;
-
- /**
- * Delay before changing the current inter-event delay.
- */
- private final long stepLengthSec;
-
- /**
- * Time for first event (ms since epoch).
- */
- public final long baseTime;
-
- /**
- * Event id of first event to be generated. Event ids are unique over all generators, and
- * are used as a seed to generate each event's data.
- */
- public final long firstEventId;
-
- /**
- * Maximum number of events to generate.
- */
- public final long maxEvents;
-
- /**
- * First event number. Generators running in parallel time may share the same event number,
- * and the event number is used to determine the event timestamp.
- */
- public final long firstEventNumber;
-
- /**
- * True period of epoch in milliseconds. Derived from above.
- * (Ie time to run through cycle for all interEventDelayUs entries).
- */
- private final long epochPeriodMs;
-
- /**
- * Number of events per epoch. Derived from above.
- * (Ie number of events to run through cycle for all interEventDelayUs entries).
- */
- private final long eventsPerEpoch;
-
- public GeneratorConfig(
- NexmarkConfiguration configuration, long baseTime, long firstEventId,
- long maxEventsOrZero, long firstEventNumber) {
- this.configuration = configuration;
- this.interEventDelayUs = configuration.rateShape.interEventDelayUs(
- configuration.firstEventRate, configuration.nextEventRate,
- configuration.rateUnit, configuration.numEventGenerators);
- this.stepLengthSec = configuration.rateShape.stepLengthSec(configuration.ratePeriodSec);
- this.baseTime = baseTime;
- this.firstEventId = firstEventId;
- if (maxEventsOrZero == 0) {
- // Scale maximum down to avoid overflow in getEstimatedSizeBytes.
- this.maxEvents =
- Long.MAX_VALUE / (PROPORTION_DENOMINATOR
- * Math.max(
- Math.max(configuration.avgPersonByteSize, configuration.avgAuctionByteSize),
- configuration.avgBidByteSize));
- } else {
- this.maxEvents = maxEventsOrZero;
- }
- this.firstEventNumber = firstEventNumber;
-
- long eventsPerEpoch = 0;
- long epochPeriodMs = 0;
- if (interEventDelayUs.length > 1) {
- for (long interEventDelayU : interEventDelayUs) {
- long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU;
- eventsPerEpoch += numEventsForThisCycle;
- epochPeriodMs += (numEventsForThisCycle * interEventDelayU) / 1000L;
- }
- }
- this.eventsPerEpoch = eventsPerEpoch;
- this.epochPeriodMs = epochPeriodMs;
- }
-
- /**
- * Return a copy of this config.
- */
- public GeneratorConfig copy() {
- GeneratorConfig result;
- result = new GeneratorConfig(configuration, baseTime, firstEventId,
- maxEvents, firstEventNumber);
- return result;
- }
-
- /**
- * Split this config into {@code n} sub-configs with roughly equal number of
- * possible events, but distinct value spaces. The generators will run on parallel timelines.
- * This config should no longer be used.
- */
- public List<GeneratorConfig> split(int n) {
- List<GeneratorConfig> results = new ArrayList<>();
- if (n == 1) {
- // No split required.
- results.add(this);
- } else {
- long subMaxEvents = maxEvents / n;
- long subFirstEventId = firstEventId;
- for (int i = 0; i < n; i++) {
- if (i == n - 1) {
- // Don't loose any events to round-down.
- subMaxEvents = maxEvents - subMaxEvents * (n - 1);
- }
- results.add(copyWith(subFirstEventId, subMaxEvents, firstEventNumber));
- subFirstEventId += subMaxEvents;
- }
- }
- return results;
- }
-
- /**
- * Return copy of this config except with given parameters.
- */
- public GeneratorConfig copyWith(long firstEventId, long maxEvents, long firstEventNumber) {
- return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
- }
-
- /**
- * Return an estimate of the bytes needed by {@code numEvents}.
- */
- public long estimatedBytesForEvents(long numEvents) {
- long numPersons =
- (numEvents * GeneratorConfig.PERSON_PROPORTION) / GeneratorConfig.PROPORTION_DENOMINATOR;
- long numAuctions = (numEvents * AUCTION_PROPORTION) / PROPORTION_DENOMINATOR;
- long numBids = (numEvents * BID_PROPORTION) / PROPORTION_DENOMINATOR;
- return numPersons * configuration.avgPersonByteSize
- + numAuctions * configuration.avgAuctionByteSize
- + numBids * configuration.avgBidByteSize;
- }
-
- public int getAvgPersonByteSize() {
- return configuration.avgPersonByteSize;
- }
-
- public int getNumActivePeople() {
- return configuration.numActivePeople;
- }
-
- public int getHotSellersRatio() {
- return configuration.hotSellersRatio;
- }
-
- public int getNumInFlightAuctions() {
- return configuration.numInFlightAuctions;
- }
-
- public int getHotAuctionRatio() {
- return configuration.hotAuctionRatio;
- }
-
- public int getHotBiddersRatio() {
- return configuration.hotBiddersRatio;
- }
-
- public int getAvgBidByteSize() {
- return configuration.avgBidByteSize;
- }
-
- public int getAvgAuctionByteSize() {
- return configuration.avgAuctionByteSize;
- }
-
- public double getProbDelayedEvent() {
- return configuration.probDelayedEvent;
- }
-
- public long getOccasionalDelaySec() {
- return configuration.occasionalDelaySec;
- }
-
- /**
- * Return an estimate of the byte-size of all events a generator for this config would yield.
- */
- public long getEstimatedSizeBytes() {
- return estimatedBytesForEvents(maxEvents);
- }
-
- /**
- * Return the first 'event id' which could be generated from this config. Though events don't
- * have ids we can simulate them to help bookkeeping.
- */
- public long getStartEventId() {
- return firstEventId + firstEventNumber;
- }
-
- /**
- * Return one past the last 'event id' which could be generated from this config.
- */
- public long getStopEventId() {
- return firstEventId + firstEventNumber + maxEvents;
- }
-
- /**
- * Return the next event number for a generator which has so far emitted {@code numEvents}.
- */
- public long nextEventNumber(long numEvents) {
- return firstEventNumber + numEvents;
- }
-
- /**
- * Return the next event number for a generator which has so far emitted {@code numEvents},
- * but adjusted to account for {@code outOfOrderGroupSize}.
- */
- public long nextAdjustedEventNumber(long numEvents) {
- long n = configuration.outOfOrderGroupSize;
- long eventNumber = nextEventNumber(numEvents);
- long base = (eventNumber / n) * n;
- long offset = (eventNumber * 953) % n;
- return base + offset;
- }
-
- /**
- * Return the event number who's event time will be a suitable watermark for
- * a generator which has so far emitted {@code numEvents}.
- */
- public long nextEventNumberForWatermark(long numEvents) {
- long n = configuration.outOfOrderGroupSize;
- long eventNumber = nextEventNumber(numEvents);
- return (eventNumber / n) * n;
- }
-
- /**
- * What timestamp should the event with {@code eventNumber} have for this generator? And
- * what inter-event delay (in microseconds) is current?
- */
- public KV<Long, Long> timestampAndInterEventDelayUsForEvent(long eventNumber) {
- if (interEventDelayUs.length == 1) {
- long timestamp = baseTime + (eventNumber * interEventDelayUs[0]) / 1000L;
- return KV.of(timestamp, interEventDelayUs[0]);
- }
-
- long epoch = eventNumber / eventsPerEpoch;
- long n = eventNumber % eventsPerEpoch;
- long offsetInEpochMs = 0;
- for (long interEventDelayU : interEventDelayUs) {
- long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU;
- if (n < numEventsForThisCycle) {
- long offsetInCycleUs = n * interEventDelayU;
- long timestamp =
- baseTime + epoch * epochPeriodMs + offsetInEpochMs + (offsetInCycleUs / 1000L);
- return KV.of(timestamp, interEventDelayU);
- }
- n -= numEventsForThisCycle;
- offsetInEpochMs += (numEventsForThisCycle * interEventDelayU) / 1000L;
- }
- throw new RuntimeException("internal eventsPerEpoch incorrect"); // can't reach
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("GeneratorConfig");
- sb.append("{configuration:");
- sb.append(configuration.toString());
- sb.append(";interEventDelayUs=[");
- for (int i = 0; i < interEventDelayUs.length; i++) {
- if (i > 0) {
- sb.append(",");
- }
- sb.append(interEventDelayUs[i]);
- }
- sb.append("]");
- sb.append(";stepLengthSec:");
- sb.append(stepLengthSec);
- sb.append(";baseTime:");
- sb.append(baseTime);
- sb.append(";firstEventId:");
- sb.append(firstEventId);
- sb.append(";maxEvents:");
- sb.append(maxEvents);
- sb.append(";firstEventNumber:");
- sb.append(firstEventNumber);
- sb.append(";epochPeriodMs:");
- sb.append(epochPeriodMs);
- sb.append(";eventsPerEpoch:");
- sb.append(eventsPerEpoch);
- sb.append("}");
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
index 74eb061..f43486d 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
@@ -30,6 +30,9 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.sources.generator.Generator;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorCheckpoint;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.TimestampedValue;
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/Generator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/Generator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/Generator.java
new file mode 100644
index 0000000..bd736c1
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/Generator.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.sources.generator;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.AuctionGenerator.nextAuction;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.BidGenerator.nextBid;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.PersonGenerator.nextPerson;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Random;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+
+/**
+ * A generator for synthetic events. We try to make the data vaguely reasonable. We also ensure
+ * most primary key/foreign key relations are correct. Eg: a {@link Bid} event will usually have
+ * valid auction and bidder ids which can be joined to already-generated Auction and Person events.
+ *
+ * <p>To help with testing, we generate timestamps relative to a given {@code baseTime}. Each new
+ * event is given a timestamp advanced from the previous timestamp by {@code interEventDelayUs}
+ * (in microseconds). The events and their event-time timestamps are thus fully deterministic;
+ * only the wallclock emission times reported by {@link #nextEvent} depend on when generation
+ * started.
+ *
+ * <p>The generator's progress can be captured with {@link #toCheckpoint()}, which returns a
+ * {@link GeneratorCheckpoint} (an
+ * {@link org.apache.beam.sdk.io.UnboundedSource.CheckpointMark}), so that we can resume
+ * generating events from a saved snapshot via {@link GeneratorCheckpoint#toGenerator}.
+ */
+public class Generator implements Iterator<TimestampedValue<Event>>, Serializable {
+
+  /**
+   * The next event and its various timestamps. Ordered by increasing wallclock timestamp, then
+   * (arbitrary but stable) event hash order.
+   *
+   * <p>Note: this ordering is not consistent with {@link #equals}. Two distinct events with the
+   * same wallclock timestamp and equal event hash codes compare as 0.
+   */
+  public static class NextEvent implements Comparable<NextEvent> {
+    /** When, in wallclock time, should this event be emitted? */
+    public final long wallclockTimestamp;
+
+    /** When, in event time, should this event be considered to have occurred? */
+    public final long eventTimestamp;
+
+    /** The event itself. */
+    public final Event event;
+
+    /** The minimum of this and all future event timestamps. */
+    public final long watermark;
+
+    public NextEvent(long wallclockTimestamp, long eventTimestamp, Event event, long watermark) {
+      this.wallclockTimestamp = wallclockTimestamp;
+      this.eventTimestamp = eventTimestamp;
+      this.event = event;
+      this.watermark = watermark;
+    }
+
+    /**
+     * Return a copy of this next event with {@code delayMs} added to the wallclock timestamp and
+     * the event annotated as 'LATE'. The event timestamp and watermark are unchanged.
+     */
+    public NextEvent withDelay(long delayMs) {
+      return new NextEvent(
+          wallclockTimestamp + delayMs, eventTimestamp, event.withAnnotation("LATE"), watermark);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      NextEvent nextEvent = (NextEvent) o;
+      return wallclockTimestamp == nextEvent.wallclockTimestamp
+          && eventTimestamp == nextEvent.eventTimestamp
+          && watermark == nextEvent.watermark
+          && event.equals(nextEvent.event);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(wallclockTimestamp, eventTimestamp, watermark, event);
+    }
+
+    @Override
+    public int compareTo(NextEvent other) {
+      int i = Long.compare(wallclockTimestamp, other.wallclockTimestamp);
+      if (i != 0) {
+        return i;
+      }
+      // Tie-break on the event's hash code: arbitrary, but stable for a given event.
+      return Integer.compare(event.hashCode(), other.event.hashCode());
+    }
+  }
+
+  /**
+   * Configuration to generate events against. Note that it may be replaced by a call to
+   * {@link #splitAtEventId}.
+   */
+  private GeneratorConfig config;
+
+  /** Number of events generated by this generator. */
+  private long eventsCountSoFar;
+
+  /**
+   * Wallclock time at which we emitted the first event (ms since epoch). A fresh generator starts
+   * at -1; any negative value is replaced by the current time on the next {@link #nextEvent}.
+   */
+  private long wallclockBaseTime;
+
+  Generator(GeneratorConfig config, long eventsCountSoFar, long wallclockBaseTime) {
+    this.config = checkNotNull(config);
+    this.eventsCountSoFar = eventsCountSoFar;
+    this.wallclockBaseTime = wallclockBaseTime;
+  }
+
+  /** Create a fresh generator according to {@code config}. */
+  public Generator(GeneratorConfig config) {
+    this(config, 0, -1);
+  }
+
+  /** Return a checkpoint for the current generator. */
+  public GeneratorCheckpoint toCheckpoint() {
+    return new GeneratorCheckpoint(eventsCountSoFar, wallclockBaseTime);
+  }
+
+  /**
+   * Return a copy of this generator with the same progress. The copy references the same
+   * {@link GeneratorConfig} instance as this generator. A later {@link #splitAtEventId} on
+   * either generator only reassigns that generator's own config field.
+   */
+  public Generator copy() {
+    return new Generator(config, eventsCountSoFar, wallclockBaseTime);
+  }
+
+  /**
+   * Return the current config for this generator. Note that configs may be replaced by {@link
+   * #splitAtEventId}.
+   */
+  public GeneratorConfig getCurrentConfig() {
+    return config;
+  }
+
+  /**
+   * Mutate this generator so that it will only generate events up to but not including
+   * {@code eventId}. Return a config to represent the events this generator will no longer yield.
+   * The generators will run on a serial timeline.
+   */
+  public GeneratorConfig splitAtEventId(long eventId) {
+    long newMaxEvents = eventId - (config.firstEventId + config.firstEventNumber);
+    GeneratorConfig remainConfig = config.copyWith(config.firstEventId,
+        config.maxEvents - newMaxEvents, config.firstEventNumber + newMaxEvents);
+    config = config.copyWith(config.firstEventId, newMaxEvents, config.firstEventNumber);
+    return remainConfig;
+  }
+
+  /**
+   * Return the next 'event id'. Though events don't have ids we can simulate them to
+   * help with bookkeeping.
+   */
+  public long getNextEventId() {
+    return config.firstEventId + config.nextAdjustedEventNumber(eventsCountSoFar);
+  }
+
+  @Override
+  public boolean hasNext() {
+    return eventsCountSoFar < config.maxEvents;
+  }
+
+  /**
+   * Return the next event. The outer timestamp is in wallclock time and corresponds to
+   * when the event should fire. The inner timestamp is in event-time and represents the
+   * time the event is purported to have taken place in the simulation.
+   *
+   * <p>This method does not check {@link #hasNext()}. Callers are expected to do so.
+   */
+  public NextEvent nextEvent() {
+    if (wallclockBaseTime < 0) {
+      wallclockBaseTime = System.currentTimeMillis();
+    }
+    // When, in event time, we should generate the event. Monotonic.
+    long eventTimestamp =
+        config.timestampAndInterEventDelayUsForEvent(
+            config.nextEventNumber(eventsCountSoFar)).getKey();
+    // When, in event time, the event should say it was generated. Depending on outOfOrderGroupSize
+    // may have local jitter.
+    long adjustedEventTimestamp =
+        config.timestampAndInterEventDelayUsForEvent(
+            config.nextAdjustedEventNumber(eventsCountSoFar)).getKey();
+    // The minimum of this and all future adjusted event timestamps. Accounts for jitter in
+    // the event timestamp.
+    long watermark =
+        config.timestampAndInterEventDelayUsForEvent(
+            config.nextEventNumberForWatermark(eventsCountSoFar)).getKey();
+    // When, in wallclock time, we should emit the event.
+    long wallclockTimestamp = wallclockBaseTime + (eventTimestamp - config.baseTime);
+
+    // The next 'event id' both seeds the random number generator (so the same id always yields
+    // the same random sequence) and selects which kind of event to produce.
+    long newEventId = getNextEventId();
+    Random random = new Random(newEventId);
+    long rem = newEventId % GeneratorConfig.PROPORTION_DENOMINATOR;
+
+    Event event;
+    if (rem < GeneratorConfig.PERSON_PROPORTION) {
+      event = new Event(nextPerson(newEventId, random, adjustedEventTimestamp, config));
+    } else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
+      event = new Event(
+          nextAuction(eventsCountSoFar, newEventId, random, adjustedEventTimestamp, config));
+    } else {
+      event = new Event(nextBid(newEventId, random, adjustedEventTimestamp, config));
+    }
+
+    eventsCountSoFar++;
+    return new NextEvent(wallclockTimestamp, adjustedEventTimestamp, event, watermark);
+  }
+
+  /**
+   * Return the next event, timestamped with its event-time timestamp.
+   *
+   * @throws java.util.NoSuchElementException if this generator has already produced
+   *     {@code maxEvents} events, as required by the {@link Iterator} contract
+   */
+  @Override
+  public TimestampedValue<Event> next() {
+    if (!hasNext()) {
+      throw new java.util.NoSuchElementException(
+          "Generator exhausted after " + eventsCountSoFar + " events");
+    }
+    NextEvent next = nextEvent();
+    return TimestampedValue.of(next.event, new Instant(next.eventTimestamp));
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Return how many microseconds till we emit the next event. */
+  public long currentInterEventDelayUs() {
+    return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(eventsCountSoFar))
+        .getValue();
+  }
+
+  /**
+   * Return an estimate of fraction of output consumed. A generator with nothing to generate
+   * (for example after {@link #splitAtEventId} at its first event id) reports 1.0 rather than
+   * NaN.
+   */
+  public double getFractionConsumed() {
+    if (config.maxEvents <= 0) {
+      return 1.0;
+    }
+    return (double) eventsCountSoFar / config.maxEvents;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Generator{config:%s; eventsCountSoFar:%d; wallclockBaseTime:%d}", config,
+        eventsCountSoFar, wallclockBaseTime);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorCheckpoint.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorCheckpoint.java
new file mode 100644
index 0000000..fa41739
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorCheckpoint.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.generator;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+
+/**
+ * The minimal state needed to rebuild a {@link Generator} where it left off: the number of
+ * events it had produced and its wallclock base time. The {@link GeneratorConfig} is not part of
+ * the checkpoint and must be supplied again to {@link #toGenerator}.
+ */
+public class GeneratorCheckpoint implements UnboundedSource.CheckpointMark {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+  /** Coder for this class: two variable-length longs, numEvents then wallclockBaseTime. */
+  public static final Coder<GeneratorCheckpoint> CODER_INSTANCE =
+      new CustomCoder<GeneratorCheckpoint>() {
+        @Override
+        public void encode(GeneratorCheckpoint checkpoint, OutputStream out)
+            throws CoderException, IOException {
+          LONG_CODER.encode(checkpoint.numEvents, out);
+          LONG_CODER.encode(checkpoint.wallclockBaseTime, out);
+        }
+
+        @Override
+        public GeneratorCheckpoint decode(InputStream in) throws CoderException, IOException {
+          // Java evaluates arguments left to right, so numEvents is read before
+          // wallclockBaseTime, matching the order written by encode().
+          return new GeneratorCheckpoint(LONG_CODER.decode(in), LONG_CODER.decode(in));
+        }
+
+        @Override
+        public void verifyDeterministic() throws NonDeterministicException {
+          // Always deterministic: the encoding is just two longs in a fixed order.
+        }
+      };
+
+  /** Events already emitted by the checkpointed generator. */
+  private final long numEvents;
+
+  /** Wallclock base time (ms since epoch) of the checkpointed generator. */
+  private final long wallclockBaseTime;
+
+  GeneratorCheckpoint(long numEvents, long wallclockBaseTime) {
+    this.wallclockBaseTime = wallclockBaseTime;
+    this.numEvents = numEvents;
+  }
+
+  /** Build a generator for {@code config} that resumes from this checkpoint. */
+  public Generator toGenerator(GeneratorConfig config) {
+    return new Generator(config, numEvents, wallclockBaseTime);
+  }
+
+  @Override
+  public void finalizeCheckpoint() throws IOException {
+    // Nothing to finalize: the checkpoint holds two longs and no external resources.
+  }
+
+  @Override
+  public String toString() {
+    return toStringHelper(this)
+        .add("numEvents", numEvents)
+        .add("wallclockBaseTime", wallclockBaseTime)
+        .toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorConfig.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorConfig.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorConfig.java
new file mode 100644
index 0000000..7c862fa
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorConfig.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.sources.generator;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Parameters controlling how {@link Generator} synthesizes {@link Event} elements.
+ */
+public class GeneratorConfig implements Serializable {
+
+  /**
+   * We start the ids at specific values to help ensure the queries find a match even on
+   * small synthesized dataset sizes.
+   */
+  public static final long FIRST_AUCTION_ID = 1000L;
+  public static final long FIRST_PERSON_ID = 1000L;
+  public static final long FIRST_CATEGORY_ID = 10L;
+
+  /**
+   * Proportions of people/auctions/bids to synthesize. Within every
+   * {@link #PROPORTION_DENOMINATOR} consecutive event ids, the first {@link #PERSON_PROPORTION}
+   * are people, the next {@link #AUCTION_PROPORTION} are auctions and the remaining
+   * {@link #BID_PROPORTION} are bids.
+   */
+  public static final int PERSON_PROPORTION = 1;
+  public static final int AUCTION_PROPORTION = 3;
+  public static final int BID_PROPORTION = 46;
+  public static final int PROPORTION_DENOMINATOR =
+      PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION;
+
+  /** Environment options. */
+  private final NexmarkConfiguration configuration;
+
+  /**
+   * Delay between events, in microseconds. If the array has more than one entry then
+   * the rate is changed every {@link #stepLengthSec}, and wraps around.
+   */
+  private final long[] interEventDelayUs;
+
+  /** Delay before changing the current inter-event delay. */
+  private final long stepLengthSec;
+
+  /** Time for first event (ms since epoch). */
+  public final long baseTime;
+
+  /**
+   * Event id of first event to be generated. Event ids are unique over all generators, and
+   * are used as a seed to generate each event's data.
+   */
+  public final long firstEventId;
+
+  /** Maximum number of events to generate. */
+  public final long maxEvents;
+
+  /**
+   * First event number. Generators running in parallel time may share the same event number,
+   * and the event number is used to determine the event timestamp.
+   */
+  public final long firstEventNumber;
+
+  /**
+   * True period of epoch in milliseconds. Derived from above.
+   * (i.e. time to run through cycle for all interEventDelayUs entries).
+   */
+  private final long epochPeriodMs;
+
+  /**
+   * Number of events per epoch. Derived from above.
+   * (i.e. number of events to run through cycle for all interEventDelayUs entries).
+   */
+  private final long eventsPerEpoch;
+
+  public GeneratorConfig(
+      NexmarkConfiguration configuration, long baseTime, long firstEventId,
+      long maxEventsOrZero, long firstEventNumber) {
+    this.configuration = configuration;
+    this.interEventDelayUs = configuration.rateShape.interEventDelayUs(
+        configuration.firstEventRate, configuration.nextEventRate,
+        configuration.rateUnit, configuration.numEventGenerators);
+    this.stepLengthSec = configuration.rateShape.stepLengthSec(configuration.ratePeriodSec);
+    this.baseTime = baseTime;
+    this.firstEventId = firstEventId;
+    if (maxEventsOrZero == 0) {
+      // Scale maximum down to avoid overflow in getEstimatedSizeBytes.
+      this.maxEvents =
+          Long.MAX_VALUE / (PROPORTION_DENOMINATOR
+              * Math.max(
+                  Math.max(configuration.avgPersonByteSize, configuration.avgAuctionByteSize),
+                  configuration.avgBidByteSize));
+    } else {
+      this.maxEvents = maxEventsOrZero;
+    }
+    this.firstEventNumber = firstEventNumber;
+
+    // Epoch bookkeeping is only needed for multi-step rates; a single delay is handled in
+    // closed form by timestampAndInterEventDelayUsForEvent.
+    long eventsPerEpoch = 0;
+    long epochPeriodMs = 0;
+    if (interEventDelayUs.length > 1) {
+      for (long interEventDelayU : interEventDelayUs) {
+        long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU;
+        eventsPerEpoch += numEventsForThisCycle;
+        epochPeriodMs += (numEventsForThisCycle * interEventDelayU) / 1000L;
+      }
+    }
+    this.eventsPerEpoch = eventsPerEpoch;
+    this.epochPeriodMs = epochPeriodMs;
+  }
+
+  /** Return a copy of this config. */
+  public GeneratorConfig copy() {
+    return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
+  }
+
+  /**
+   * Split this config into {@code n} sub-configs with roughly equal number of
+   * possible events, but distinct value spaces. The generators will run on parallel timelines.
+   * This config should no longer be used.
+   *
+   * @throws IllegalArgumentException if {@code n < 1}
+   */
+  public List<GeneratorConfig> split(int n) {
+    if (n < 1) {
+      throw new IllegalArgumentException("Number of splits must be at least 1, got " + n);
+    }
+    List<GeneratorConfig> results = new ArrayList<>();
+    if (n == 1) {
+      // No split required.
+      results.add(this);
+    } else {
+      long subMaxEvents = maxEvents / n;
+      long subFirstEventId = firstEventId;
+      for (int i = 0; i < n; i++) {
+        if (i == n - 1) {
+          // Don't lose any events to round-down.
+          subMaxEvents = maxEvents - subMaxEvents * (n - 1);
+        }
+        results.add(copyWith(subFirstEventId, subMaxEvents, firstEventNumber));
+        subFirstEventId += subMaxEvents;
+      }
+    }
+    return results;
+  }
+
+  /** Return copy of this config except with given parameters. */
+  public GeneratorConfig copyWith(long firstEventId, long maxEvents, long firstEventNumber) {
+    return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
+  }
+
+  /** Return an estimate of the bytes needed by {@code numEvents}. */
+  public long estimatedBytesForEvents(long numEvents) {
+    long numPersons = (numEvents * PERSON_PROPORTION) / PROPORTION_DENOMINATOR;
+    long numAuctions = (numEvents * AUCTION_PROPORTION) / PROPORTION_DENOMINATOR;
+    long numBids = (numEvents * BID_PROPORTION) / PROPORTION_DENOMINATOR;
+    return numPersons * configuration.avgPersonByteSize
+        + numAuctions * configuration.avgAuctionByteSize
+        + numBids * configuration.avgBidByteSize;
+  }
+
+  // Pass-through accessors for fields of the underlying NexmarkConfiguration.
+
+  public int getAvgPersonByteSize() {
+    return configuration.avgPersonByteSize;
+  }
+
+  public int getNumActivePeople() {
+    return configuration.numActivePeople;
+  }
+
+  public int getHotSellersRatio() {
+    return configuration.hotSellersRatio;
+  }
+
+  public int getNumInFlightAuctions() {
+    return configuration.numInFlightAuctions;
+  }
+
+  public int getHotAuctionRatio() {
+    return configuration.hotAuctionRatio;
+  }
+
+  public int getHotBiddersRatio() {
+    return configuration.hotBiddersRatio;
+  }
+
+  public int getAvgBidByteSize() {
+    return configuration.avgBidByteSize;
+  }
+
+  public int getAvgAuctionByteSize() {
+    return configuration.avgAuctionByteSize;
+  }
+
+  public double getProbDelayedEvent() {
+    return configuration.probDelayedEvent;
+  }
+
+  public long getOccasionalDelaySec() {
+    return configuration.occasionalDelaySec;
+  }
+
+  /** Return an estimate of the byte-size of all events a generator for this config would yield. */
+  public long getEstimatedSizeBytes() {
+    return estimatedBytesForEvents(maxEvents);
+  }
+
+  /**
+   * Return the first 'event id' which could be generated from this config. Though events don't
+   * have ids we can simulate them to help bookkeeping.
+   */
+  public long getStartEventId() {
+    return firstEventId + firstEventNumber;
+  }
+
+  /** Return one past the last 'event id' which could be generated from this config. */
+  public long getStopEventId() {
+    return firstEventId + firstEventNumber + maxEvents;
+  }
+
+  /** Return the next event number for a generator which has so far emitted {@code numEvents}. */
+  public long nextEventNumber(long numEvents) {
+    return firstEventNumber + numEvents;
+  }
+
+  /**
+   * Return the next event number for a generator which has so far emitted {@code numEvents},
+   * but adjusted to account for {@code outOfOrderGroupSize}. The result stays within the same
+   * group of {@code outOfOrderGroupSize} event numbers; only the offset within the group is
+   * permuted.
+   */
+  public long nextAdjustedEventNumber(long numEvents) {
+    long n = configuration.outOfOrderGroupSize;
+    long eventNumber = nextEventNumber(numEvents);
+    long base = (eventNumber / n) * n;
+    long offset = (eventNumber * 953) % n;
+    return base + offset;
+  }
+
+  /**
+   * Return the event number whose event time will be a suitable watermark for
+   * a generator which has so far emitted {@code numEvents}.
+   */
+  public long nextEventNumberForWatermark(long numEvents) {
+    long n = configuration.outOfOrderGroupSize;
+    long eventNumber = nextEventNumber(numEvents);
+    return (eventNumber / n) * n;
+  }
+
+  /**
+   * What timestamp should the event with {@code eventNumber} have for this generator? And
+   * what inter-event delay (in microseconds) is current?
+   */
+  public KV<Long, Long> timestampAndInterEventDelayUsForEvent(long eventNumber) {
+    if (interEventDelayUs.length == 1) {
+      long timestamp = baseTime + (eventNumber * interEventDelayUs[0]) / 1000L;
+      return KV.of(timestamp, interEventDelayUs[0]);
+    }
+
+    long epoch = eventNumber / eventsPerEpoch;
+    long n = eventNumber % eventsPerEpoch;
+    long offsetInEpochMs = 0;
+    for (long interEventDelayU : interEventDelayUs) {
+      long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU;
+      if (n < numEventsForThisCycle) {
+        long offsetInCycleUs = n * interEventDelayU;
+        long timestamp =
+            baseTime + epoch * epochPeriodMs + offsetInEpochMs + (offsetInCycleUs / 1000L);
+        return KV.of(timestamp, interEventDelayU);
+      }
+      n -= numEventsForThisCycle;
+      offsetInEpochMs += (numEventsForThisCycle * interEventDelayU) / 1000L;
+    }
+    // Unreachable if eventsPerEpoch was computed from the same interEventDelayUs/stepLengthSec.
+    throw new IllegalStateException("internal eventsPerEpoch incorrect");
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("GeneratorConfig");
+    sb.append("{configuration:");
+    sb.append(configuration.toString());
+    sb.append(";interEventDelayUs=[");
+    for (int i = 0; i < interEventDelayUs.length; i++) {
+      if (i > 0) {
+        sb.append(",");
+      }
+      sb.append(interEventDelayUs[i]);
+    }
+    sb.append("]");
+    sb.append(";stepLengthSec:");
+    sb.append(stepLengthSec);
+    sb.append(";baseTime:");
+    sb.append(baseTime);
+    sb.append(";firstEventId:");
+    sb.append(firstEventId);
+    sb.append(";maxEvents:");
+    sb.append(maxEvents);
+    sb.append(";firstEventNumber:");
+    sb.append(firstEventNumber);
+    sb.append(";epochPeriodMs:");
+    sb.append(epochPeriodMs);
+    sb.append(";eventsPerEpoch:");
+    sb.append(eventsPerEpoch);
+    sb.append("}");
+    return sb.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/AuctionGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/AuctionGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/AuctionGenerator.java
new file mode 100644
index 0000000..41a81da
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/AuctionGenerator.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.generator.model;
+
+import static org.apache.beam.sdk.nexmark.sources.generator.model.LongGenerator.nextLong;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.PersonGenerator.lastBase0PersonId;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.PersonGenerator.nextBase0PersonId;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.PriceGenerator.nextPrice;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.StringsGenerator.nextExtra;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.StringsGenerator.nextString;
+
+import java.util.Random;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
+
+/**
+ * Static helpers that synthesize {@link Auction} events and choose auction ids for the Nexmark
+ * event generator.
+ */
+public class AuctionGenerator {
+  /**
+   * Keep the number of categories small so the example queries will find results even with
+   * a small batch of events.
+   */
+  private static final int NUM_CATEGORIES = 5;
+
+  /** Number of yet-to-be-created people and auction ids allowed. */
+  private static final int AUCTION_ID_LEAD = 10;
+
+  /**
+   * Fraction of people/auctions which may be 'hot' sellers/bidders/auctions are 1
+   * over these values.
+   */
+  private static final int HOT_SELLER_RATIO = 100;
+
+  /** Utility class; not instantiable. */
+  private AuctionGenerator() {}
+
+  /**
+   * Generate and return a random auction with next available id.
+   *
+   * @param eventsCountSoFar events already emitted by the calling generator; used to size the
+   *     auction's lifetime
+   * @param eventId 'event id' of the auction being generated
+   * @param random source of randomness (the Generator seeds it with the event id)
+   * @param timestamp event-time timestamp of the auction, in ms
+   * @param config generator configuration
+   */
+  public static Auction nextAuction(
+      long eventsCountSoFar, long eventId, Random random, long timestamp, GeneratorConfig config) {
+
+    long id = lastBase0AuctionId(eventId) + GeneratorConfig.FIRST_AUCTION_ID;
+
+    long seller;
+    // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio.
+    if (random.nextInt(config.getHotSellersRatio()) > 0) {
+      // Choose the first person in the batch of last HOT_SELLER_RATIO people.
+      seller = (lastBase0PersonId(eventId) / HOT_SELLER_RATIO) * HOT_SELLER_RATIO;
+    } else {
+      seller = nextBase0PersonId(eventId, random, config);
+    }
+    seller += GeneratorConfig.FIRST_PERSON_ID;
+
+    long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES);
+    long initialBid = nextPrice(random);
+    long expires = timestamp + nextAuctionLengthMs(eventsCountSoFar, random, timestamp, config);
+    String name = nextString(random, 20);
+    String desc = nextString(random, 100);
+    long reserve = initialBid + nextPrice(random);
+    // Approximate encoded size so far, used to pad the auction up to the configured average.
+    int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8;
+    String extra = nextExtra(random, currentSize, config.getAvgAuctionByteSize());
+    return new Auction(id, name, desc, initialBid, reserve, timestamp, expires, seller, category,
+        extra);
+  }
+
+  /**
+   * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if
+   * due to generate an auction.
+   */
+  public static long lastBase0AuctionId(long eventId) {
+    long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
+    long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
+    if (offset < GeneratorConfig.PERSON_PROPORTION) {
+      // About to generate a person.
+      // Go back to the last auction in the last epoch.
+      epoch--;
+      offset = GeneratorConfig.AUCTION_PROPORTION - 1;
+    } else if (offset >= GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
+      // About to generate a bid.
+      // Go back to the last auction generated in this epoch.
+      offset = GeneratorConfig.AUCTION_PROPORTION - 1;
+    } else {
+      // About to generate an auction.
+      offset -= GeneratorConfig.PERSON_PROPORTION;
+    }
+    return epoch * GeneratorConfig.AUCTION_PROPORTION + offset;
+  }
+
+  /** Return a random auction id (base 0). */
+  public static long nextBase0AuctionId(long nextEventId, Random random, GeneratorConfig config) {
+    // Choose a random auction for any of those which are likely to still be in flight,
+    // plus a few 'leads'.
+    // Note that ideally we'd track non-expired auctions exactly, but that state
+    // is difficult to split.
+    long maxAuction = lastBase0AuctionId(nextEventId);
+    long minAuction = Math.max(maxAuction - config.getNumInFlightAuctions(), 0);
+    return minAuction + nextLong(random, maxAuction - minAuction + 1 + AUCTION_ID_LEAD);
+  }
+
+  /** Return a random time delay, in milliseconds, for length of auctions. */
+  private static long nextAuctionLengthMs(
+      long eventsCountSoFar, Random random, long timestamp, GeneratorConfig config) {
+
+    // What's our current event number?
+    long currentEventNumber = config.nextAdjustedEventNumber(eventsCountSoFar);
+    // How many events till we've generated numInFlightAuctions?
+    long numEventsForAuctions =
+        (config.getNumInFlightAuctions() * GeneratorConfig.PROPORTION_DENOMINATOR)
+            / GeneratorConfig.AUCTION_PROPORTION;
+    // When will the auction numInFlightAuctions beyond now be generated?
+    long futureAuction = config
+        .timestampAndInterEventDelayUsForEvent(currentEventNumber + numEventsForAuctions)
+        .getKey();
+    // Choose a length with average horizonMs.
+    long horizonMs = futureAuction - timestamp;
+    return 1L + nextLong(random, Math.max(horizonMs * 2, 1L));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/BidGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/BidGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/BidGenerator.java
new file mode 100644
index 0000000..cffe380
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/BidGenerator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.generator.model;
+
+import static org.apache.beam.sdk.nexmark.sources.generator.model.AuctionGenerator.lastBase0AuctionId;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.AuctionGenerator.nextBase0AuctionId;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.PersonGenerator.lastBase0PersonId;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.PersonGenerator.nextBase0PersonId;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.StringsGenerator.nextExtra;
+
+import java.util.Random;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
+
+/**
+ * Generates bids.
+ */
+public class BidGenerator {
+
+ /**
+ * Fraction of people/auctions which may be 'hot' sellers/bidders/auctions are 1
+ * over these values.
+ */
+ private static final int HOT_AUCTION_RATIO = 100;
+ private static final int HOT_BIDDER_RATIO = 100;
+
+
+ /**
+ * Generate and return a random bid with next available id.
+ */
+ public static Bid nextBid(
+ long eventId, Random random, long timestamp, GeneratorConfig config) {
+
+ long auction;
+ // Here P(bid will be for a hot auction) = 1 - 1/hotAuctionRatio.
+ if (random.nextInt(config.getHotAuctionRatio()) > 0) {
+ // Choose the first auction in the batch of last HOT_AUCTION_RATIO auctions.
+ auction = (lastBase0AuctionId(eventId) / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO;
+ } else {
+ auction = nextBase0AuctionId(eventId, random, config);
+ }
+ auction += GeneratorConfig.FIRST_AUCTION_ID;
+
+ long bidder;
+ // Here P(bid will be by a hot bidder) = 1 - 1/hotBiddersRatio
+ if (random.nextInt(config.getHotBiddersRatio()) > 0) {
+ // Choose the second person (so hot bidders and hot sellers don't collide) in the batch of
+ // last HOT_BIDDER_RATIO people.
+ bidder = (lastBase0PersonId(eventId) / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1;
+ } else {
+ bidder = nextBase0PersonId(eventId, random, config);
+ }
+ bidder += GeneratorConfig.FIRST_PERSON_ID;
+
+ long price = PriceGenerator.nextPrice(random);
+ int currentSize = 8 + 8 + 8 + 8;
+ String extra = nextExtra(random, currentSize, config.getAvgBidByteSize());
+ return new Bid(auction, bidder, price, timestamp, extra);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/LongGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/LongGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/LongGenerator.java
new file mode 100644
index 0000000..ed9db84
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/LongGenerator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.generator.model;
+
+import java.util.Random;
+
/**
 * Generates random {@code long} values in a bounded range.
 */
public class LongGenerator {

  /**
   * Return a random long from {@code [0, n)}.
   *
   * <p>Bounds below {@link Integer#MAX_VALUE} delegate to {@link Random#nextInt(int)}. Larger
   * bounds reduce a full 64-bit draw modulo {@code n}. That carries the usual modulo bias towards
   * lower values, which is only significant when {@code n} is of the same order of magnitude as
   * {@link Long#MAX_VALUE}.
   */
  public static long nextLong(Random random, long n) {
    if (n >= Integer.MAX_VALUE) {
      long remainder = random.nextLong() % n;
      // |remainder| < n, so its absolute value always lies in [0, n).
      return Math.abs(remainder);
    }
    return random.nextInt((int) n);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PersonGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PersonGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PersonGenerator.java
new file mode 100644
index 0000000..9f306ea
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PersonGenerator.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.sources.generator.model;
+
import static org.apache.beam.sdk.nexmark.sources.generator.model.LongGenerator.nextLong;
import static org.apache.beam.sdk.nexmark.sources.generator.model.StringsGenerator.nextExtra;
import static org.apache.beam.sdk.nexmark.sources.generator.model.StringsGenerator.nextString;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Random;
import org.apache.beam.sdk.nexmark.model.Person;
import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
+
+/**
+ * Generates people.
+ */
+public class PersonGenerator {
+ /**
+ * Number of yet-to-be-created people and auction ids allowed.
+ */
+ private static final int PERSON_ID_LEAD = 10;
+
+ /**
+ * Keep the number of states small so that the example queries will find results even with
+ * a small batch of events.
+ */
+ private static final List<String> US_STATES = Arrays.asList(("AZ,CA,ID,OR,WA,WY").split(","));
+
+ private static final List<String> US_CITIES =
+ Arrays.asList(
+ ("Phoenix,Los Angeles,San Francisco,Boise,Portland,Bend,Redmond,Seattle,Kent,Cheyenne")
+ .split(","));
+
+ private static final List<String> FIRST_NAMES =
+ Arrays.asList(("Peter,Paul,Luke,John,Saul,Vicky,Kate,Julie,Sarah,Deiter,Walter").split(","));
+
+ private static final List<String> LAST_NAMES =
+ Arrays.asList(("Shultz,Abrams,Spencer,White,Bartels,Walton,Smith,Jones,Noris").split(","));
+
+
+ /**
+ * Generate and return a random person with next available id.
+ */
+ public static Person nextPerson(
+ long nextEventId, Random random, long timestamp, GeneratorConfig config) {
+
+ long id = lastBase0PersonId(nextEventId) + GeneratorConfig.FIRST_PERSON_ID;
+ String name = nextPersonName(random);
+ String email = nextEmail(random);
+ String creditCard = nextCreditCard(random);
+ String city = nextUSCity(random);
+ String state = nextUSState(random);
+ int currentSize =
+ 8 + name.length() + email.length() + creditCard.length() + city.length() + state.length();
+ String extra = nextExtra(random, currentSize, config.getAvgPersonByteSize());
+ return new Person(id, name, email, creditCard, city, state, timestamp, extra);
+ }
+
+ /**
+ * Return a random person id (base 0).
+ */
+ public static long nextBase0PersonId(long eventId, Random random, GeneratorConfig config) {
+ // Choose a random person from any of the 'active' people, plus a few 'leads'.
+ // By limiting to 'active' we ensure the density of bids or auctions per person
+ // does not decrease over time for long running jobs.
+ // By choosing a person id ahead of the last valid person id we will make
+ // newPerson and newAuction events appear to have been swapped in time.
+ long numPeople = lastBase0PersonId(eventId) + 1;
+ long activePeople = Math.min(numPeople, config.getNumActivePeople());
+ long n = nextLong(random, activePeople + PERSON_ID_LEAD);
+ return numPeople - activePeople + n;
+ }
+
+ /**
+ * Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if
+ * due to generate a person.
+ */
+ public static long lastBase0PersonId(long eventId) {
+ long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
+ long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
+ if (offset >= GeneratorConfig.PERSON_PROPORTION) {
+ // About to generate an auction or bid.
+ // Go back to the last person generated in this epoch.
+ offset = GeneratorConfig.PERSON_PROPORTION - 1;
+ }
+ // About to generate a person.
+ return epoch * GeneratorConfig.PERSON_PROPORTION + offset;
+ }
+
+
+ /** return a random US state. */
+ private static String nextUSState(Random random) {
+ return US_STATES.get(random.nextInt(US_STATES.size()));
+ }
+
+ /** Return a random US city. */
+ private static String nextUSCity(Random random) {
+ return US_CITIES.get(random.nextInt(US_CITIES.size()));
+ }
+
+ /** Return a random person name. */
+ private static String nextPersonName(Random random) {
+ return FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size())) + " "
+ + LAST_NAMES.get(random.nextInt(LAST_NAMES.size()));
+ }
+
+ /** Return a random email address. */
+ private static String nextEmail(Random random) {
+ return nextString(random, 7) + "@" + nextString(random, 5) + ".com";
+ }
+
+ /** Return a random credit card number. */
+ private static String nextCreditCard(Random random) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 4; i++) {
+ if (i > 0) {
+ sb.append(' ');
+ }
+ sb.append(String.format("%04d", random.nextInt(10000)));
+ }
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PriceGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PriceGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PriceGenerator.java
new file mode 100644
index 0000000..912b16e
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PriceGenerator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.generator.model;
+
+import java.util.Random;
+
+/**
+ * Generates a random price.
+ */
+public class PriceGenerator {
+
+ /** Return a random price. */
+ public static long nextPrice(Random random) {
+ return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/StringsGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/StringsGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/StringsGenerator.java
new file mode 100644
index 0000000..c808560
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/StringsGenerator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.generator.model;
+
+import java.util.Random;
+
/**
 * Generates the random strings used to populate fields of the other model objects.
 */
public class StringsGenerator {

  /** Smallest random string size. */
  private static final int MIN_STRING_LENGTH = 3;

  /**
   * Return a random string of lowercase letters and spaces.
   *
   * <p>Each character is a space with probability 1/13, otherwise a letter in {@code 'a'..'z'}.
   * Before trimming, the string has between {@code MIN_STRING_LENGTH} (inclusive) and
   * {@code maxLength} (exclusive) characters. Leading and trailing spaces are then trimmed, so the
   * result may be shorter, possibly empty.
   *
   * @throws IllegalArgumentException if {@code maxLength <= MIN_STRING_LENGTH}
   */
  public static String nextString(Random random, int maxLength) {
    if (maxLength <= MIN_STRING_LENGTH) {
      // Without this check Random.nextInt fails with a cryptic "bound must be positive", and a
      // very negative maxLength would overflow the subtraction below.
      throw new IllegalArgumentException(
          "maxLength must be greater than " + MIN_STRING_LENGTH + ", got " + maxLength);
    }
    int len = MIN_STRING_LENGTH + random.nextInt(maxLength - MIN_STRING_LENGTH);
    StringBuilder sb = new StringBuilder(len);
    while (len-- > 0) {
      if (random.nextInt(13) == 0) {
        sb.append(' ');
      } else {
        sb.append((char) ('a' + random.nextInt(26)));
      }
    }
    return sb.toString().trim();
  }

  /**
   * Return a random string of exactly {@code length} lowercase letters, or an empty string if
   * {@code length <= 0}.
   */
  public static String nextExactString(Random random, int length) {
    // Guard the capacity: StringBuilder rejects a negative initial capacity.
    StringBuilder sb = new StringBuilder(Math.max(length, 0));
    while (length-- > 0) {
      sb.append((char) ('a' + random.nextInt(26)));
    }
    return sb.toString();
  }

  /**
   * Return a random padding string such that {@code currentSize + string.length()} is roughly
   * {@code desiredAverageSize} on average.
   *
   * <p>Returns {@code ""} when {@code currentSize > desiredAverageSize}. Otherwise, let
   * {@code d = desiredAverageSize - currentSize} and {@code delta = round(0.2 * d)}. The length is
   * drawn uniformly from {@code [d - delta, d + delta)}, or is exactly {@code d} when
   * {@code delta == 0}.
   */
  public static String nextExtra(Random random, int currentSize, int desiredAverageSize) {
    if (currentSize > desiredAverageSize) {
      return "";
    }
    desiredAverageSize -= currentSize;
    int delta = (int) Math.round(desiredAverageSize * 0.2);
    int minSize = desiredAverageSize - delta;
    int desiredSize = minSize + (delta == 0 ? 0 : random.nextInt(2 * delta));
    return nextExactString(random, desiredSize);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/package-info.java
new file mode 100644
index 0000000..c15b5ed
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Model Generators.
+ */
+package org.apache.beam.sdk.nexmark.sources.generator.model;
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/package-info.java
new file mode 100644
index 0000000..a7ffd25
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Events generation logic.
+ */
+package org.apache.beam.sdk.nexmark.sources.generator;