You are viewing a plain-text version of this content. The canonical link to it (originally hyperlinked as "here") was not preserved in this copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/17 20:51:45 UTC
[1/6] beam git commit: [Nexmark] Extract BidGenerator from Generator
Repository: beam
Updated Branches:
refs/heads/master f10399d7c -> a52dbeaca
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java
deleted file mode 100644
index 90918d6..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.nexmark.sources.utils;
-
-import static org.apache.beam.sdk.nexmark.sources.utils.LongGenerator.nextLong;
-import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.lastBase0PersonId;
-import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextBase0PersonId;
-import static org.apache.beam.sdk.nexmark.sources.utils.PriceGenerator.nextPrice;
-import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextExtra;
-import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextString;
-
-import java.util.Random;
-
-import org.apache.beam.sdk.nexmark.model.Auction;
-import org.apache.beam.sdk.nexmark.sources.GeneratorConfig;
-
-/**
- * AuctionGenerator.
- */
-public class AuctionGenerator {
- /**
- * Keep the number of categories small so the example queries will find results even with
- * a small batch of events.
- */
- private static final int NUM_CATEGORIES = 5;
-
- /**
- * Number of yet-to-be-created people and auction ids allowed.
- */
- private static final int AUCTION_ID_LEAD = 10;
-
- /**
- * Fraction of people/auctions which may be 'hot' sellers/bidders/auctions are 1
- * over these values.
- */
- private static final int HOT_SELLER_RATIO = 100;
-
- /**
- * Generate and return a random auction with next available id.
- */
- public static Auction nextAuction(
- long eventsCountSoFar, long eventId, Random random, long timestamp, GeneratorConfig config) {
-
- long id = lastBase0AuctionId(eventId) + GeneratorConfig.FIRST_AUCTION_ID;
-
- long seller;
- // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio.
- if (random.nextInt(config.getHotSellersRatio()) > 0) {
- // Choose the first person in the batch of last HOT_SELLER_RATIO people.
- seller = (lastBase0PersonId(eventId) / HOT_SELLER_RATIO) * HOT_SELLER_RATIO;
- } else {
- seller = nextBase0PersonId(eventId, random, config);
- }
- seller += GeneratorConfig.FIRST_PERSON_ID;
-
- long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES);
- long initialBid = nextPrice(random);
- long expires = timestamp + nextAuctionLengthMs(eventsCountSoFar, random, timestamp, config);
- String name = nextString(random, 20);
- String desc = nextString(random, 100);
- long reserve = initialBid + nextPrice(random);
- int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8;
- String extra = nextExtra(random, currentSize, config.configuration.avgAuctionByteSize);
- return new Auction(id, name, desc, initialBid, reserve, timestamp, expires, seller, category,
- extra);
- }
-
- /**
- * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if
- * due to generate an auction.
- */
- public static long lastBase0AuctionId(long eventId) {
- long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
- long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
- if (offset < GeneratorConfig.PERSON_PROPORTION) {
- // About to generate a person.
- // Go back to the last auction in the last epoch.
- epoch--;
- offset = GeneratorConfig.AUCTION_PROPORTION - 1;
- } else if (offset >= GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
- // About to generate a bid.
- // Go back to the last auction generated in this epoch.
- offset = GeneratorConfig.AUCTION_PROPORTION - 1;
- } else {
- // About to generate an auction.
- offset -= GeneratorConfig.PERSON_PROPORTION;
- }
- return epoch * GeneratorConfig.AUCTION_PROPORTION + offset;
- }
-
- /**
- * Return a random auction id (base 0).
- */
- public static long nextBase0AuctionId(
- long nextEventId, Random random, GeneratorConfig config) {
-
- // Choose a random auction for any of those which are likely to still be in flight,
- // plus a few 'leads'.
- // Note that ideally we'd track non-expired auctions exactly, but that state
- // is difficult to split.
- long minAuction = Math.max(
- lastBase0AuctionId(nextEventId) - config.getNumInFlightAuctions(), 0);
- long maxAuction = lastBase0AuctionId(nextEventId);
- return minAuction + nextLong(random, maxAuction - minAuction + 1 + AUCTION_ID_LEAD);
- }
-
- /** Return a random time delay, in milliseconds, for length of auctions. */
- private static long nextAuctionLengthMs(
- long eventsCountSoFar, Random random, long timestamp, GeneratorConfig config) {
-
- // What's our current event number?
- long currentEventNumber = config.nextAdjustedEventNumber(eventsCountSoFar);
- // How many events till we've generated numInFlightAuctions?
- long numEventsForAuctions =
- (config.configuration.numInFlightAuctions * GeneratorConfig.PROPORTION_DENOMINATOR)
- / GeneratorConfig.AUCTION_PROPORTION;
- // When will the auction numInFlightAuctions beyond now be generated?
- long futureAuction =
- config.timestampAndInterEventDelayUsForEvent(currentEventNumber + numEventsForAuctions)
- .getKey();
- // System.out.printf("*** auction will be for %dms (%d events ahead) ***\n",
- // futureAuction - timestamp, numEventsForAuctions);
- // Choose a length with average horizonMs.
- long horizonMs = futureAuction - timestamp;
- return 1L + nextLong(random, Math.max(horizonMs * 2, 1L));
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/LongGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/LongGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/LongGenerator.java
deleted file mode 100644
index 8eccb66..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/LongGenerator.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.nexmark.sources.utils;
-
-import java.util.Random;
-
-/**
- * LongGenerator.
- */
-public class LongGenerator {
-
- /** Return a random long from {@code [0, n)}. */
- public static long nextLong(Random random, long n) {
- if (n < Integer.MAX_VALUE) {
- return random.nextInt((int) n);
- } else {
- // WARNING: Very skewed distribution! Bad!
- return Math.abs(random.nextLong() % n);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PersonGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PersonGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PersonGenerator.java
deleted file mode 100644
index a02fff9..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PersonGenerator.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.nexmark.sources.utils;
-
-import static org.apache.beam.sdk.nexmark.sources.utils.LongGenerator.nextLong;
-import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextExtra;
-import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextString;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.beam.sdk.nexmark.model.Person;
-import org.apache.beam.sdk.nexmark.sources.GeneratorConfig;
-
-/**
- * Generates people.
- */
-public class PersonGenerator {
- /**
- * Number of yet-to-be-created people and auction ids allowed.
- */
- private static final int PERSON_ID_LEAD = 10;
-
- /**
- * Keep the number of states small so that the example queries will find results even with
- * a small batch of events.
- */
- private static final List<String> US_STATES = Arrays.asList(("AZ,CA,ID,OR,WA,WY").split(","));
-
- private static final List<String> US_CITIES =
- Arrays.asList(
- ("Phoenix,Los Angeles,San Francisco,Boise,Portland,Bend,Redmond,Seattle,Kent,Cheyenne")
- .split(","));
-
- private static final List<String> FIRST_NAMES =
- Arrays.asList(("Peter,Paul,Luke,John,Saul,Vicky,Kate,Julie,Sarah,Deiter,Walter").split(","));
-
- private static final List<String> LAST_NAMES =
- Arrays.asList(("Shultz,Abrams,Spencer,White,Bartels,Walton,Smith,Jones,Noris").split(","));
-
-
- /**
- * Generate and return a random person with next available id.
- */
- public static Person nextPerson(
- long nextEventId, Random random, long timestamp, GeneratorConfig config) {
-
- long id = lastBase0PersonId(nextEventId) + GeneratorConfig.FIRST_PERSON_ID;
- String name = nextPersonName(random);
- String email = nextEmail(random);
- String creditCard = nextCreditCard(random);
- String city = nextUSCity(random);
- String state = nextUSState(random);
- int currentSize =
- 8 + name.length() + email.length() + creditCard.length() + city.length() + state.length();
- String extra = nextExtra(random, currentSize, config.getAvgPersonByteSize());
- return new Person(id, name, email, creditCard, city, state, timestamp, extra);
- }
-
- /**
- * Return a random person id (base 0).
- */
- public static long nextBase0PersonId(long eventId, Random random, GeneratorConfig config) {
- // Choose a random person from any of the 'active' people, plus a few 'leads'.
- // By limiting to 'active' we ensure the density of bids or auctions per person
- // does not decrease over time for long running jobs.
- // By choosing a person id ahead of the last valid person id we will make
- // newPerson and newAuction events appear to have been swapped in time.
- long numPeople = lastBase0PersonId(eventId) + 1;
- long activePeople = Math.min(numPeople, config.getNumActivePeople());
- long n = nextLong(random, activePeople + PERSON_ID_LEAD);
- return numPeople - activePeople + n;
- }
-
- /**
- * Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if
- * due to generate a person.
- */
- public static long lastBase0PersonId(long eventId) {
- long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
- long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
- if (offset >= GeneratorConfig.PERSON_PROPORTION) {
- // About to generate an auction or bid.
- // Go back to the last person generated in this epoch.
- offset = GeneratorConfig.PERSON_PROPORTION - 1;
- }
- // About to generate a person.
- return epoch * GeneratorConfig.PERSON_PROPORTION + offset;
- }
-
-
- /** return a random US state. */
- private static String nextUSState(Random random) {
- return US_STATES.get(random.nextInt(US_STATES.size()));
- }
-
- /** Return a random US city. */
- private static String nextUSCity(Random random) {
- return US_CITIES.get(random.nextInt(US_CITIES.size()));
- }
-
- /** Return a random person name. */
- private static String nextPersonName(Random random) {
- return FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size())) + " "
- + LAST_NAMES.get(random.nextInt(LAST_NAMES.size()));
- }
-
- /** Return a random email address. */
- private static String nextEmail(Random random) {
- return nextString(random, 7) + "@" + nextString(random, 5) + ".com";
- }
-
- /** Return a random credit card number. */
- private static String nextCreditCard(Random random) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < 4; i++) {
- if (i > 0) {
- sb.append(' ');
- }
- sb.append(String.format("%04d", random.nextInt(10000)));
- }
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java
deleted file mode 100644
index 9dae1ca..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.nexmark.sources.utils;
-
-import java.util.Random;
-
-/**
- * Generates a random price.
- */
-public class PriceGenerator {
-
- /** Return a random price. */
- public static long nextPrice(Random random) {
- return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/StringsGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/StringsGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/StringsGenerator.java
deleted file mode 100644
index 4e69a9d..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/StringsGenerator.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.nexmark.sources.utils;
-
-import java.util.Random;
-
-/**
- * Generates strings which are used for different field in other model objects.
- */
-public class StringsGenerator {
-
- /** Smallest random string size. */
- private static final int MIN_STRING_LENGTH = 3;
-
- /** Return a random string of up to {@code maxLength}. */
- public static String nextString(Random random, int maxLength) {
- int len = MIN_STRING_LENGTH + random.nextInt(maxLength - MIN_STRING_LENGTH);
- StringBuilder sb = new StringBuilder();
- while (len-- > 0) {
- if (random.nextInt(13) == 0) {
- sb.append(' ');
- } else {
- sb.append((char) ('a' + random.nextInt(26)));
- }
- }
- return sb.toString().trim();
- }
-
- /** Return a random string of exactly {@code length}. */
- public static String nextExactString(Random random, int length) {
- StringBuilder sb = new StringBuilder();
- while (length-- > 0) {
- sb.append((char) ('a' + random.nextInt(26)));
- }
- return sb.toString();
- }
-
- /**
- * Return a random {@code string} such that {@code currentSize + string.length()} is on average
- * {@code averageSize}.
- */
- public static String nextExtra(Random random, int currentSize, int desiredAverageSize) {
- if (currentSize > desiredAverageSize) {
- return "";
- }
- desiredAverageSize -= currentSize;
- int delta = (int) Math.round(desiredAverageSize * 0.2);
- int minSize = desiredAverageSize - delta;
- int desiredSize = minSize + (delta == 0 ? 0 : random.nextInt(2 * delta));
- return nextExactString(random, desiredSize);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/package-info.java
deleted file mode 100644
index e09564a..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Utility classes for Generator.
- */
-package org.apache.beam.sdk.nexmark.sources.utils;
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java
index 3590d64..beef314 100644
--- a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java
+++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.nexmark.sources;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.NexmarkOptions;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java
index 9553d22..fbb8136 100644
--- a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java
+++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java
@@ -24,6 +24,8 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.sources.generator.Generator;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
index c00d1a3..5c9bf5f 100644
--- a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
+++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
@@ -30,6 +30,9 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.sources.generator.Generator;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorCheckpoint;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.junit.Test;
[5/6] beam git commit: [Nexmark] Extract PersonGenerator, StringsGenerator, LongGenerator from Generator
Posted by ke...@apache.org.
[Nexmark] Extract PersonGenerator, StringsGenerator, LongGenerator from Generator
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7055e0f3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7055e0f3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7055e0f3
Branch: refs/heads/master
Commit: 7055e0f349198575caddaf4106cff24b05fae8f4
Parents: 4fce640
Author: Anton Kedin <ke...@google.com>
Authored: Mon Nov 6 15:01:58 2017 -0800
Committer: Anton Kedin <ke...@google.com>
Committed: Wed Nov 15 13:48:37 2017 -0800
----------------------------------------------------------------------
.../beam/sdk/nexmark/sources/Generator.java | 163 +------------------
.../nexmark/sources/utils/LongGenerator.java | 37 +++++
.../nexmark/sources/utils/PersonGenerator.java | 140 ++++++++++++++++
.../nexmark/sources/utils/StringsGenerator.java | 68 ++++++++
.../sdk/nexmark/sources/utils/package-info.java | 22 +++
5 files changed, 276 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7055e0f3/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
index 630d0b5..69d4579 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
@@ -18,18 +18,21 @@
package org.apache.beam.sdk.nexmark.sources;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.sdk.nexmark.sources.utils.LongGenerator.nextLong;
+import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.lastBase0PersonId;
+import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextBase0PersonId;
+import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextPerson;
+import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextExtra;
+import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextString;
import java.io.Serializable;
-import java.util.Arrays;
import java.util.Iterator;
-import java.util.List;
import java.util.Objects;
import java.util.Random;
import org.apache.beam.sdk.nexmark.model.Auction;
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.nexmark.model.Person;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;
@@ -53,30 +56,9 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
*/
private static final int NUM_CATEGORIES = 5;
- /** Smallest random string size. */
- private static final int MIN_STRING_LENGTH = 3;
-
- /**
- * Keep the number of states small so that the example queries will find results even with
- * a small batch of events.
- */
- private static final List<String> US_STATES = Arrays.asList(("AZ,CA,ID,OR,WA,WY").split(","));
-
- private static final List<String> US_CITIES =
- Arrays.asList(
- ("Phoenix,Los Angeles,San Francisco,Boise,Portland,Bend,Redmond,Seattle,Kent,Cheyenne")
- .split(","));
-
- private static final List<String> FIRST_NAMES =
- Arrays.asList(("Peter,Paul,Luke,John,Saul,Vicky,Kate,Julie,Sarah,Deiter,Walter").split(","));
-
- private static final List<String> LAST_NAMES =
- Arrays.asList(("Shultz,Abrams,Spencer,White,Bartels,Walton,Smith,Jones,Noris").split(","));
-
/**
* Number of yet-to-be-created people and auction ids allowed.
*/
- private static final int PERSON_ID_LEAD = 10;
private static final int AUCTION_ID_LEAD = 10;
/**
@@ -223,21 +205,6 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
return config.firstEventId + config.nextAdjustedEventNumber(eventsCountSoFar);
}
- /**
- * Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if
- * due to generate a person.
- */
- private long lastBase0PersonId(long eventId) {
- long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
- long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
- if (offset >= GeneratorConfig.PERSON_PROPORTION) {
- // About to generate an auction or bid.
- // Go back to the last person generated in this epoch.
- offset = GeneratorConfig.PERSON_PROPORTION - 1;
- }
- // About to generate a person.
- return epoch * GeneratorConfig.PERSON_PROPORTION + offset;
- }
/**
* Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if
@@ -261,63 +228,6 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
}
return epoch * GeneratorConfig.AUCTION_PROPORTION + offset;
}
-
- /** return a random US state. */
- private static String nextUSState(Random random) {
- return US_STATES.get(random.nextInt(US_STATES.size()));
- }
-
- /** Return a random US city. */
- private static String nextUSCity(Random random) {
- return US_CITIES.get(random.nextInt(US_CITIES.size()));
- }
-
- /** Return a random person name. */
- private static String nextPersonName(Random random) {
- return FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size())) + " "
- + LAST_NAMES.get(random.nextInt(LAST_NAMES.size()));
- }
-
- /** Return a random string of up to {@code maxLength}. */
- private static String nextString(Random random, int maxLength) {
- int len = MIN_STRING_LENGTH + random.nextInt(maxLength - MIN_STRING_LENGTH);
- StringBuilder sb = new StringBuilder();
- while (len-- > 0) {
- if (random.nextInt(13) == 0) {
- sb.append(' ');
- } else {
- sb.append((char) ('a' + random.nextInt(26)));
- }
- }
- return sb.toString().trim();
- }
-
- /** Return a random string of exactly {@code length}. */
- private static String nextExactString(Random random, int length) {
- StringBuilder sb = new StringBuilder();
- while (length-- > 0) {
- sb.append((char) ('a' + random.nextInt(26)));
- }
- return sb.toString();
- }
-
- /** Return a random email address. */
- private static String nextEmail(Random random) {
- return nextString(random, 7) + "@" + nextString(random, 5) + ".com";
- }
-
- /** Return a random credit card number. */
- private static String nextCreditCard(Random random) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < 4; i++) {
- if (i > 0) {
- sb.append(' ');
- }
- sb.append(String.format("%04d", random.nextInt(10000)));
- }
- return sb.toString();
- }
-
/** Return a random price. */
private static long nextPrice(Random random) {
return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0);
@@ -342,61 +252,6 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
return 1L + nextLong(random, Math.max(horizonMs * 2, 1L));
}
- /**
- * Return a random {@code string} such that {@code currentSize + string.length()} is on average
- * {@code averageSize}.
- */
- private static String nextExtra(Random random, int currentSize, int desiredAverageSize) {
- if (currentSize > desiredAverageSize) {
- return "";
- }
- desiredAverageSize -= currentSize;
- int delta = (int) Math.round(desiredAverageSize * 0.2);
- int minSize = desiredAverageSize - delta;
- int desiredSize = minSize + (delta == 0 ? 0 : random.nextInt(2 * delta));
- return nextExactString(random, desiredSize);
- }
-
- /** Return a random long from {@code [0, n)}. */
- private static long nextLong(Random random, long n) {
- if (n < Integer.MAX_VALUE) {
- return random.nextInt((int) n);
- } else {
- // WARNING: Very skewed distribution! Bad!
- return Math.abs(random.nextLong() % n);
- }
- }
-
- /**
- * Generate and return a random person with next available id.
- */
- private Person nextPerson(long nextEventId, Random random, long timestamp) {
- long id = lastBase0PersonId(nextEventId) + GeneratorConfig.FIRST_PERSON_ID;
- String name = nextPersonName(random);
- String email = nextEmail(random);
- String creditCard = nextCreditCard(random);
- String city = nextUSCity(random);
- String state = nextUSState(random);
- int currentSize =
- 8 + name.length() + email.length() + creditCard.length() + city.length() + state.length();
- String extra = nextExtra(random, currentSize, config.configuration.avgPersonByteSize);
- return new Person(id, name, email, creditCard, city, state, timestamp, extra);
- }
-
- /**
- * Return a random person id (base 0).
- */
- private long nextBase0PersonId(long eventId, Random random) {
- // Choose a random person from any of the 'active' people, plus a few 'leads'.
- // By limiting to 'active' we ensure the density of bids or auctions per person
- // does not decrease over time for long running jobs.
- // By choosing a person id ahead of the last valid person id we will make
- // newPerson and newAuction events appear to have been swapped in time.
- long numPeople = lastBase0PersonId(eventId) + 1;
- long activePeople = Math.min(numPeople, config.configuration.numActivePeople);
- long n = nextLong(random, activePeople + PERSON_ID_LEAD);
- return numPeople - activePeople + n;
- }
/**
* Return a random auction id (base 0).
@@ -424,7 +279,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
// Choose the first person in the batch of last HOT_SELLER_RATIO people.
seller = (lastBase0PersonId(eventId) / HOT_SELLER_RATIO) * HOT_SELLER_RATIO;
} else {
- seller = nextBase0PersonId(eventId, random);
+ seller = nextBase0PersonId(eventId, random, config);
}
seller += GeneratorConfig.FIRST_PERSON_ID;
@@ -461,7 +316,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
// last HOT_BIDDER_RATIO people.
bidder = (lastBase0PersonId(getNextEventId()) / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1;
} else {
- bidder = nextBase0PersonId(eventId, random);
+ bidder = nextBase0PersonId(eventId, random, config);
}
bidder += GeneratorConfig.FIRST_PERSON_ID;
@@ -513,7 +368,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
Event event;
if (rem < GeneratorConfig.PERSON_PROPORTION) {
- event = new Event(nextPerson(newEventId, random, adjustedEventTimestamp));
+ event = new Event(nextPerson(newEventId, random, adjustedEventTimestamp, config));
} else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
event = new Event(nextAuction(newEventId, random, adjustedEventTimestamp));
} else {
http://git-wip-us.apache.org/repos/asf/beam/blob/7055e0f3/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/LongGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/LongGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/LongGenerator.java
new file mode 100644
index 0000000..8eccb66
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/LongGenerator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.utils;
+
+import java.util.Random;
+
+/**
+ * Random {@code long} helpers shared by the Nexmark event generators.
+ */
+public class LongGenerator {
+
+  /**
+   * Returns a random long from {@code [0, n)}.
+   *
+   * <p>Bounds below {@code Integer.MAX_VALUE} delegate to {@link Random#nextInt(int)}, so seeded
+   * sequences for those bounds match a plain {@code nextInt} call. Larger bounds use rejection
+   * sampling over 63 random bits, avoiding the modulo bias of
+   * {@code Math.abs(random.nextLong() % n)}.
+   *
+   * @param random source of randomness
+   * @param n exclusive upper bound; must be positive
+   * @return a value in {@code [0, n)}
+   * @throws IllegalArgumentException if {@code n <= 0}
+   */
+  public static long nextLong(Random random, long n) {
+    if (n <= 0) {
+      throw new IllegalArgumentException("n must be positive but was " + n);
+    }
+    if (n < Integer.MAX_VALUE) {
+      return random.nextInt((int) n);
+    }
+    // Same scheme as Random.nextInt(int): draw non-negative 63-bit values and reject those that
+    // fall in the final, partial bucket, detected by (bits - value + n - 1) overflowing.
+    long bits;
+    long value;
+    do {
+      bits = random.nextLong() >>> 1;
+      value = bits % n;
+    } while (bits - value + (n - 1) < 0);
+    return value;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7055e0f3/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PersonGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PersonGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PersonGenerator.java
new file mode 100644
index 0000000..a02fff9
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PersonGenerator.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.sources.utils;
+
+import static org.apache.beam.sdk.nexmark.sources.utils.LongGenerator.nextLong;
+import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextExtra;
+import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextString;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Random;
+
+import org.apache.beam.sdk.nexmark.model.Person;
+import org.apache.beam.sdk.nexmark.sources.GeneratorConfig;
+
+/**
+ * Generates {@link Person} events and random person ids.
+ *
+ * <p>Every random choice is drawn from the caller-supplied {@link Random}, so results are
+ * reproducible for the same seed and arguments.
+ */
+public class PersonGenerator {
+  /**
+   * Number of not-yet-created person ids (beyond the last valid one) that
+   * {@link #nextBase0PersonId} may return.
+   */
+  private static final int PERSON_ID_LEAD = 10;
+
+  /**
+   * Keep the number of states small so that the example queries will find results even with
+   * a small batch of events.
+   */
+  private static final List<String> US_STATES = Arrays.asList(("AZ,CA,ID,OR,WA,WY").split(","));
+
+  /** Candidate cities; a city is drawn independently of the state. */
+  private static final List<String> US_CITIES =
+      Arrays.asList(
+          ("Phoenix,Los Angeles,San Francisco,Boise,Portland,Bend,Redmond,Seattle,Kent,Cheyenne")
+              .split(","));
+
+  private static final List<String> FIRST_NAMES =
+      Arrays.asList(("Peter,Paul,Luke,John,Saul,Vicky,Kate,Julie,Sarah,Deiter,Walter").split(","));
+
+  private static final List<String> LAST_NAMES =
+      Arrays.asList(("Shultz,Abrams,Spencer,White,Bartels,Walton,Smith,Jones,Noris").split(","));
+
+  /**
+   * Generates a random person whose id is the last valid person id for {@code nextEventId}.
+   *
+   * @param nextEventId id of the event being generated; determines the person id
+   * @param random source of randomness for every person field
+   * @param timestamp passed through to the {@link Person} constructor
+   * @param config supplies the desired average person size in bytes
+   * @return the generated person
+   */
+  public static Person nextPerson(
+      long nextEventId, Random random, long timestamp, GeneratorConfig config) {
+    // The draw order below determines the generated values; do not reorder.
+    long id = lastBase0PersonId(nextEventId) + GeneratorConfig.FIRST_PERSON_ID;
+    String name = nextPersonName(random);
+    String email = nextEmail(random);
+    String creditCard = nextCreditCard(random);
+    String city = nextUSCity(random);
+    String state = nextUSState(random);
+    // NOTE(review): 8 presumably accounts for the fixed-size id field; confirm.
+    int currentSize =
+        8 + name.length() + email.length() + creditCard.length() + city.length() + state.length();
+    // Pad so that the average person size approaches the configured size.
+    String extra = nextExtra(random, currentSize, config.getAvgPersonByteSize());
+    return new Person(id, name, email, creditCard, city, state, timestamp, extra);
+  }
+
+  /**
+   * Returns a random person id (base 0), e.g. for an auction's seller or a bid's bidder.
+   *
+   * <p>The id is chosen from the most recent {@code config.getNumActivePeople()} people (or all
+   * people, if fewer exist) plus {@code PERSON_ID_LEAD} ids that have not been generated yet.
+   */
+  public static long nextBase0PersonId(long eventId, Random random, GeneratorConfig config) {
+    // Choose a random person from any of the 'active' people, plus a few 'leads'.
+    // By limiting to 'active' we ensure the density of bids or auctions per person
+    // does not decrease over time for long running jobs.
+    // By choosing a person id ahead of the last valid person id we will make
+    // newPerson and newAuction events appear to have been swapped in time.
+    long numPeople = lastBase0PersonId(eventId) + 1;
+    long activePeople = Math.min(numPeople, config.getNumActivePeople());
+    long n = nextLong(random, activePeople + PERSON_ID_LEAD);
+    return numPeople - activePeople + n;
+  }
+
+  /**
+   * Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if
+   * due to generate a person.
+   */
+  public static long lastBase0PersonId(long eventId) {
+    long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
+    long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
+    if (offset >= GeneratorConfig.PERSON_PROPORTION) {
+      // About to generate an auction or bid.
+      // Go back to the last person generated in this epoch.
+      offset = GeneratorConfig.PERSON_PROPORTION - 1;
+    }
+    // The first PERSON_PROPORTION events of each epoch are people.
+    return epoch * GeneratorConfig.PERSON_PROPORTION + offset;
+  }
+
+  /** Return a random US state. */
+  private static String nextUSState(Random random) {
+    return US_STATES.get(random.nextInt(US_STATES.size()));
+  }
+
+  /** Return a random US city. */
+  private static String nextUSCity(Random random) {
+    return US_CITIES.get(random.nextInt(US_CITIES.size()));
+  }
+
+  /** Return a random person name. */
+  private static String nextPersonName(Random random) {
+    return FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size())) + " "
+        + LAST_NAMES.get(random.nextInt(LAST_NAMES.size()));
+  }
+
+  /** Return a random email address. */
+  private static String nextEmail(Random random) {
+    return nextString(random, 7) + "@" + nextString(random, 5) + ".com";
+  }
+
+  /**
+   * Return a random credit card number: four space-separated groups of four digits.
+   *
+   * <p>Formats with {@link Locale#ROOT} so the digits are ASCII regardless of the JVM's default
+   * locale, keeping generated data identical across machines.
+   */
+  private static String nextCreditCard(Random random) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < 4; i++) {
+      if (i > 0) {
+        sb.append(' ');
+      }
+      sb.append(String.format(Locale.ROOT, "%04d", random.nextInt(10000)));
+    }
+    return sb.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7055e0f3/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/StringsGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/StringsGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/StringsGenerator.java
new file mode 100644
index 0000000..4e69a9d
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/StringsGenerator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.utils;
+
+import java.util.Random;
+
+/**
+ * Generates random strings used for the text fields of the Nexmark model objects.
+ */
+public class StringsGenerator {
+
+  /** Smallest random string size. */
+  private static final int MIN_STRING_LENGTH = 3;
+
+  /**
+   * Returns a random string of lowercase ASCII letters with occasional spaces.
+   *
+   * <p>The untrimmed string has between {@code MIN_STRING_LENGTH} and {@code maxLength - 1}
+   * characters; leading and trailing spaces are then trimmed, so the result may be shorter.
+   *
+   * @param random source of randomness
+   * @param maxLength exclusive upper bound on the untrimmed length; must be greater than
+   *     {@code MIN_STRING_LENGTH}
+   * @return the random string
+   * @throws IllegalArgumentException if {@code maxLength <= MIN_STRING_LENGTH}
+   */
+  public static String nextString(Random random, int maxLength) {
+    if (maxLength <= MIN_STRING_LENGTH) {
+      throw new IllegalArgumentException(
+          "maxLength must be greater than " + MIN_STRING_LENGTH + " but was " + maxLength);
+    }
+    int len = MIN_STRING_LENGTH + random.nextInt(maxLength - MIN_STRING_LENGTH);
+    StringBuilder sb = new StringBuilder(len);
+    while (len-- > 0) {
+      // On average one character in 13 is a space.
+      if (random.nextInt(13) == 0) {
+        sb.append(' ');
+      } else {
+        sb.append((char) ('a' + random.nextInt(26)));
+      }
+    }
+    return sb.toString().trim();
+  }
+
+  /**
+   * Returns a random string of exactly {@code length} lowercase ASCII letters.
+   *
+   * @param random source of randomness
+   * @param length number of characters; a non-positive value yields the empty string
+   * @return the random string
+   */
+  public static String nextExactString(Random random, int length) {
+    StringBuilder sb = new StringBuilder(Math.max(length, 0));
+    for (int i = 0; i < length; i++) {
+      sb.append((char) ('a' + random.nextInt(26)));
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Returns a random padding string such that {@code currentSize + result.length()} is on
+   * average close to {@code desiredAverageSize}.
+   *
+   * <p>With {@code remaining = desiredAverageSize - currentSize} and {@code delta} equal to 20%
+   * of {@code remaining} (rounded), the padding length is drawn from
+   * {@code [remaining - delta, remaining + delta)}, so its mean is half a character below
+   * {@code remaining} whenever {@code delta > 0}.
+   *
+   * @param random source of randomness
+   * @param currentSize size already accounted for by the other fields
+   * @param desiredAverageSize target average total size
+   * @return the padding, or the empty string if {@code currentSize >= desiredAverageSize}
+   */
+  public static String nextExtra(Random random, int currentSize, int desiredAverageSize) {
+    if (currentSize > desiredAverageSize) {
+      return "";
+    }
+    int remaining = desiredAverageSize - currentSize;
+    int delta = (int) Math.round(remaining * 0.2);
+    int minSize = remaining - delta;
+    int desiredSize = minSize + (delta == 0 ? 0 : random.nextInt(2 * delta));
+    return nextExactString(random, desiredSize);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7055e0f3/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/package-info.java
new file mode 100644
index 0000000..e09564a
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility classes for Generator.
+ */
+package org.apache.beam.sdk.nexmark.sources.utils;
[6/6] beam git commit: This closes #4807: [BEAM-3147] Nexmark
Generator refactor
Posted by ke...@apache.org.
This closes #4807: [BEAM-3147] Nexmark Generator refactor
[Nexmark] Extract BidGenerator from Generator
[Nexmark] Extract AuctionGenerator, PriceGenerator from Generator
[Nexmark] Extract PersonGenerator, StringsGenerator, LongGenerator from Generator
[Nexmark] Extract GeneratorCheckpoint into a separate class. Move getNextEvent() call to the top of the stack.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a52dbeac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a52dbeac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a52dbeac
Branch: refs/heads/master
Commit: a52dbeacaf544014c4c406f9477ede9f3fb45654
Parents: f10399d d8a6fad
Author: Kenneth Knowles <ke...@apache.org>
Authored: Fri Nov 17 12:36:46 2017 -0800
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Fri Nov 17 12:36:46 2017 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/nexmark/NexmarkUtils.java | 4 +-
.../beam/sdk/nexmark/queries/WinningBids.java | 2 +-
.../sdk/nexmark/sources/BoundedEventSource.java | 2 +
.../beam/sdk/nexmark/sources/Generator.java | 609 -------------------
.../sdk/nexmark/sources/GeneratorConfig.java | 298 ---------
.../nexmark/sources/UnboundedEventSource.java | 23 +-
.../nexmark/sources/generator/Generator.java | 271 +++++++++
.../sources/generator/GeneratorCheckpoint.java | 82 +++
.../sources/generator/GeneratorConfig.java | 339 +++++++++++
.../generator/model/AuctionGenerator.java | 142 +++++
.../sources/generator/model/BidGenerator.java | 76 +++
.../sources/generator/model/LongGenerator.java | 37 ++
.../generator/model/PersonGenerator.java | 139 +++++
.../sources/generator/model/PriceGenerator.java | 32 +
.../generator/model/StringsGenerator.java | 68 +++
.../sources/generator/model/package-info.java | 22 +
.../nexmark/sources/generator/package-info.java | 26 +
.../nexmark/sources/BoundedEventSourceTest.java | 1 +
.../beam/sdk/nexmark/sources/GeneratorTest.java | 2 +
.../sources/UnboundedEventSourceTest.java | 6 +-
20 files changed, 1261 insertions(+), 920 deletions(-)
----------------------------------------------------------------------
[4/6] beam git commit: [Nexmark] Extract GeneratorCheckpoint into a
separate class. Move getNextEvent() call to the top of the stack.
Posted by ke...@apache.org.
[Nexmark] Extract GeneratorCheckpoint into a separate class. Move getNextEvent() call to the top of the stack.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4fce6401
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4fce6401
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4fce6401
Branch: refs/heads/master
Commit: 4fce6401fa1abb91c0e493791c29fa0f2cefe0d3
Parents: 1b3f1c1
Author: Anton Kedin <ke...@google.com>
Authored: Mon Nov 6 14:47:38 2017 -0800
Committer: Anton Kedin <ke...@google.com>
Committed: Wed Nov 15 13:48:37 2017 -0800
----------------------------------------------------------------------
.../beam/sdk/nexmark/sources/Generator.java | 145 ++++++-------------
.../nexmark/sources/GeneratorCheckpoint.java | 78 ++++++++++
.../sdk/nexmark/sources/GeneratorConfig.java | 41 ++++++
.../nexmark/sources/UnboundedEventSource.java | 20 +--
.../sources/UnboundedEventSourceTest.java | 3 +-
5 files changed, 179 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4fce6401/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
index c368d72..630d0b5 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
@@ -19,20 +19,13 @@ package org.apache.beam.sdk.nexmark.sources;
import static com.google.common.base.Preconditions.checkNotNull;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Random;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
+
import org.apache.beam.sdk.nexmark.model.Auction;
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.Event;
@@ -95,55 +88,6 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
private static final int HOT_BIDDER_RATIO = 100;
/**
- * Just enough state to be able to restore a generator back to where it was checkpointed.
- */
- public static class Checkpoint implements UnboundedSource.CheckpointMark {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-
- /** Coder for this class. */
- public static final Coder<Checkpoint> CODER_INSTANCE =
- new CustomCoder<Checkpoint>() {
- @Override public void encode(Checkpoint value, OutputStream outStream)
- throws CoderException, IOException {
- LONG_CODER.encode(value.numEvents, outStream);
- LONG_CODER.encode(value.wallclockBaseTime, outStream);
- }
-
- @Override
- public Checkpoint decode(InputStream inStream)
- throws CoderException, IOException {
- long numEvents = LONG_CODER.decode(inStream);
- long wallclockBaseTime = LONG_CODER.decode(inStream);
- return new Checkpoint(numEvents, wallclockBaseTime);
- }
- @Override public void verifyDeterministic() throws NonDeterministicException {}
- };
-
- private final long numEvents;
- private final long wallclockBaseTime;
-
- private Checkpoint(long numEvents, long wallclockBaseTime) {
- this.numEvents = numEvents;
- this.wallclockBaseTime = wallclockBaseTime;
- }
-
- public Generator toGenerator(GeneratorConfig config) {
- return new Generator(config, numEvents, wallclockBaseTime);
- }
-
- @Override
- public void finalizeCheckpoint() throws IOException {
- // Nothing to finalize.
- }
-
- @Override
- public String toString() {
- return String.format("Generator.Checkpoint{numEvents:%d;wallclockBaseTime:%d}",
- numEvents, wallclockBaseTime);
- }
- }
-
- /**
* The next event and its various timestamps. Ordered by increasing wallclock timestamp, then
* (arbitrary but stable) event hash order.
*/
@@ -213,17 +157,17 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
private GeneratorConfig config;
/** Number of events generated by this generator. */
- private long numEvents;
+ private long eventsCountSoFar;
/**
* Wallclock time at which we emitted the first event (ms since epoch). Initially -1.
*/
private long wallclockBaseTime;
- private Generator(GeneratorConfig config, long numEvents, long wallclockBaseTime) {
+ Generator(GeneratorConfig config, long eventsCountSoFar, long wallclockBaseTime) {
checkNotNull(config);
this.config = config;
- this.numEvents = numEvents;
+ this.eventsCountSoFar = eventsCountSoFar;
this.wallclockBaseTime = wallclockBaseTime;
}
@@ -237,8 +181,8 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
/**
* Return a checkpoint for the current generator.
*/
- public Checkpoint toCheckpoint() {
- return new Checkpoint(numEvents, wallclockBaseTime);
+ public GeneratorCheckpoint toCheckpoint() {
+ return new GeneratorCheckpoint(eventsCountSoFar, wallclockBaseTime);
}
/**
@@ -246,7 +190,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
*/
public Generator copy() {
checkNotNull(config);
- Generator result = new Generator(config, numEvents, wallclockBaseTime);
+ Generator result = new Generator(config, eventsCountSoFar, wallclockBaseTime);
return result;
}
@@ -276,15 +220,14 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
* help with bookkeeping.
*/
public long getNextEventId() {
- return config.firstEventId + config.nextAdjustedEventNumber(numEvents);
+ return config.firstEventId + config.nextAdjustedEventNumber(eventsCountSoFar);
}
/**
* Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if
* due to generate a person.
*/
- private long lastBase0PersonId() {
- long eventId = getNextEventId();
+ private long lastBase0PersonId(long eventId) {
long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
if (offset >= GeneratorConfig.PERSON_PROPORTION) {
@@ -300,8 +243,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
* Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if
* due to generate an auction.
*/
- private long lastBase0AuctionId() {
- long eventId = getNextEventId();
+ private long lastBase0AuctionId(long eventId) {
long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
if (offset < GeneratorConfig.PERSON_PROPORTION) {
@@ -384,7 +326,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
/** Return a random time delay, in milliseconds, for length of auctions. */
private long nextAuctionLengthMs(Random random, long timestamp) {
// What's our current event number?
- long currentEventNumber = config.nextAdjustedEventNumber(numEvents);
+ long currentEventNumber = config.nextAdjustedEventNumber(eventsCountSoFar);
// How many events till we've generated numInFlightAuctions?
long numEventsForAuctions =
(config.configuration.numInFlightAuctions * GeneratorConfig.PROPORTION_DENOMINATOR)
@@ -428,8 +370,8 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
/**
* Generate and return a random person with next available id.
*/
- private Person nextPerson(Random random, long timestamp) {
- long id = lastBase0PersonId() + GeneratorConfig.FIRST_PERSON_ID;
+ private Person nextPerson(long nextEventId, Random random, long timestamp) {
+ long id = lastBase0PersonId(nextEventId) + GeneratorConfig.FIRST_PERSON_ID;
String name = nextPersonName(random);
String email = nextEmail(random);
String creditCard = nextCreditCard(random);
@@ -444,13 +386,13 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
/**
* Return a random person id (base 0).
*/
- private long nextBase0PersonId(Random random) {
+ private long nextBase0PersonId(long eventId, Random random) {
// Choose a random person from any of the 'active' people, plus a few 'leads'.
// By limiting to 'active' we ensure the density of bids or auctions per person
// does not decrease over time for long running jobs.
// By choosing a person id ahead of the last valid person id we will make
// newPerson and newAuction events appear to have been swapped in time.
- long numPeople = lastBase0PersonId() + 1;
+ long numPeople = lastBase0PersonId(eventId) + 1;
long activePeople = Math.min(numPeople, config.configuration.numActivePeople);
long n = nextLong(random, activePeople + PERSON_ID_LEAD);
return numPeople - activePeople + n;
@@ -459,29 +401,30 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
/**
* Return a random auction id (base 0).
*/
- private long nextBase0AuctionId(Random random) {
+ private long nextBase0AuctionId(long nextEventId, Random random) {
// Choose a random auction for any of those which are likely to still be in flight,
// plus a few 'leads'.
// Note that ideally we'd track non-expired auctions exactly, but that state
// is difficult to split.
- long minAuction = Math.max(lastBase0AuctionId() - config.configuration.numInFlightAuctions, 0);
- long maxAuction = lastBase0AuctionId();
+ long minAuction = Math.max(
+ lastBase0AuctionId(nextEventId) - config.configuration.numInFlightAuctions, 0);
+ long maxAuction = lastBase0AuctionId(nextEventId);
return minAuction + nextLong(random, maxAuction - minAuction + 1 + AUCTION_ID_LEAD);
}
/**
* Generate and return a random auction with next available id.
*/
- private Auction nextAuction(Random random, long timestamp) {
- long id = lastBase0AuctionId() + GeneratorConfig.FIRST_AUCTION_ID;
+ private Auction nextAuction(long eventId, Random random, long timestamp) {
+ long id = lastBase0AuctionId(eventId) + GeneratorConfig.FIRST_AUCTION_ID;
long seller;
// Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio.
if (random.nextInt(config.configuration.hotSellersRatio) > 0) {
// Choose the first person in the batch of last HOT_SELLER_RATIO people.
- seller = (lastBase0PersonId() / HOT_SELLER_RATIO) * HOT_SELLER_RATIO;
+ seller = (lastBase0PersonId(eventId) / HOT_SELLER_RATIO) * HOT_SELLER_RATIO;
} else {
- seller = nextBase0PersonId(random);
+ seller = nextBase0PersonId(eventId, random);
}
seller += GeneratorConfig.FIRST_PERSON_ID;
@@ -500,14 +443,14 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
/**
* Generate and return a random bid with next available id.
*/
- private Bid nextBid(Random random, long timestamp) {
+ private Bid nextBid(long eventId, Random random, long timestamp) {
long auction;
// Here P(bid will be for a hot auction) = 1 - 1/hotAuctionRatio.
if (random.nextInt(config.configuration.hotAuctionRatio) > 0) {
// Choose the first auction in the batch of last HOT_AUCTION_RATIO auctions.
- auction = (lastBase0AuctionId() / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO;
+ auction = (lastBase0AuctionId(eventId) / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO;
} else {
- auction = nextBase0AuctionId(random);
+ auction = nextBase0AuctionId(eventId, random);
}
auction += GeneratorConfig.FIRST_AUCTION_ID;
@@ -516,9 +459,9 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
if (random.nextInt(config.configuration.hotBiddersRatio) > 0) {
// Choose the second person (so hot bidders and hot sellers don't collide) in the batch of
// last HOT_BIDDER_RATIO people.
- bidder = (lastBase0PersonId() / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1;
+ bidder = (lastBase0PersonId(getNextEventId()) / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1;
} else {
- bidder = nextBase0PersonId(random);
+ bidder = nextBase0PersonId(eventId, random);
}
bidder += GeneratorConfig.FIRST_PERSON_ID;
@@ -530,7 +473,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
@Override
public boolean hasNext() {
- return numEvents < config.maxEvents;
+ return eventsCountSoFar < config.maxEvents;
}
/**
@@ -544,34 +487,40 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
}
// When, in event time, we should generate the event. Monotonic.
long eventTimestamp =
- config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents)).getKey();
+ config.timestampAndInterEventDelayUsForEvent(
+ config.nextEventNumber(eventsCountSoFar)).getKey();
// When, in event time, the event should say it was generated. Depending on outOfOrderGroupSize
// may have local jitter.
long adjustedEventTimestamp =
- config.timestampAndInterEventDelayUsForEvent(config.nextAdjustedEventNumber(numEvents))
+ config.timestampAndInterEventDelayUsForEvent(
+ config.nextAdjustedEventNumber(eventsCountSoFar))
.getKey();
// The minimum of this and all future adjusted event timestamps. Accounts for jitter in
// the event timestamp.
long watermark =
- config.timestampAndInterEventDelayUsForEvent(config.nextEventNumberForWatermark(numEvents))
+ config.timestampAndInterEventDelayUsForEvent(
+ config.nextEventNumberForWatermark(eventsCountSoFar))
.getKey();
// When, in wallclock time, we should emit the event.
long wallclockTimestamp = wallclockBaseTime + (eventTimestamp - getCurrentConfig().baseTime);
// Seed the random number generator with the next 'event id'.
Random random = new Random(getNextEventId());
- long rem = getNextEventId() % GeneratorConfig.PROPORTION_DENOMINATOR;
+
+
+ long newEventId = getNextEventId();
+ long rem = newEventId % GeneratorConfig.PROPORTION_DENOMINATOR;
Event event;
if (rem < GeneratorConfig.PERSON_PROPORTION) {
- event = new Event(nextPerson(random, adjustedEventTimestamp));
+ event = new Event(nextPerson(newEventId, random, adjustedEventTimestamp));
} else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
- event = new Event(nextAuction(random, adjustedEventTimestamp));
+ event = new Event(nextAuction(newEventId, random, adjustedEventTimestamp));
} else {
- event = new Event(nextBid(random, adjustedEventTimestamp));
+ event = new Event(nextBid(newEventId, random, adjustedEventTimestamp));
}
- numEvents++;
+ eventsCountSoFar++;
return new NextEvent(wallclockTimestamp, adjustedEventTimestamp, event, watermark);
}
@@ -590,7 +539,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
* Return how many microseconds till we emit the next event.
*/
public long currentInterEventDelayUs() {
- return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents))
+ return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(eventsCountSoFar))
.getValue();
}
@@ -598,12 +547,12 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
* Return an estimate of fraction of output consumed.
*/
public double getFractionConsumed() {
- return (double) numEvents / config.maxEvents;
+ return (double) eventsCountSoFar / config.maxEvents;
}
@Override
public String toString() {
- return String.format("Generator{config:%s; numEvents:%d; wallclockBaseTime:%d}", config,
- numEvents, wallclockBaseTime);
+ return String.format("Generator{config:%s; eventsCountSoFar:%d; wallclockBaseTime:%d}", config,
+ eventsCountSoFar, wallclockBaseTime);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4fce6401/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java
new file mode 100644
index 0000000..dfc135d
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+
+/**
+ * Just enough state to be able to restore a generator back to where it was checkpointed.
+ */
+public class GeneratorCheckpoint implements UnboundedSource.CheckpointMark {
+ private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+ /** Coder for this class. */
+ public static final Coder<GeneratorCheckpoint> CODER_INSTANCE =
+ new CustomCoder<GeneratorCheckpoint>() {
+ @Override public void encode(GeneratorCheckpoint value, OutputStream outStream)
+ throws CoderException, IOException {
+ LONG_CODER.encode(value.numEvents, outStream);
+ LONG_CODER.encode(value.wallclockBaseTime, outStream);
+ }
+
+ @Override
+ public GeneratorCheckpoint decode(InputStream inStream)
+ throws CoderException, IOException {
+ long numEvents = LONG_CODER.decode(inStream);
+ long wallclockBaseTime = LONG_CODER.decode(inStream);
+ return new GeneratorCheckpoint(numEvents, wallclockBaseTime);
+ }
+ @Override public void verifyDeterministic() throws NonDeterministicException {}
+ };
+
+ private final long numEvents;
+ private final long wallclockBaseTime;
+
+ GeneratorCheckpoint(long numEvents, long wallclockBaseTime) {
+ this.numEvents = numEvents;
+ this.wallclockBaseTime = wallclockBaseTime;
+ }
+
+ public Generator toGenerator(GeneratorConfig config) {
+ return new Generator(config, numEvents, wallclockBaseTime);
+ }
+
+ @Override
+ public void finalizeCheckpoint() throws IOException {
+ // Nothing to finalize.
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Generator.GeneratorCheckpoint{numEvents:%d;wallclockBaseTime:%d}",
+ numEvents, wallclockBaseTime);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/4fce6401/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java
index 42183c6..8e0a899 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.nexmark.sources;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.values.KV;
@@ -186,6 +187,46 @@ public class GeneratorConfig implements Serializable {
+ numBids * configuration.avgBidByteSize;
}
+ public int getAvgPersonByteSize() {
+ return configuration.avgPersonByteSize;
+ }
+
+ public int getNumActivePeople() {
+ return configuration.numActivePeople;
+ }
+
+ public int getHotSellersRatio() {
+ return configuration.hotSellersRatio;
+ }
+
+ public int getNumInFlightAuctions() {
+ return configuration.numInFlightAuctions;
+ }
+
+ public int getHotAuctionRatio() {
+ return configuration.hotAuctionRatio;
+ }
+
+ public int getHotBiddersRatio() {
+ return configuration.hotBiddersRatio;
+ }
+
+ public int getAvgBidByteSize() {
+ return configuration.avgBidByteSize;
+ }
+
+ public int getAvgAuctionByteSize() {
+ return configuration.avgAuctionByteSize;
+ }
+
+ public double getProbDelayedEvent() {
+ return configuration.probDelayedEvent;
+ }
+
+ public long getOccasionalDelaySec() {
+ return configuration.occasionalDelaySec;
+ }
+
/**
* Return an estimate of the byte-size of all events a generator for this config would yield.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/4fce6401/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
index 8f5575c..74eb061 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
@@ -23,7 +23,9 @@ import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ThreadLocalRandom;
+
import javax.annotation.Nullable;
+
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
@@ -43,7 +45,7 @@ import org.slf4j.LoggerFactory;
* that the overall rate respect the {@code interEventDelayUs} period if possible. Otherwise,
* events are returned every time the system asks for one.
*/
-public class UnboundedEventSource extends UnboundedSource<Event, Generator.Checkpoint> {
+public class UnboundedEventSource extends UnboundedSource<Event, GeneratorCheckpoint> {
private static final Duration BACKLOG_PERIOD = Duration.standardSeconds(30);
private static final Logger LOG = LoggerFactory.getLogger(UnboundedEventSource.class);
@@ -161,12 +163,12 @@ public class UnboundedEventSource extends UnboundedSource<Event, Generator.Check
watermark - next.eventTimestamp);
} else if (generator.hasNext()) {
next = generator.nextEvent();
- if (isRateLimited && config.configuration.probDelayedEvent > 0.0
- && config.configuration.occasionalDelaySec > 0
- && ThreadLocalRandom.current().nextDouble() < config.configuration.probDelayedEvent) {
+ if (isRateLimited && config.getProbDelayedEvent() > 0.0
+ && config.getOccasionalDelaySec() > 0
+ && ThreadLocalRandom.current().nextDouble() < config.getProbDelayedEvent()) {
// We'll hold back this event and go around again.
long delayMs =
- ThreadLocalRandom.current().nextLong(config.configuration.occasionalDelaySec * 1000)
+ ThreadLocalRandom.current().nextLong(config.getOccasionalDelaySec() * 1000)
+ 1L;
LOG.debug("delaying event by {}ms", delayMs);
heldBackEvents.add(next.withDelay(delayMs));
@@ -265,7 +267,7 @@ public class UnboundedEventSource extends UnboundedSource<Event, Generator.Check
}
@Override
- public Generator.Checkpoint getCheckpointMark() {
+ public GeneratorCheckpoint getCheckpointMark() {
return generator.toCheckpoint();
}
@@ -283,8 +285,8 @@ public class UnboundedEventSource extends UnboundedSource<Event, Generator.Check
}
@Override
- public Coder<Generator.Checkpoint> getCheckpointMarkCoder() {
- return Generator.Checkpoint.CODER_INSTANCE;
+ public Coder<GeneratorCheckpoint> getCheckpointMarkCoder() {
+ return GeneratorCheckpoint.CODER_INSTANCE;
}
@Override
@@ -301,7 +303,7 @@ public class UnboundedEventSource extends UnboundedSource<Event, Generator.Check
@Override
public EventReader createReader(
- PipelineOptions options, @Nullable Generator.Checkpoint checkpoint) {
+ PipelineOptions options, @Nullable GeneratorCheckpoint checkpoint) {
if (checkpoint == null) {
LOG.trace("creating initial unbounded reader for {}", config);
return new EventReader(config);
http://git-wip-us.apache.org/repos/asf/beam/blob/4fce6401/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
index 3853ede..c00d1a3 100644
--- a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
+++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
+
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
@@ -97,7 +98,7 @@ public class UnboundedEventSourceTest {
n -= m;
System.out.printf("splitting with %d remaining...%n", n);
CheckpointMark checkpointMark = reader.getCheckpointMark();
- reader = source.createReader(options, (Generator.Checkpoint) checkpointMark);
+ reader = source.createReader(options, (GeneratorCheckpoint) checkpointMark);
}
assertFalse(reader.advance());
[2/6] beam git commit: [Nexmark] Extract BidGenerator from Generator
Posted by ke...@apache.org.
[Nexmark] Extract BidGenerator from Generator
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8a6fad9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8a6fad9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8a6fad9
Branch: refs/heads/master
Commit: d8a6fad9ed4b65504911fa9d5dadf5c8d4a7a0e6
Parents: e895fc8
Author: Anton Kedin <ke...@google.com>
Authored: Mon Nov 6 15:19:39 2017 -0800
Committer: Anton Kedin <ke...@google.com>
Committed: Wed Nov 15 13:48:37 2017 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/nexmark/NexmarkUtils.java | 4 +-
.../beam/sdk/nexmark/queries/WinningBids.java | 2 +-
.../sdk/nexmark/sources/BoundedEventSource.java | 2 +
.../beam/sdk/nexmark/sources/Generator.java | 316 -----------------
.../nexmark/sources/GeneratorCheckpoint.java | 78 -----
.../sdk/nexmark/sources/GeneratorConfig.java | 339 -------------------
.../nexmark/sources/UnboundedEventSource.java | 3 +
.../nexmark/sources/generator/Generator.java | 271 +++++++++++++++
.../sources/generator/GeneratorCheckpoint.java | 82 +++++
.../sources/generator/GeneratorConfig.java | 339 +++++++++++++++++++
.../generator/model/AuctionGenerator.java | 142 ++++++++
.../sources/generator/model/BidGenerator.java | 76 +++++
.../sources/generator/model/LongGenerator.java | 37 ++
.../generator/model/PersonGenerator.java | 139 ++++++++
.../sources/generator/model/PriceGenerator.java | 32 ++
.../generator/model/StringsGenerator.java | 68 ++++
.../sources/generator/model/package-info.java | 22 ++
.../nexmark/sources/generator/package-info.java | 26 ++
.../nexmark/sources/utils/AuctionGenerator.java | 145 --------
.../nexmark/sources/utils/LongGenerator.java | 37 --
.../nexmark/sources/utils/PersonGenerator.java | 140 --------
.../nexmark/sources/utils/PriceGenerator.java | 32 --
.../nexmark/sources/utils/StringsGenerator.java | 68 ----
.../sdk/nexmark/sources/utils/package-info.java | 22 --
.../nexmark/sources/BoundedEventSourceTest.java | 1 +
.../beam/sdk/nexmark/sources/GeneratorTest.java | 2 +
.../sources/UnboundedEventSourceTest.java | 3 +
27 files changed, 1248 insertions(+), 1180 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
index fa1ef16..fc0ab9f 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
@@ -50,9 +50,9 @@ import org.apache.beam.sdk.nexmark.model.NameCityStateId;
import org.apache.beam.sdk.nexmark.model.Person;
import org.apache.beam.sdk.nexmark.model.SellerPrice;
import org.apache.beam.sdk.nexmark.sources.BoundedEventSource;
-import org.apache.beam.sdk.nexmark.sources.Generator;
-import org.apache.beam.sdk.nexmark.sources.GeneratorConfig;
import org.apache.beam.sdk.nexmark.sources.UnboundedEventSource;
+import org.apache.beam.sdk.nexmark.sources.generator.Generator;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
index bc553c9..3ee4f3a 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
@@ -44,7 +44,7 @@ import org.apache.beam.sdk.nexmark.model.Auction;
import org.apache.beam.sdk.nexmark.model.AuctionBid;
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.nexmark.sources.GeneratorConfig;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java
index 60124bb..cc32007 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java
@@ -26,6 +26,8 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.sources.generator.Generator;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
deleted file mode 100644
index 68e6748..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.nexmark.sources;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.beam.sdk.nexmark.sources.utils.AuctionGenerator.lastBase0AuctionId;
-import static org.apache.beam.sdk.nexmark.sources.utils.AuctionGenerator.nextAuction;
-import static org.apache.beam.sdk.nexmark.sources.utils.AuctionGenerator.nextBase0AuctionId;
-import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.lastBase0PersonId;
-import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextBase0PersonId;
-import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextPerson;
-import static org.apache.beam.sdk.nexmark.sources.utils.PriceGenerator.nextPrice;
-import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextExtra;
-
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.Objects;
-import java.util.Random;
-
-import org.apache.beam.sdk.nexmark.model.Bid;
-import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-
-/**
- * A generator for synthetic events. We try to make the data vaguely reasonable. We also ensure
- * most primary key/foreign key relations are correct. Eg: a {@link Bid} event will usually have
- * valid auction and bidder ids which can be joined to already-generated Auction and Person events.
- *
- * <p>To help with testing, we generate timestamps relative to a given {@code baseTime}. Each new
- * event is given a timestamp advanced from the previous timestamp by {@code interEventDelayUs}
- * (in microseconds). The event stream is thus fully deterministic and does not depend on
- * wallclock time.
- *
- * <p>This class implements {@link org.apache.beam.sdk.io.UnboundedSource.CheckpointMark}
- * so that we can resume generating events from a saved snapshot.
- */
-public class Generator implements Iterator<TimestampedValue<Event>>, Serializable {
-
- /**
- * Fraction of people/auctions which may be 'hot' sellers/bidders/auctions are 1
- * over these values.
- */
- private static final int HOT_AUCTION_RATIO = 100;
- private static final int HOT_BIDDER_RATIO = 100;
-
- /**
- * The next event and its various timestamps. Ordered by increasing wallclock timestamp, then
- * (arbitrary but stable) event hash order.
- */
- public static class NextEvent implements Comparable<NextEvent> {
- /** When, in wallclock time, should this event be emitted? */
- public final long wallclockTimestamp;
-
- /** When, in event time, should this event be considered to have occured? */
- public final long eventTimestamp;
-
- /** The event itself. */
- public final Event event;
-
- /** The minimum of this and all future event timestamps. */
- public final long watermark;
-
- public NextEvent(long wallclockTimestamp, long eventTimestamp, Event event, long watermark) {
- this.wallclockTimestamp = wallclockTimestamp;
- this.eventTimestamp = eventTimestamp;
- this.event = event;
- this.watermark = watermark;
- }
-
- /**
- * Return a deep copy of next event with delay added to wallclock timestamp and
- * event annotate as 'LATE'.
- */
- public NextEvent withDelay(long delayMs) {
- return new NextEvent(
- wallclockTimestamp + delayMs, eventTimestamp, event.withAnnotation("LATE"), watermark);
- }
-
- @Override public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- NextEvent nextEvent = (NextEvent) o;
-
- return (wallclockTimestamp == nextEvent.wallclockTimestamp
- && eventTimestamp == nextEvent.eventTimestamp
- && watermark == nextEvent.watermark
- && event.equals(nextEvent.event));
- }
-
- @Override public int hashCode() {
- return Objects.hash(wallclockTimestamp, eventTimestamp, watermark, event);
- }
-
- @Override
- public int compareTo(NextEvent other) {
- int i = Long.compare(wallclockTimestamp, other.wallclockTimestamp);
- if (i != 0) {
- return i;
- }
- return Integer.compare(event.hashCode(), other.event.hashCode());
- }
- }
-
- /**
- * Configuration to generate events against. Note that it may be replaced by a call to
- * {@link #splitAtEventId}.
- */
- private GeneratorConfig config;
-
- /** Number of events generated by this generator. */
- private long eventsCountSoFar;
-
- /**
- * Wallclock time at which we emitted the first event (ms since epoch). Initially -1.
- */
- private long wallclockBaseTime;
-
- Generator(GeneratorConfig config, long eventsCountSoFar, long wallclockBaseTime) {
- checkNotNull(config);
- this.config = config;
- this.eventsCountSoFar = eventsCountSoFar;
- this.wallclockBaseTime = wallclockBaseTime;
- }
-
- /**
- * Create a fresh generator according to {@code config}.
- */
- public Generator(GeneratorConfig config) {
- this(config, 0, -1);
- }
-
- /**
- * Return a checkpoint for the current generator.
- */
- public GeneratorCheckpoint toCheckpoint() {
- return new GeneratorCheckpoint(eventsCountSoFar, wallclockBaseTime);
- }
-
- /**
- * Return a deep copy of this generator.
- */
- public Generator copy() {
- checkNotNull(config);
- Generator result = new Generator(config, eventsCountSoFar, wallclockBaseTime);
- return result;
- }
-
- /**
- * Return the current config for this generator. Note that configs may be replaced by {@link
- * #splitAtEventId}.
- */
- public GeneratorConfig getCurrentConfig() {
- return config;
- }
-
- /**
- * Mutate this generator so that it will only generate events up to but not including
- * {@code eventId}. Return a config to represent the events this generator will no longer yield.
- * The generators will run in on a serial timeline.
- */
- public GeneratorConfig splitAtEventId(long eventId) {
- long newMaxEvents = eventId - (config.firstEventId + config.firstEventNumber);
- GeneratorConfig remainConfig = config.copyWith(config.firstEventId,
- config.maxEvents - newMaxEvents, config.firstEventNumber + newMaxEvents);
- config = config.copyWith(config.firstEventId, newMaxEvents, config.firstEventNumber);
- return remainConfig;
- }
-
- /**
- * Return the next 'event id'. Though events don't have ids we can simulate them to
- * help with bookkeeping.
- */
- public long getNextEventId() {
- return config.firstEventId + config.nextAdjustedEventNumber(eventsCountSoFar);
- }
-
-
-
- /**
- * Generate and return a random bid with next available id.
- */
- private Bid nextBid(long eventId, Random random, long timestamp) {
- long auction;
- // Here P(bid will be for a hot auction) = 1 - 1/hotAuctionRatio.
- if (random.nextInt(config.configuration.hotAuctionRatio) > 0) {
- // Choose the first auction in the batch of last HOT_AUCTION_RATIO auctions.
- auction = (lastBase0AuctionId(eventId) / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO;
- } else {
- auction = nextBase0AuctionId(eventId, random, config);
- }
- auction += GeneratorConfig.FIRST_AUCTION_ID;
-
- long bidder;
- // Here P(bid will be by a hot bidder) = 1 - 1/hotBiddersRatio
- if (random.nextInt(config.configuration.hotBiddersRatio) > 0) {
- // Choose the second person (so hot bidders and hot sellers don't collide) in the batch of
- // last HOT_BIDDER_RATIO people.
- bidder = (lastBase0PersonId(getNextEventId()) / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1;
- } else {
- bidder = nextBase0PersonId(eventId, random, config);
- }
- bidder += GeneratorConfig.FIRST_PERSON_ID;
-
- long price = nextPrice(random);
- int currentSize = 8 + 8 + 8 + 8;
- String extra = nextExtra(random, currentSize, config.configuration.avgBidByteSize);
- return new Bid(auction, bidder, price, timestamp, extra);
- }
-
- @Override
- public boolean hasNext() {
- return eventsCountSoFar < config.maxEvents;
- }
-
- /**
- * Return the next event. The outer timestamp is in wallclock time and corresponds to
- * when the event should fire. The inner timestamp is in event-time and represents the
- * time the event is purported to have taken place in the simulation.
- */
- public NextEvent nextEvent() {
- if (wallclockBaseTime < 0) {
- wallclockBaseTime = System.currentTimeMillis();
- }
- // When, in event time, we should generate the event. Monotonic.
- long eventTimestamp =
- config.timestampAndInterEventDelayUsForEvent(
- config.nextEventNumber(eventsCountSoFar)).getKey();
- // When, in event time, the event should say it was generated. Depending on outOfOrderGroupSize
- // may have local jitter.
- long adjustedEventTimestamp =
- config.timestampAndInterEventDelayUsForEvent(
- config.nextAdjustedEventNumber(eventsCountSoFar))
- .getKey();
- // The minimum of this and all future adjusted event timestamps. Accounts for jitter in
- // the event timestamp.
- long watermark =
- config.timestampAndInterEventDelayUsForEvent(
- config.nextEventNumberForWatermark(eventsCountSoFar))
- .getKey();
- // When, in wallclock time, we should emit the event.
- long wallclockTimestamp = wallclockBaseTime + (eventTimestamp - getCurrentConfig().baseTime);
-
- // Seed the random number generator with the next 'event id'.
- Random random = new Random(getNextEventId());
-
-
- long newEventId = getNextEventId();
- long rem = newEventId % GeneratorConfig.PROPORTION_DENOMINATOR;
-
- Event event;
- if (rem < GeneratorConfig.PERSON_PROPORTION) {
- event = new Event(nextPerson(newEventId, random, adjustedEventTimestamp, config));
- } else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
- event = new Event(
- nextAuction(eventsCountSoFar, newEventId, random, adjustedEventTimestamp, config));
- } else {
- event = new Event(nextBid(newEventId, random, adjustedEventTimestamp));
- }
-
- eventsCountSoFar++;
- return new NextEvent(wallclockTimestamp, adjustedEventTimestamp, event, watermark);
- }
-
- @Override
- public TimestampedValue<Event> next() {
- NextEvent next = nextEvent();
- return TimestampedValue.of(next.event, new Instant(next.eventTimestamp));
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Return how many microseconds till we emit the next event.
- */
- public long currentInterEventDelayUs() {
- return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(eventsCountSoFar))
- .getValue();
- }
-
- /**
- * Return an estimate of fraction of output consumed.
- */
- public double getFractionConsumed() {
- return (double) eventsCountSoFar / config.maxEvents;
- }
-
- @Override
- public String toString() {
- return String.format("Generator{config:%s; eventsCountSoFar:%d; wallclockBaseTime:%d}", config,
- eventsCountSoFar, wallclockBaseTime);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java
deleted file mode 100644
index dfc135d..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.nexmark.sources;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-
-/**
- * Just enough state to be able to restore a generator back to where it was checkpointed.
- */
-public class GeneratorCheckpoint implements UnboundedSource.CheckpointMark {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-
- /** Coder for this class. */
- public static final Coder<GeneratorCheckpoint> CODER_INSTANCE =
- new CustomCoder<GeneratorCheckpoint>() {
- @Override public void encode(GeneratorCheckpoint value, OutputStream outStream)
- throws CoderException, IOException {
- LONG_CODER.encode(value.numEvents, outStream);
- LONG_CODER.encode(value.wallclockBaseTime, outStream);
- }
-
- @Override
- public GeneratorCheckpoint decode(InputStream inStream)
- throws CoderException, IOException {
- long numEvents = LONG_CODER.decode(inStream);
- long wallclockBaseTime = LONG_CODER.decode(inStream);
- return new GeneratorCheckpoint(numEvents, wallclockBaseTime);
- }
- @Override public void verifyDeterministic() throws NonDeterministicException {}
- };
-
- private final long numEvents;
- private final long wallclockBaseTime;
-
- GeneratorCheckpoint(long numEvents, long wallclockBaseTime) {
- this.numEvents = numEvents;
- this.wallclockBaseTime = wallclockBaseTime;
- }
-
- public Generator toGenerator(GeneratorConfig config) {
- return new Generator(config, numEvents, wallclockBaseTime);
- }
-
- @Override
- public void finalizeCheckpoint() throws IOException {
- // Nothing to finalize.
- }
-
- @Override
- public String toString() {
- return String.format("Generator.GeneratorCheckpoint{numEvents:%d;wallclockBaseTime:%d}",
- numEvents, wallclockBaseTime);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java
deleted file mode 100644
index 8e0a899..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.nexmark.sources;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
-import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.values.KV;
-
-/**
- * Parameters controlling how {@link Generator} synthesizes {@link Event} elements.
- */
-public class GeneratorConfig implements Serializable {
-
- /**
- * We start the ids at specific values to help ensure the queries find a match even on
- * small synthesized dataset sizes.
- */
- public static final long FIRST_AUCTION_ID = 1000L;
- public static final long FIRST_PERSON_ID = 1000L;
- public static final long FIRST_CATEGORY_ID = 10L;
-
- /**
- * Proportions of people/auctions/bids to synthesize.
- */
- public static final int PERSON_PROPORTION = 1;
- public static final int AUCTION_PROPORTION = 3;
- private static final int BID_PROPORTION = 46;
- public static final int PROPORTION_DENOMINATOR =
- PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION;
-
- /**
- * Environment options.
- */
- public final NexmarkConfiguration configuration;
-
- /**
- * Delay between events, in microseconds. If the array has more than one entry then
- * the rate is changed every {@link #stepLengthSec}, and wraps around.
- */
- private final long[] interEventDelayUs;
-
- /**
- * Delay before changing the current inter-event delay.
- */
- private final long stepLengthSec;
-
- /**
- * Time for first event (ms since epoch).
- */
- public final long baseTime;
-
- /**
- * Event id of first event to be generated. Event ids are unique over all generators, and
- * are used as a seed to generate each event's data.
- */
- public final long firstEventId;
-
- /**
- * Maximum number of events to generate.
- */
- public final long maxEvents;
-
- /**
- * First event number. Generators running in parallel time may share the same event number,
- * and the event number is used to determine the event timestamp.
- */
- public final long firstEventNumber;
-
- /**
- * True period of epoch in milliseconds. Derived from above.
- * (Ie time to run through cycle for all interEventDelayUs entries).
- */
- private final long epochPeriodMs;
-
- /**
- * Number of events per epoch. Derived from above.
- * (Ie number of events to run through cycle for all interEventDelayUs entries).
- */
- private final long eventsPerEpoch;
-
- public GeneratorConfig(
- NexmarkConfiguration configuration, long baseTime, long firstEventId,
- long maxEventsOrZero, long firstEventNumber) {
- this.configuration = configuration;
- this.interEventDelayUs = configuration.rateShape.interEventDelayUs(
- configuration.firstEventRate, configuration.nextEventRate,
- configuration.rateUnit, configuration.numEventGenerators);
- this.stepLengthSec = configuration.rateShape.stepLengthSec(configuration.ratePeriodSec);
- this.baseTime = baseTime;
- this.firstEventId = firstEventId;
- if (maxEventsOrZero == 0) {
- // Scale maximum down to avoid overflow in getEstimatedSizeBytes.
- this.maxEvents =
- Long.MAX_VALUE / (PROPORTION_DENOMINATOR
- * Math.max(
- Math.max(configuration.avgPersonByteSize, configuration.avgAuctionByteSize),
- configuration.avgBidByteSize));
- } else {
- this.maxEvents = maxEventsOrZero;
- }
- this.firstEventNumber = firstEventNumber;
-
- long eventsPerEpoch = 0;
- long epochPeriodMs = 0;
- if (interEventDelayUs.length > 1) {
- for (long interEventDelayU : interEventDelayUs) {
- long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU;
- eventsPerEpoch += numEventsForThisCycle;
- epochPeriodMs += (numEventsForThisCycle * interEventDelayU) / 1000L;
- }
- }
- this.eventsPerEpoch = eventsPerEpoch;
- this.epochPeriodMs = epochPeriodMs;
- }
-
- /**
- * Return a copy of this config.
- */
- public GeneratorConfig copy() {
- GeneratorConfig result;
- result = new GeneratorConfig(configuration, baseTime, firstEventId,
- maxEvents, firstEventNumber);
- return result;
- }
-
- /**
- * Split this config into {@code n} sub-configs with roughly equal number of
- * possible events, but distinct value spaces. The generators will run on parallel timelines.
- * This config should no longer be used.
- */
- public List<GeneratorConfig> split(int n) {
- List<GeneratorConfig> results = new ArrayList<>();
- if (n == 1) {
- // No split required.
- results.add(this);
- } else {
- long subMaxEvents = maxEvents / n;
- long subFirstEventId = firstEventId;
- for (int i = 0; i < n; i++) {
- if (i == n - 1) {
- // Don't loose any events to round-down.
- subMaxEvents = maxEvents - subMaxEvents * (n - 1);
- }
- results.add(copyWith(subFirstEventId, subMaxEvents, firstEventNumber));
- subFirstEventId += subMaxEvents;
- }
- }
- return results;
- }
-
- /**
- * Return copy of this config except with given parameters.
- */
- public GeneratorConfig copyWith(long firstEventId, long maxEvents, long firstEventNumber) {
- return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
- }
-
- /**
- * Return an estimate of the bytes needed by {@code numEvents}.
- */
- public long estimatedBytesForEvents(long numEvents) {
- long numPersons =
- (numEvents * GeneratorConfig.PERSON_PROPORTION) / GeneratorConfig.PROPORTION_DENOMINATOR;
- long numAuctions = (numEvents * AUCTION_PROPORTION) / PROPORTION_DENOMINATOR;
- long numBids = (numEvents * BID_PROPORTION) / PROPORTION_DENOMINATOR;
- return numPersons * configuration.avgPersonByteSize
- + numAuctions * configuration.avgAuctionByteSize
- + numBids * configuration.avgBidByteSize;
- }
-
- public int getAvgPersonByteSize() {
- return configuration.avgPersonByteSize;
- }
-
- public int getNumActivePeople() {
- return configuration.numActivePeople;
- }
-
- public int getHotSellersRatio() {
- return configuration.hotSellersRatio;
- }
-
- public int getNumInFlightAuctions() {
- return configuration.numInFlightAuctions;
- }
-
- public int getHotAuctionRatio() {
- return configuration.hotAuctionRatio;
- }
-
- public int getHotBiddersRatio() {
- return configuration.hotBiddersRatio;
- }
-
- public int getAvgBidByteSize() {
- return configuration.avgBidByteSize;
- }
-
- public int getAvgAuctionByteSize() {
- return configuration.avgAuctionByteSize;
- }
-
- public double getProbDelayedEvent() {
- return configuration.probDelayedEvent;
- }
-
- public long getOccasionalDelaySec() {
- return configuration.occasionalDelaySec;
- }
-
- /**
- * Return an estimate of the byte-size of all events a generator for this config would yield.
- */
- public long getEstimatedSizeBytes() {
- return estimatedBytesForEvents(maxEvents);
- }
-
- /**
- * Return the first 'event id' which could be generated from this config. Though events don't
- * have ids we can simulate them to help bookkeeping.
- */
- public long getStartEventId() {
- return firstEventId + firstEventNumber;
- }
-
- /**
- * Return one past the last 'event id' which could be generated from this config.
- */
- public long getStopEventId() {
- return firstEventId + firstEventNumber + maxEvents;
- }
-
- /**
- * Return the next event number for a generator which has so far emitted {@code numEvents}.
- */
- public long nextEventNumber(long numEvents) {
- return firstEventNumber + numEvents;
- }
-
- /**
- * Return the next event number for a generator which has so far emitted {@code numEvents},
- * but adjusted to account for {@code outOfOrderGroupSize}.
- */
- public long nextAdjustedEventNumber(long numEvents) {
- long n = configuration.outOfOrderGroupSize;
- long eventNumber = nextEventNumber(numEvents);
- long base = (eventNumber / n) * n;
- long offset = (eventNumber * 953) % n;
- return base + offset;
- }
-
- /**
- * Return the event number who's event time will be a suitable watermark for
- * a generator which has so far emitted {@code numEvents}.
- */
- public long nextEventNumberForWatermark(long numEvents) {
- long n = configuration.outOfOrderGroupSize;
- long eventNumber = nextEventNumber(numEvents);
- return (eventNumber / n) * n;
- }
-
- /**
- * What timestamp should the event with {@code eventNumber} have for this generator? And
- * what inter-event delay (in microseconds) is current?
- */
- public KV<Long, Long> timestampAndInterEventDelayUsForEvent(long eventNumber) {
- if (interEventDelayUs.length == 1) {
- long timestamp = baseTime + (eventNumber * interEventDelayUs[0]) / 1000L;
- return KV.of(timestamp, interEventDelayUs[0]);
- }
-
- long epoch = eventNumber / eventsPerEpoch;
- long n = eventNumber % eventsPerEpoch;
- long offsetInEpochMs = 0;
- for (long interEventDelayU : interEventDelayUs) {
- long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU;
- if (n < numEventsForThisCycle) {
- long offsetInCycleUs = n * interEventDelayU;
- long timestamp =
- baseTime + epoch * epochPeriodMs + offsetInEpochMs + (offsetInCycleUs / 1000L);
- return KV.of(timestamp, interEventDelayU);
- }
- n -= numEventsForThisCycle;
- offsetInEpochMs += (numEventsForThisCycle * interEventDelayU) / 1000L;
- }
- throw new RuntimeException("internal eventsPerEpoch incorrect"); // can't reach
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("GeneratorConfig");
- sb.append("{configuration:");
- sb.append(configuration.toString());
- sb.append(";interEventDelayUs=[");
- for (int i = 0; i < interEventDelayUs.length; i++) {
- if (i > 0) {
- sb.append(",");
- }
- sb.append(interEventDelayUs[i]);
- }
- sb.append("]");
- sb.append(";stepLengthSec:");
- sb.append(stepLengthSec);
- sb.append(";baseTime:");
- sb.append(baseTime);
- sb.append(";firstEventId:");
- sb.append(firstEventId);
- sb.append(";maxEvents:");
- sb.append(maxEvents);
- sb.append(";firstEventNumber:");
- sb.append(firstEventNumber);
- sb.append(";epochPeriodMs:");
- sb.append(epochPeriodMs);
- sb.append(";eventsPerEpoch:");
- sb.append(eventsPerEpoch);
- sb.append("}");
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
index 74eb061..f43486d 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
@@ -30,6 +30,9 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.sources.generator.Generator;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorCheckpoint;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.TimestampedValue;
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/Generator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/Generator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/Generator.java
new file mode 100644
index 0000000..bd736c1
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/Generator.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.sources.generator;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.AuctionGenerator.nextAuction;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.BidGenerator.nextBid;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.PersonGenerator.nextPerson;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Random;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+
+/**
+ * A generator for synthetic events. We try to make the data vaguely reasonable. We also ensure
+ * most primary key/foreign key relations are correct. Eg: a {@link Bid} event will usually have
+ * valid auction and bidder ids which can be joined to already-generated Auction and Person events.
+ *
+ * <p>To help with testing, we generate timestamps relative to a given {@code baseTime}. Each new
+ * event is given a timestamp advanced from the previous timestamp by {@code interEventDelayUs}
+ * (in microseconds). The event stream is thus fully deterministic and does not depend on
+ * wallclock time.
+ *
+ * <p>Its progress can be captured with {@link #toCheckpoint()} and later restored with
+ * {@link GeneratorCheckpoint#toGenerator} to resume generating events from that snapshot.
+ */
+public class Generator implements Iterator<TimestampedValue<Event>>, Serializable {
+
+  /**
+   * The next event and its various timestamps. Ordered by increasing wallclock timestamp, then
+   * (arbitrary but stable) event hash order. Note that this ordering is not consistent with
+   * {@link #equals}: instances that are not equal may still compare as 0.
+   */
+  public static class NextEvent implements Comparable<NextEvent> {
+    /** When, in wallclock time, should this event be emitted? */
+    public final long wallclockTimestamp;
+
+    /** When, in event time, should this event be considered to have occurred? */
+    public final long eventTimestamp;
+
+    /** The event itself. */
+    public final Event event;
+
+    /** The minimum of this and all future event timestamps. */
+    public final long watermark;
+
+    public NextEvent(long wallclockTimestamp, long eventTimestamp, Event event, long watermark) {
+      this.wallclockTimestamp = wallclockTimestamp;
+      this.eventTimestamp = eventTimestamp;
+      this.event = event;
+      this.watermark = watermark;
+    }
+
+    /**
+     * Return a copy of this event with {@code delayMs} added to the wallclock timestamp and the
+     * event annotated as 'LATE'.
+     */
+    public NextEvent withDelay(long delayMs) {
+      return new NextEvent(
+          wallclockTimestamp + delayMs, eventTimestamp, event.withAnnotation("LATE"), watermark);
+    }
+
+    @Override public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      NextEvent nextEvent = (NextEvent) o;
+
+      return (wallclockTimestamp == nextEvent.wallclockTimestamp
+          && eventTimestamp == nextEvent.eventTimestamp
+          && watermark == nextEvent.watermark
+          && event.equals(nextEvent.event));
+    }
+
+    @Override public int hashCode() {
+      return Objects.hash(wallclockTimestamp, eventTimestamp, watermark, event);
+    }
+
+    @Override
+    public int compareTo(NextEvent other) {
+      int i = Long.compare(wallclockTimestamp, other.wallclockTimestamp);
+      if (i != 0) {
+        return i;
+      }
+      return Integer.compare(event.hashCode(), other.event.hashCode());
+    }
+  }
+
+  /**
+   * Configuration to generate events against. Note that it may be replaced by a call to
+   * {@link #splitAtEventId}.
+   */
+  private GeneratorConfig config;
+
+  /** Number of events generated by this generator. */
+  private long eventsCountSoFar;
+
+  /**
+   * Wallclock time at which we emitted the first event (ms since epoch). Initially -1.
+   */
+  private long wallclockBaseTime;
+
+  Generator(GeneratorConfig config, long eventsCountSoFar, long wallclockBaseTime) {
+    checkNotNull(config);
+    this.config = config;
+    this.eventsCountSoFar = eventsCountSoFar;
+    this.wallclockBaseTime = wallclockBaseTime;
+  }
+
+  /**
+   * Create a fresh generator according to {@code config}.
+   */
+  public Generator(GeneratorConfig config) {
+    this(config, 0, -1);
+  }
+
+  /**
+   * Return a checkpoint for the current generator.
+   */
+  public GeneratorCheckpoint toCheckpoint() {
+    return new GeneratorCheckpoint(eventsCountSoFar, wallclockBaseTime);
+  }
+
+  /**
+   * Return a copy of this generator that advances independently; the config instance is shared.
+   */
+  public Generator copy() {
+    checkNotNull(config);
+    Generator result = new Generator(config, eventsCountSoFar, wallclockBaseTime);
+    return result;
+  }
+
+  /**
+   * Return the current config for this generator. Note that configs may be replaced by {@link
+   * #splitAtEventId}.
+   */
+  public GeneratorConfig getCurrentConfig() {
+    return config;
+  }
+
+  /**
+   * Mutate this generator so that it will only generate events up to but not including
+   * {@code eventId}. Return a config to represent the events this generator will no longer yield.
+   * The two generators will run on a serial timeline.
+   */
+  public GeneratorConfig splitAtEventId(long eventId) {
+    long newMaxEvents = eventId - (config.firstEventId + config.firstEventNumber);
+    GeneratorConfig remainConfig = config.copyWith(config.firstEventId,
+        config.maxEvents - newMaxEvents, config.firstEventNumber + newMaxEvents);
+    config = config.copyWith(config.firstEventId, newMaxEvents, config.firstEventNumber);
+    return remainConfig;
+  }
+
+  /**
+   * Return the next 'event id'. Though events don't have ids we can simulate them to
+   * help with bookkeeping.
+   */
+  public long getNextEventId() {
+    return config.firstEventId + config.nextAdjustedEventNumber(eventsCountSoFar);
+  }
+
+  @Override
+  public boolean hasNext() {
+    return eventsCountSoFar < config.maxEvents;
+  }
+
+  /**
+   * Return the next event. The outer timestamp is in wallclock time and corresponds to
+   * when the event should fire. The inner timestamp is in event-time and represents the
+   * time the event is purported to have taken place in the simulation.
+   */
+  public NextEvent nextEvent() {
+    if (wallclockBaseTime < 0) {
+      wallclockBaseTime = System.currentTimeMillis();
+    }
+    // When, in event time, we should generate the event. Monotonic.
+    long eventTimestamp =
+        config.timestampAndInterEventDelayUsForEvent(
+            config.nextEventNumber(eventsCountSoFar)).getKey();
+    // When, in event time, the event should say it was generated. Depending on outOfOrderGroupSize
+    // may have local jitter.
+    long adjustedEventTimestamp =
+        config.timestampAndInterEventDelayUsForEvent(
+            config.nextAdjustedEventNumber(eventsCountSoFar))
+            .getKey();
+    // The minimum of this and all future adjusted event timestamps. Accounts for jitter in
+    // the event timestamp.
+    long watermark =
+        config.timestampAndInterEventDelayUsForEvent(
+            config.nextEventNumberForWatermark(eventsCountSoFar))
+            .getKey();
+    // When, in wallclock time, we should emit the event.
+    long wallclockTimestamp = wallclockBaseTime + (eventTimestamp - getCurrentConfig().baseTime);
+
+    // The next 'event id' both seeds the random number generator (so the random draws for an
+    // event depend only on its id) and selects which kind of event is generated.
+    long newEventId = getNextEventId();
+    Random random = new Random(newEventId);
+
+    long rem = newEventId % GeneratorConfig.PROPORTION_DENOMINATOR;
+
+    Event event;
+    if (rem < GeneratorConfig.PERSON_PROPORTION) {
+      event = new Event(nextPerson(newEventId, random, adjustedEventTimestamp, config));
+    } else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
+      event = new Event(
+          nextAuction(eventsCountSoFar, newEventId, random, adjustedEventTimestamp, config));
+    } else {
+      event = new Event(nextBid(newEventId, random, adjustedEventTimestamp, config));
+    }
+
+    eventsCountSoFar++;
+    return new NextEvent(wallclockTimestamp, adjustedEventTimestamp, event, watermark);
+  }
+
+  @Override
+  public TimestampedValue<Event> next() {
+    NextEvent next = nextEvent();
+    return TimestampedValue.of(next.event, new Instant(next.eventTimestamp));
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Return how many microseconds till we emit the next event.
+   */
+  public long currentInterEventDelayUs() {
+    return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(eventsCountSoFar))
+        .getValue();
+  }
+
+  /**
+   * Return an estimate of fraction of output consumed.
+   */
+  public double getFractionConsumed() {
+    return (double) eventsCountSoFar / config.maxEvents;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Generator{config:%s; eventsCountSoFar:%d; wallclockBaseTime:%d}", config,
+        eventsCountSoFar, wallclockBaseTime);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorCheckpoint.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorCheckpoint.java
new file mode 100644
index 0000000..fa41739
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorCheckpoint.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.generator;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+
+/**
+ * Minimal snapshot of a {@link Generator} (events emitted so far plus wallclock base time);
+ * combined with a {@link GeneratorConfig} it is enough to rebuild the generator.
+ */
+public class GeneratorCheckpoint implements UnboundedSource.CheckpointMark {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+  /** Coder for this class; writes {@code numEvents} then {@code wallclockBaseTime}. */
+  public static final Coder<GeneratorCheckpoint> CODER_INSTANCE =
+      new CustomCoder<GeneratorCheckpoint>() {
+        @Override
+        public void encode(GeneratorCheckpoint checkpoint, OutputStream out)
+            throws CoderException, IOException {
+          LONG_CODER.encode(checkpoint.numEvents, out);
+          LONG_CODER.encode(checkpoint.wallclockBaseTime, out);
+        }
+
+        @Override
+        public GeneratorCheckpoint decode(InputStream in) throws CoderException, IOException {
+          // Java evaluates arguments left to right, so fields are read in encoding order.
+          return new GeneratorCheckpoint(LONG_CODER.decode(in), LONG_CODER.decode(in));
+        }
+
+        /** Both fields are written with {@link VarLongCoder}, which is deterministic. */
+        @Override
+        public void verifyDeterministic() throws NonDeterministicException {}
+      };
+
+  private final long numEvents;
+  private final long wallclockBaseTime;
+
+  GeneratorCheckpoint(long numEvents, long wallclockBaseTime) {
+    this.numEvents = numEvents;
+    this.wallclockBaseTime = wallclockBaseTime;
+  }
+
+  /** Rebuild a {@link Generator} that resumes from this checkpoint using {@code config}. */
+  public Generator toGenerator(GeneratorConfig config) {
+    return new Generator(config, numEvents, wallclockBaseTime);
+  }
+
+  @Override
+  public void finalizeCheckpoint() throws IOException {}  // Nothing to finalize.
+
+  @Override
+  public String toString() {
+    return toStringHelper(this).add("numEvents", numEvents)
+        .add("wallclockBaseTime", wallclockBaseTime).toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorConfig.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorConfig.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorConfig.java
new file mode 100644
index 0000000..7c862fa
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorConfig.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.sources.generator;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Parameters controlling how {@link Generator} synthesizes {@link Event} elements.
+ */
+public class GeneratorConfig implements Serializable {
+
+ /**
+ * We start the ids at specific values to help ensure the queries find a match even on
+ * small synthesized dataset sizes.
+ */
+ public static final long FIRST_AUCTION_ID = 1000L;
+ public static final long FIRST_PERSON_ID = 1000L;
+ public static final long FIRST_CATEGORY_ID = 10L;
+
+ /**
+ * Proportions of people/auctions/bids to synthesize. The id helpers in the model generators
+ * treat each block of {@link #PROPORTION_DENOMINATOR} consecutive event ids as
+ * {@link #PERSON_PROPORTION} people, then {@link #AUCTION_PROPORTION} auctions, then bids.
+ */
+ public static final int PERSON_PROPORTION = 1;
+ public static final int AUCTION_PROPORTION = 3;
+ private static final int BID_PROPORTION = 46;
+ public static final int PROPORTION_DENOMINATOR =
+ PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION;
+
+ /**
+ * Environment options.
+ */
+ private final NexmarkConfiguration configuration;
+
+ /**
+ * Delay between events, in microseconds. If the array has more than one entry then
+ * the rate is changed every {@link #stepLengthSec}, and wraps around.
+ */
+ private final long[] interEventDelayUs;
+
+ /**
+ * Delay, in seconds, before changing the current inter-event delay.
+ */
+ private final long stepLengthSec;
+
+ /**
+ * Time for first event (ms since epoch).
+ */
+ public final long baseTime;
+
+ /**
+ * Event id of first event to be generated. Event ids are unique over all generators, and
+ * are used as a seed to generate each event's data.
+ */
+ public final long firstEventId;
+
+ /**
+ * Maximum number of events to generate.
+ */
+ public final long maxEvents;
+
+ /**
+ * First event number. Generators running in parallel time may share the same event number,
+ * and the event number is used to determine the event timestamp.
+ */
+ public final long firstEventNumber;
+
+ /**
+ * True period of epoch in milliseconds. Derived from above.
+ * (Ie time to run through cycle for all interEventDelayUs entries).
+ * Left at zero unless there is more than one inter-event delay.
+ */
+ private final long epochPeriodMs;
+
+ /**
+ * Number of events per epoch. Derived from above.
+ * (Ie number of events to run through cycle for all interEventDelayUs entries).
+ * Left at zero unless there is more than one inter-event delay.
+ */
+ private final long eventsPerEpoch;
+
+ /**
+ * Create a config.
+ *
+ * @param configuration environment options; supplies the rate shape and average event sizes
+ * @param baseTime time for the first event, in ms since epoch
+ * @param firstEventId id of the first event to generate
+ * @param maxEventsOrZero maximum number of events, or 0 for an effectively unbounded count
+ * @param firstEventNumber first event number; drives event timestamps
+ */
+ public GeneratorConfig(
+ NexmarkConfiguration configuration, long baseTime, long firstEventId,
+ long maxEventsOrZero, long firstEventNumber) {
+ this.configuration = configuration;
+ this.interEventDelayUs = configuration.rateShape.interEventDelayUs(
+ configuration.firstEventRate, configuration.nextEventRate,
+ configuration.rateUnit, configuration.numEventGenerators);
+ this.stepLengthSec = configuration.rateShape.stepLengthSec(configuration.ratePeriodSec);
+ this.baseTime = baseTime;
+ this.firstEventId = firstEventId;
+ if (maxEventsOrZero == 0) {
+ // Scale maximum down to avoid overflow in getEstimatedSizeBytes.
+ this.maxEvents =
+ Long.MAX_VALUE / (PROPORTION_DENOMINATOR
+ * Math.max(
+ Math.max(configuration.avgPersonByteSize, configuration.avgAuctionByteSize),
+ configuration.avgBidByteSize));
+ } else {
+ this.maxEvents = maxEventsOrZero;
+ }
+ this.firstEventNumber = firstEventNumber;
+
+ long eventsPerEpoch = 0;
+ long epochPeriodMs = 0;
+ if (interEventDelayUs.length > 1) {
+ // Sum the events emitted, and the time spent, across one full cycle of step delays.
+ for (long interEventDelayU : interEventDelayUs) {
+ long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU;
+ eventsPerEpoch += numEventsForThisCycle;
+ epochPeriodMs += (numEventsForThisCycle * interEventDelayU) / 1000L;
+ }
+ }
+ this.eventsPerEpoch = eventsPerEpoch;
+ this.epochPeriodMs = epochPeriodMs;
+ }
+
+ /**
+ * Return a copy of this config.
+ */
+ public GeneratorConfig copy() {
+ return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
+ }
+
+ /**
+ * Split this config into {@code n} sub-configs with roughly equal number of
+ * possible events, but distinct value spaces. The generators will run on parallel timelines.
+ * This config should no longer be used.
+ *
+ * <p>If this config has fewer than {@code n} events, only one sub-config per event is
+ * returned, because a sub-config cannot represent zero events.
+ *
+ * @param n desired number of sub-configs; must be at least 1
+ * @return sub-configs whose event id ranges together cover this config's range
+ * @throws IllegalArgumentException if {@code n} is less than 1
+ */
+ public List<GeneratorConfig> split(int n) {
+ if (n < 1) {
+ throw new IllegalArgumentException("n must be at least 1, but was " + n);
+ }
+ // The constructor reads maxEvents == 0 as "unbounded", so a sub-config must never be given
+ // zero events: don't create more sub-configs than there are events.
+ int numSplits = (maxEvents > 0 && maxEvents < n) ? (int) maxEvents : n;
+ List<GeneratorConfig> results = new ArrayList<>(numSplits);
+ if (numSplits == 1) {
+ // No split required.
+ results.add(this);
+ return results;
+ }
+ long subMaxEvents = maxEvents / numSplits;
+ long subFirstEventId = firstEventId;
+ for (int i = 0; i < numSplits; i++) {
+ if (i == numSplits - 1) {
+ // Don't lose any events to round-down: the last sub-config takes the remainder.
+ subMaxEvents = maxEvents - subMaxEvents * (numSplits - 1);
+ }
+ results.add(copyWith(subFirstEventId, subMaxEvents, firstEventNumber));
+ subFirstEventId += subMaxEvents;
+ }
+ return results;
+ }
+
+ /**
+ * Return copy of this config except with given parameters.
+ */
+ public GeneratorConfig copyWith(long firstEventId, long maxEvents, long firstEventNumber) {
+ return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
+ }
+
+ /**
+ * Return an estimate of the bytes needed by {@code numEvents}, splitting them by the
+ * person/auction/bid proportions and weighting each by its configured average size.
+ */
+ public long estimatedBytesForEvents(long numEvents) {
+ long numPersons = (numEvents * PERSON_PROPORTION) / PROPORTION_DENOMINATOR;
+ long numAuctions = (numEvents * AUCTION_PROPORTION) / PROPORTION_DENOMINATOR;
+ long numBids = (numEvents * BID_PROPORTION) / PROPORTION_DENOMINATOR;
+ return numPersons * configuration.avgPersonByteSize
+ + numAuctions * configuration.avgAuctionByteSize
+ + numBids * configuration.avgBidByteSize;
+ }
+
+ /** Return the configured average person size, in bytes. */
+ public int getAvgPersonByteSize() {
+ return configuration.avgPersonByteSize;
+ }
+
+ /** Return the configured number of active people. */
+ public int getNumActivePeople() {
+ return configuration.numActivePeople;
+ }
+
+ /** Return the configured hot sellers ratio. */
+ public int getHotSellersRatio() {
+ return configuration.hotSellersRatio;
+ }
+
+ /** Return the configured number of in-flight auctions. */
+ public int getNumInFlightAuctions() {
+ return configuration.numInFlightAuctions;
+ }
+
+ /** Return the configured hot auction ratio. */
+ public int getHotAuctionRatio() {
+ return configuration.hotAuctionRatio;
+ }
+
+ /** Return the configured hot bidders ratio. */
+ public int getHotBiddersRatio() {
+ return configuration.hotBiddersRatio;
+ }
+
+ /** Return the configured average bid size, in bytes. */
+ public int getAvgBidByteSize() {
+ return configuration.avgBidByteSize;
+ }
+
+ /** Return the configured average auction size, in bytes. */
+ public int getAvgAuctionByteSize() {
+ return configuration.avgAuctionByteSize;
+ }
+
+ /** Return the configured probability of a delayed event. */
+ public double getProbDelayedEvent() {
+ return configuration.probDelayedEvent;
+ }
+
+ /** Return the configured occasional delay, in seconds. */
+ public long getOccasionalDelaySec() {
+ return configuration.occasionalDelaySec;
+ }
+
+ /**
+ * Return an estimate of the byte-size of all events a generator for this config would yield.
+ */
+ public long getEstimatedSizeBytes() {
+ return estimatedBytesForEvents(maxEvents);
+ }
+
+ /**
+ * Return the first 'event id' which could be generated from this config. Though events don't
+ * have ids we can simulate them to help bookkeeping.
+ */
+ public long getStartEventId() {
+ return firstEventId + firstEventNumber;
+ }
+
+ /**
+ * Return one past the last 'event id' which could be generated from this config.
+ */
+ public long getStopEventId() {
+ return firstEventId + firstEventNumber + maxEvents;
+ }
+
+ /**
+ * Return the next event number for a generator which has so far emitted {@code numEvents}.
+ */
+ public long nextEventNumber(long numEvents) {
+ return firstEventNumber + numEvents;
+ }
+
+ /**
+ * Return the next event number for a generator which has so far emitted {@code numEvents},
+ * but adjusted to account for {@code outOfOrderGroupSize}: the number is moved to a scrambled
+ * position within its group of {@code outOfOrderGroupSize} consecutive event numbers.
+ */
+ public long nextAdjustedEventNumber(long numEvents) {
+ long n = configuration.outOfOrderGroupSize;
+ long eventNumber = nextEventNumber(numEvents);
+ long base = (eventNumber / n) * n;
+ long offset = (eventNumber * 953) % n;
+ return base + offset;
+ }
+
+ /**
+ * Return the event number whose event time will be a suitable watermark for
+ * a generator which has so far emitted {@code numEvents}: the start of the current
+ * out-of-order group.
+ */
+ public long nextEventNumberForWatermark(long numEvents) {
+ long n = configuration.outOfOrderGroupSize;
+ long eventNumber = nextEventNumber(numEvents);
+ return (eventNumber / n) * n;
+ }
+
+ /**
+ * What timestamp should the event with {@code eventNumber} have for this generator? And
+ * what inter-event delay (in microseconds) is current?
+ *
+ * @return a pair of (timestamp in ms since epoch, current inter-event delay in microseconds)
+ */
+ public KV<Long, Long> timestampAndInterEventDelayUsForEvent(long eventNumber) {
+ if (interEventDelayUs.length == 1) {
+ long timestamp = baseTime + (eventNumber * interEventDelayUs[0]) / 1000L;
+ return KV.of(timestamp, interEventDelayUs[0]);
+ }
+
+ long epoch = eventNumber / eventsPerEpoch;
+ long n = eventNumber % eventsPerEpoch;
+ long offsetInEpochMs = 0;
+ // Walk the step delays of one epoch until we find the step containing event n.
+ for (long interEventDelayU : interEventDelayUs) {
+ long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU;
+ if (n < numEventsForThisCycle) {
+ long offsetInCycleUs = n * interEventDelayU;
+ long timestamp =
+ baseTime + epoch * epochPeriodMs + offsetInEpochMs + (offsetInCycleUs / 1000L);
+ return KV.of(timestamp, interEventDelayU);
+ }
+ n -= numEventsForThisCycle;
+ offsetInEpochMs += (numEventsForThisCycle * interEventDelayU) / 1000L;
+ }
+ // Unreachable while eventsPerEpoch matches the sum computed in the constructor.
+ throw new IllegalStateException("internal eventsPerEpoch incorrect");
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("GeneratorConfig");
+ sb.append("{configuration:");
+ sb.append(configuration.toString());
+ sb.append(";interEventDelayUs=[");
+ for (int i = 0; i < interEventDelayUs.length; i++) {
+ if (i > 0) {
+ sb.append(",");
+ }
+ sb.append(interEventDelayUs[i]);
+ }
+ sb.append("]");
+ sb.append(";stepLengthSec:");
+ sb.append(stepLengthSec);
+ sb.append(";baseTime:");
+ sb.append(baseTime);
+ sb.append(";firstEventId:");
+ sb.append(firstEventId);
+ sb.append(";maxEvents:");
+ sb.append(maxEvents);
+ sb.append(";firstEventNumber:");
+ sb.append(firstEventNumber);
+ sb.append(";epochPeriodMs:");
+ sb.append(epochPeriodMs);
+ sb.append(";eventsPerEpoch:");
+ sb.append(eventsPerEpoch);
+ sb.append("}");
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/AuctionGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/AuctionGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/AuctionGenerator.java
new file mode 100644
index 0000000..41a81da
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/AuctionGenerator.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.generator.model;
+
+import static org.apache.beam.sdk.nexmark.sources.generator.model.LongGenerator.nextLong;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.PersonGenerator.lastBase0PersonId;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.PersonGenerator.nextBase0PersonId;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.PriceGenerator.nextPrice;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.StringsGenerator.nextExtra;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.StringsGenerator.nextString;
+
+import java.util.Random;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
+
+/**
+ * Generates {@link Auction} events, and maps event ids to auction ids.
+ */
+public class AuctionGenerator {
+ /**
+ * Keep the number of categories small so the example queries will find results even with
+ * a small batch of events.
+ */
+ private static final int NUM_CATEGORIES = 5;
+
+ /**
+ * Number of yet-to-be-created auction ids that {@link #nextBase0AuctionId} may return.
+ */
+ private static final int AUCTION_ID_LEAD = 10;
+
+ /**
+ * Size of the batches of person ids from which hot sellers are chosen: the hot seller is the
+ * first person of the most recent batch.
+ */
+ private static final int HOT_SELLER_RATIO = 100;
+
+ /**
+ * Generate and return a random auction with next available id.
+ *
+ * @param eventsCountSoFar number of events emitted so far; used to pick the auction length
+ * @param eventId event id of this auction; determines its auction id and candidate sellers
+ * @param random source of randomness
+ * @param timestamp event timestamp; the auction's expiry is computed relative to it
+ * @param config generator configuration
+ * @return the new auction
+ */
+ public static Auction nextAuction(
+ long eventsCountSoFar, long eventId, Random random, long timestamp, GeneratorConfig config) {
+
+ long id = lastBase0AuctionId(eventId) + GeneratorConfig.FIRST_AUCTION_ID;
+
+ long seller;
+ // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio.
+ if (random.nextInt(config.getHotSellersRatio()) > 0) {
+ // Choose the first person in the batch of last HOT_SELLER_RATIO people.
+ seller = (lastBase0PersonId(eventId) / HOT_SELLER_RATIO) * HOT_SELLER_RATIO;
+ } else {
+ seller = nextBase0PersonId(eventId, random, config);
+ }
+ seller += GeneratorConfig.FIRST_PERSON_ID;
+
+ long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES);
+ long initialBid = nextPrice(random);
+ long expires = timestamp + nextAuctionLengthMs(eventsCountSoFar, random, timestamp, config);
+ String name = nextString(random, 20);
+ String desc = nextString(random, 100);
+ long reserve = initialBid + nextPrice(random);
+ // Approximate size of the fixed fields, so nextExtra can pad towards the average size.
+ int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8;
+ String extra = nextExtra(random, currentSize, config.getAvgAuctionByteSize());
+ return new Auction(id, name, desc, initialBid, reserve, timestamp, expires, seller, category,
+ extra);
+ }
+
+ /**
+ * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if
+ * due to generate an auction.
+ */
+ public static long lastBase0AuctionId(long eventId) {
+ long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
+ long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
+ if (offset < GeneratorConfig.PERSON_PROPORTION) {
+ // About to generate a person.
+ // Go back to the last auction in the last epoch.
+ epoch--;
+ offset = GeneratorConfig.AUCTION_PROPORTION - 1;
+ } else if (offset >= GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
+ // About to generate a bid.
+ // Go back to the last auction generated in this epoch.
+ offset = GeneratorConfig.AUCTION_PROPORTION - 1;
+ } else {
+ // About to generate an auction.
+ offset -= GeneratorConfig.PERSON_PROPORTION;
+ }
+ return epoch * GeneratorConfig.AUCTION_PROPORTION + offset;
+ }
+
+ /**
+ * Return a random auction id (base 0).
+ */
+ public static long nextBase0AuctionId(
+ long nextEventId, Random random, GeneratorConfig config) {
+
+ // Choose a random auction for any of those which are likely to still be in flight,
+ // plus a few 'leads'.
+ // Note that ideally we'd track non-expired auctions exactly, but that state
+ // is difficult to split.
+ long maxAuction = lastBase0AuctionId(nextEventId);
+ long minAuction = Math.max(maxAuction - config.getNumInFlightAuctions(), 0);
+ return minAuction + nextLong(random, maxAuction - minAuction + 1 + AUCTION_ID_LEAD);
+ }
+
+ /**
+ * Return a random auction length, in milliseconds. The length is uniform over
+ * {@code [1, 2 * horizonMs]}, where {@code horizonMs} is the time until the auction
+ * {@code numInFlightAuctions} beyond now would be generated, so it averages roughly
+ * {@code horizonMs}.
+ */
+ private static long nextAuctionLengthMs(
+ long eventsCountSoFar, Random random, long timestamp, GeneratorConfig config) {
+
+ // What's our current event number?
+ long currentEventNumber = config.nextAdjustedEventNumber(eventsCountSoFar);
+ // How many events till we've generated numInFlightAuctions?
+ long numEventsForAuctions =
+ (config.getNumInFlightAuctions() * GeneratorConfig.PROPORTION_DENOMINATOR)
+ / GeneratorConfig.AUCTION_PROPORTION;
+ // When will the auction numInFlightAuctions beyond now be generated?
+ long futureAuction = config
+ .timestampAndInterEventDelayUsForEvent(currentEventNumber + numEventsForAuctions)
+ .getKey();
+ long horizonMs = futureAuction - timestamp;
+ return 1L + nextLong(random, Math.max(horizonMs * 2, 1L));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/BidGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/BidGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/BidGenerator.java
new file mode 100644
index 0000000..cffe380
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/BidGenerator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.generator.model;
+
+import static org.apache.beam.sdk.nexmark.sources.generator.model.AuctionGenerator.lastBase0AuctionId;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.AuctionGenerator.nextBase0AuctionId;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.PersonGenerator.lastBase0PersonId;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.PersonGenerator.nextBase0PersonId;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.StringsGenerator.nextExtra;
+
+import java.util.Random;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
+
+/**
+ * Generates {@link Bid} events.
+ */
+public class BidGenerator {
+
+ /**
+ * Size of the batches of auction ids from which hot auctions are chosen: the hot auction is
+ * the first auction of the most recent batch.
+ */
+ private static final int HOT_AUCTION_RATIO = 100;
+
+ /**
+ * Size of the batches of person ids from which hot bidders are chosen: the hot bidder is the
+ * second person of the most recent batch, so it does not collide with the hot seller.
+ */
+ private static final int HOT_BIDDER_RATIO = 100;
+
+ /**
+ * Generate and return a random bid with next available id.
+ *
+ * @param eventId event id of this bid; bounds the auction and bidder ids that may be chosen
+ * @param random source of randomness
+ * @param timestamp event timestamp recorded on the bid
+ * @param config generator configuration
+ * @return the new bid
+ */
+ public static Bid nextBid(
+ long eventId, Random random, long timestamp, GeneratorConfig config) {
+ // Draw order matters for reproducibility: auction, bidder, price, then padding.
+ long auction = chooseBase0Auction(eventId, random, config) + GeneratorConfig.FIRST_AUCTION_ID;
+ long bidder = chooseBase0Bidder(eventId, random, config) + GeneratorConfig.FIRST_PERSON_ID;
+ long price = PriceGenerator.nextPrice(random);
+ // Fixed part of the bid size; presumably 8 bytes each for auction, bidder, price, timestamp.
+ int fixedSize = 8 + 8 + 8 + 8;
+ String extra = nextExtra(random, fixedSize, config.getAvgBidByteSize());
+ return new Bid(auction, bidder, price, timestamp, extra);
+ }
+
+ /** Pick a base-0 auction id, usually the current hot auction. */
+ private static long chooseBase0Auction(long eventId, Random random, GeneratorConfig config) {
+ // P(bid is for a hot auction) = 1 - 1/hotAuctionRatio.
+ boolean hotAuction = random.nextInt(config.getHotAuctionRatio()) > 0;
+ if (!hotAuction) {
+ return nextBase0AuctionId(eventId, random, config);
+ }
+ return (lastBase0AuctionId(eventId) / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO;
+ }
+
+ /** Pick a base-0 person id for the bidder, usually the current hot bidder. */
+ private static long chooseBase0Bidder(long eventId, Random random, GeneratorConfig config) {
+ // P(bid is by a hot bidder) = 1 - 1/hotBiddersRatio.
+ boolean hotBidder = random.nextInt(config.getHotBiddersRatio()) > 0;
+ if (!hotBidder) {
+ return nextBase0PersonId(eventId, random, config);
+ }
+ return (lastBase0PersonId(eventId) / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/LongGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/LongGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/LongGenerator.java
new file mode 100644
index 0000000..ed9db84
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/LongGenerator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.generator.model;
+
+import java.util.Random;
+
+/**
+ * LongGenerator.
+ */
+public class LongGenerator {
+
+ /** Return a random long from {@code [0, n)}. */
+ public static long nextLong(Random random, long n) {
+ if (n < Integer.MAX_VALUE) {
+ return random.nextInt((int) n);
+ } else {
+ // WARNING: Very skewed distribution! Bad!
+ return Math.abs(random.nextLong() % n);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PersonGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PersonGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PersonGenerator.java
new file mode 100644
index 0000000..9f306ea
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PersonGenerator.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.sources.generator.model;
+
+import static org.apache.beam.sdk.nexmark.sources.generator.model.LongGenerator.nextLong;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.StringsGenerator.nextExtra;
+import static org.apache.beam.sdk.nexmark.sources.generator.model.StringsGenerator.nextString;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import org.apache.beam.sdk.nexmark.model.Person;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
+
+/**
+ * Generates people.
+ */
+public class PersonGenerator {
+ /**
+ * Number of yet-to-be-created people and auction ids allowed.
+ */
+ private static final int PERSON_ID_LEAD = 10;
+
+ /**
+ * Keep the number of states small so that the example queries will find results even with
+ * a small batch of events.
+ */
+ private static final List<String> US_STATES = Arrays.asList(("AZ,CA,ID,OR,WA,WY").split(","));
+
+ private static final List<String> US_CITIES =
+ Arrays.asList(
+ ("Phoenix,Los Angeles,San Francisco,Boise,Portland,Bend,Redmond,Seattle,Kent,Cheyenne")
+ .split(","));
+
+ private static final List<String> FIRST_NAMES =
+ Arrays.asList(("Peter,Paul,Luke,John,Saul,Vicky,Kate,Julie,Sarah,Deiter,Walter").split(","));
+
+ private static final List<String> LAST_NAMES =
+ Arrays.asList(("Shultz,Abrams,Spencer,White,Bartels,Walton,Smith,Jones,Noris").split(","));
+
+
+ /**
+ * Generate and return a random person with next available id.
+ */
+ public static Person nextPerson(
+ long nextEventId, Random random, long timestamp, GeneratorConfig config) {
+
+ long id = lastBase0PersonId(nextEventId) + GeneratorConfig.FIRST_PERSON_ID;
+ String name = nextPersonName(random);
+ String email = nextEmail(random);
+ String creditCard = nextCreditCard(random);
+ String city = nextUSCity(random);
+ String state = nextUSState(random);
+ int currentSize =
+ 8 + name.length() + email.length() + creditCard.length() + city.length() + state.length();
+ String extra = nextExtra(random, currentSize, config.getAvgPersonByteSize());
+ return new Person(id, name, email, creditCard, city, state, timestamp, extra);
+ }
+
+ /**
+ * Return a random person id (base 0).
+ */
+ public static long nextBase0PersonId(long eventId, Random random, GeneratorConfig config) {
+ // Choose a random person from any of the 'active' people, plus a few 'leads'.
+ // By limiting to 'active' we ensure the density of bids or auctions per person
+ // does not decrease over time for long running jobs.
+ // By choosing a person id ahead of the last valid person id we will make
+ // newPerson and newAuction events appear to have been swapped in time.
+ long numPeople = lastBase0PersonId(eventId) + 1;
+ long activePeople = Math.min(numPeople, config.getNumActivePeople());
+ long n = nextLong(random, activePeople + PERSON_ID_LEAD);
+ return numPeople - activePeople + n;
+ }
+
+ /**
+ * Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if
+ * due to generate a person.
+ */
+ public static long lastBase0PersonId(long eventId) {
+ long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
+ long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
+ if (offset >= GeneratorConfig.PERSON_PROPORTION) {
+ // About to generate an auction or bid.
+ // Go back to the last person generated in this epoch.
+ offset = GeneratorConfig.PERSON_PROPORTION - 1;
+ }
+ // About to generate a person.
+ return epoch * GeneratorConfig.PERSON_PROPORTION + offset;
+ }
+
+
+ /** return a random US state. */
+ private static String nextUSState(Random random) {
+ return US_STATES.get(random.nextInt(US_STATES.size()));
+ }
+
+ /** Return a random US city. */
+ private static String nextUSCity(Random random) {
+ return US_CITIES.get(random.nextInt(US_CITIES.size()));
+ }
+
+ /** Return a random person name. */
+ private static String nextPersonName(Random random) {
+ return FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size())) + " "
+ + LAST_NAMES.get(random.nextInt(LAST_NAMES.size()));
+ }
+
+ /** Return a random email address. */
+ private static String nextEmail(Random random) {
+ return nextString(random, 7) + "@" + nextString(random, 5) + ".com";
+ }
+
+ /** Return a random credit card number. */
+ private static String nextCreditCard(Random random) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 4; i++) {
+ if (i > 0) {
+ sb.append(' ');
+ }
+ sb.append(String.format("%04d", random.nextInt(10000)));
+ }
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PriceGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PriceGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PriceGenerator.java
new file mode 100644
index 0000000..912b16e
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PriceGenerator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.generator.model;
+
+import java.util.Random;
+
+/**
+ * Generates a random price.
+ */
+public class PriceGenerator {
+
+ /** Return a random price. */
+ public static long nextPrice(Random random) {
+ return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/StringsGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/StringsGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/StringsGenerator.java
new file mode 100644
index 0000000..c808560
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/StringsGenerator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.generator.model;
+
+import java.util.Random;
+
+/**
+ * Generates random strings used to populate the text fields of other model objects.
+ */
+public class StringsGenerator {
+
+ /** Minimum pre-trim length of strings returned by {@link #nextString}. */
+ private static final int MIN_STRING_LENGTH = 3;
+
+ /** Return a trimmed random string shorter than {@code maxLength}; requires maxLength > 3. */
+ public static String nextString(Random random, int maxLength) {
+ int len = MIN_STRING_LENGTH + random.nextInt(maxLength - MIN_STRING_LENGTH);
+ StringBuilder sb = new StringBuilder();
+ while (len-- > 0) {
+ if (random.nextInt(13) == 0) {
+ sb.append(' ');
+ } else {
+ sb.append((char) ('a' + random.nextInt(26)));
+ }
+ }
+ return sb.toString().trim();
+ }
+
+ /** Return exactly {@code length} random letters a-z, or "" if length <= 0. */
+ public static String nextExactString(Random random, int length) {
+ StringBuilder sb = new StringBuilder();
+ while (length-- > 0) {
+ sb.append((char) ('a' + random.nextInt(26)));
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Return a random letter string such that {@code currentSize + string.length()} is on average
+ * about {@code desiredAverageSize} (+/-20%); "" if {@code currentSize} already reaches it.
+ */
+ public static String nextExtra(Random random, int currentSize, int desiredAverageSize) {
+ if (currentSize > desiredAverageSize) {
+ return "";
+ }
+ desiredAverageSize -= currentSize;
+ int delta = (int) Math.round(desiredAverageSize * 0.2);
+ int minSize = desiredAverageSize - delta;
+ int desiredSize = minSize + (delta == 0 ? 0 : random.nextInt(2 * delta));
+ return nextExactString(random, desiredSize);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/package-info.java
new file mode 100644
index 0000000..c15b5ed
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Random generators for Nexmark model objects and their field values (prices, strings).
+ */
+package org.apache.beam.sdk.nexmark.sources.generator.model;
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/package-info.java
new file mode 100644
index 0000000..a7ffd25
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Package org.apache.beam.sdk.nexmark.sources.generator.
+ */
+
+/**
+ * Logic for generating Nexmark events.
+ */
+package org.apache.beam.sdk.nexmark.sources.generator;
[3/6] beam git commit: [Nexmark] Extract AuctionGenerator,
PriceGenerator from Generator
Posted by ke...@apache.org.
[Nexmark] Extract AuctionGenerator, PriceGenerator from Generator
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e895fc82
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e895fc82
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e895fc82
Branch: refs/heads/master
Commit: e895fc8294c581d4004006cd898a05300ac7be12
Parents: 7055e0f
Author: Anton Kedin <ke...@google.com>
Authored: Mon Nov 6 15:12:53 2017 -0800
Committer: Anton Kedin <ke...@google.com>
Committed: Wed Nov 15 13:48:37 2017 -0800
----------------------------------------------------------------------
.../beam/sdk/nexmark/sources/Generator.java | 111 +-------------
.../nexmark/sources/utils/AuctionGenerator.java | 145 +++++++++++++++++++
.../nexmark/sources/utils/PriceGenerator.java | 32 ++++
3 files changed, 184 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e895fc82/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
index 69d4579..68e6748 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
@@ -18,19 +18,20 @@
package org.apache.beam.sdk.nexmark.sources;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.beam.sdk.nexmark.sources.utils.LongGenerator.nextLong;
+import static org.apache.beam.sdk.nexmark.sources.utils.AuctionGenerator.lastBase0AuctionId;
+import static org.apache.beam.sdk.nexmark.sources.utils.AuctionGenerator.nextAuction;
+import static org.apache.beam.sdk.nexmark.sources.utils.AuctionGenerator.nextBase0AuctionId;
import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.lastBase0PersonId;
import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextBase0PersonId;
import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextPerson;
+import static org.apache.beam.sdk.nexmark.sources.utils.PriceGenerator.nextPrice;
import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextExtra;
-import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextString;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Objects;
import java.util.Random;
-import org.apache.beam.sdk.nexmark.model.Auction;
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.values.TimestampedValue;
@@ -50,23 +51,12 @@ import org.joda.time.Instant;
* so that we can resume generating events from a saved snapshot.
*/
public class Generator implements Iterator<TimestampedValue<Event>>, Serializable {
- /**
- * Keep the number of categories small so the example queries will find results even with
- * a small batch of events.
- */
- private static final int NUM_CATEGORIES = 5;
-
- /**
- * Number of yet-to-be-created people and auction ids allowed.
- */
- private static final int AUCTION_ID_LEAD = 10;
/**
* Fraction of people/auctions which may be 'hot' sellers/bidders/auctions are 1
* over these values.
*/
private static final int HOT_AUCTION_RATIO = 100;
- private static final int HOT_SELLER_RATIO = 100;
private static final int HOT_BIDDER_RATIO = 100;
/**
@@ -206,94 +196,6 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
}
- /**
- * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if
- * due to generate an auction.
- */
- private long lastBase0AuctionId(long eventId) {
- long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
- long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
- if (offset < GeneratorConfig.PERSON_PROPORTION) {
- // About to generate a person.
- // Go back to the last auction in the last epoch.
- epoch--;
- offset = GeneratorConfig.AUCTION_PROPORTION - 1;
- } else if (offset >= GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
- // About to generate a bid.
- // Go back to the last auction generated in this epoch.
- offset = GeneratorConfig.AUCTION_PROPORTION - 1;
- } else {
- // About to generate an auction.
- offset -= GeneratorConfig.PERSON_PROPORTION;
- }
- return epoch * GeneratorConfig.AUCTION_PROPORTION + offset;
- }
- /** Return a random price. */
- private static long nextPrice(Random random) {
- return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0);
- }
-
- /** Return a random time delay, in milliseconds, for length of auctions. */
- private long nextAuctionLengthMs(Random random, long timestamp) {
- // What's our current event number?
- long currentEventNumber = config.nextAdjustedEventNumber(eventsCountSoFar);
- // How many events till we've generated numInFlightAuctions?
- long numEventsForAuctions =
- (config.configuration.numInFlightAuctions * GeneratorConfig.PROPORTION_DENOMINATOR)
- / GeneratorConfig.AUCTION_PROPORTION;
- // When will the auction numInFlightAuctions beyond now be generated?
- long futureAuction =
- config.timestampAndInterEventDelayUsForEvent(currentEventNumber + numEventsForAuctions)
- .getKey();
- // System.out.printf("*** auction will be for %dms (%d events ahead) ***\n",
- // futureAuction - timestamp, numEventsForAuctions);
- // Choose a length with average horizonMs.
- long horizonMs = futureAuction - timestamp;
- return 1L + nextLong(random, Math.max(horizonMs * 2, 1L));
- }
-
-
- /**
- * Return a random auction id (base 0).
- */
- private long nextBase0AuctionId(long nextEventId, Random random) {
- // Choose a random auction for any of those which are likely to still be in flight,
- // plus a few 'leads'.
- // Note that ideally we'd track non-expired auctions exactly, but that state
- // is difficult to split.
- long minAuction = Math.max(
- lastBase0AuctionId(nextEventId) - config.configuration.numInFlightAuctions, 0);
- long maxAuction = lastBase0AuctionId(nextEventId);
- return minAuction + nextLong(random, maxAuction - minAuction + 1 + AUCTION_ID_LEAD);
- }
-
- /**
- * Generate and return a random auction with next available id.
- */
- private Auction nextAuction(long eventId, Random random, long timestamp) {
- long id = lastBase0AuctionId(eventId) + GeneratorConfig.FIRST_AUCTION_ID;
-
- long seller;
- // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio.
- if (random.nextInt(config.configuration.hotSellersRatio) > 0) {
- // Choose the first person in the batch of last HOT_SELLER_RATIO people.
- seller = (lastBase0PersonId(eventId) / HOT_SELLER_RATIO) * HOT_SELLER_RATIO;
- } else {
- seller = nextBase0PersonId(eventId, random, config);
- }
- seller += GeneratorConfig.FIRST_PERSON_ID;
-
- long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES);
- long initialBid = nextPrice(random);
- long expires = timestamp + nextAuctionLengthMs(random, timestamp);
- String name = nextString(random, 20);
- String desc = nextString(random, 100);
- long reserve = initialBid + nextPrice(random);
- int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8;
- String extra = nextExtra(random, currentSize, config.configuration.avgAuctionByteSize);
- return new Auction(id, name, desc, initialBid, reserve, timestamp, expires, seller, category,
- extra);
- }
/**
* Generate and return a random bid with next available id.
@@ -305,7 +207,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
// Choose the first auction in the batch of last HOT_AUCTION_RATIO auctions.
auction = (lastBase0AuctionId(eventId) / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO;
} else {
- auction = nextBase0AuctionId(eventId, random);
+ auction = nextBase0AuctionId(eventId, random, config);
}
auction += GeneratorConfig.FIRST_AUCTION_ID;
@@ -370,7 +272,8 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
if (rem < GeneratorConfig.PERSON_PROPORTION) {
event = new Event(nextPerson(newEventId, random, adjustedEventTimestamp, config));
} else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
- event = new Event(nextAuction(newEventId, random, adjustedEventTimestamp));
+ event = new Event(
+ nextAuction(eventsCountSoFar, newEventId, random, adjustedEventTimestamp, config));
} else {
event = new Event(nextBid(newEventId, random, adjustedEventTimestamp));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e895fc82/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java
new file mode 100644
index 0000000..90918d6
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.utils;
+
+import static org.apache.beam.sdk.nexmark.sources.utils.LongGenerator.nextLong;
+import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.lastBase0PersonId;
+import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextBase0PersonId;
+import static org.apache.beam.sdk.nexmark.sources.utils.PriceGenerator.nextPrice;
+import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextExtra;
+import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextString;
+
+import java.util.Random;
+
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.sources.GeneratorConfig;
+
+/**
+ * Generates random {@link Auction}s and base-0 auction ids for the Nexmark event stream.
+ */
+public class AuctionGenerator {
+ /**
+ * Keep the number of categories small so the example queries will find results even with
+ * a small batch of events.
+ */
+ private static final int NUM_CATEGORIES = 5;
+
+ /**
+ * Number of not-yet-created auction ids that {@link #nextBase0AuctionId} may also return.
+ */
+ private static final int AUCTION_ID_LEAD = 10;
+
+ /**
+ * A hot seller is the first person id in each batch of this many person ids, so roughly
+ * 1 in HOT_SELLER_RATIO people can be chosen as a hot seller.
+ */
+ private static final int HOT_SELLER_RATIO = 100;
+
+ /**
+ * Generate and return a random auction for event {@code eventId}, with the next available id.
+ */
+ public static Auction nextAuction(
+ long eventsCountSoFar, long eventId, Random random, long timestamp, GeneratorConfig config) {
+ // NOTE(review): mixes config getters with direct config.configuration access; consider unifying.
+ long id = lastBase0AuctionId(eventId) + GeneratorConfig.FIRST_AUCTION_ID;
+
+ long seller;
+ // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio.
+ if (random.nextInt(config.getHotSellersRatio()) > 0) {
+ // Choose the first person in the batch of last HOT_SELLER_RATIO people.
+ seller = (lastBase0PersonId(eventId) / HOT_SELLER_RATIO) * HOT_SELLER_RATIO;
+ } else {
+ seller = nextBase0PersonId(eventId, random, config);
+ }
+ seller += GeneratorConfig.FIRST_PERSON_ID;
+
+ long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES);
+ long initialBid = nextPrice(random);
+ long expires = timestamp + nextAuctionLengthMs(eventsCountSoFar, random, timestamp, config);
+ String name = nextString(random, 20);
+ String desc = nextString(random, 100);
+ long reserve = initialBid + nextPrice(random);
+ int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8;
+ String extra = nextExtra(random, currentSize, config.configuration.avgAuctionByteSize);
+ return new Auction(id, name, desc, initialBid, reserve, timestamp, expires, seller, category,
+ extra);
+ }
+
+ /**
+ * Return the last valid base-0 auction id (ignoring FIRST_AUCTION_ID) as of {@code eventId}: the
+ * current auction id if that event is an auction, or -1 before the first auction is generated.
+ */
+ public static long lastBase0AuctionId(long eventId) {
+ long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
+ long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
+ if (offset < GeneratorConfig.PERSON_PROPORTION) {
+ // About to generate a person.
+ // Go back to the last auction in the last epoch.
+ epoch--;
+ offset = GeneratorConfig.AUCTION_PROPORTION - 1;
+ } else if (offset >= GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
+ // About to generate a bid.
+ // Go back to the last auction generated in this epoch.
+ offset = GeneratorConfig.AUCTION_PROPORTION - 1;
+ } else {
+ // About to generate an auction.
+ offset -= GeneratorConfig.PERSON_PROPORTION;
+ }
+ return epoch * GeneratorConfig.AUCTION_PROPORTION + offset;
+ }
+
+ /**
+ * Return a random base-0 auction id: a recent, likely in-flight auction or one of a few future ids.
+ */
+ public static long nextBase0AuctionId(
+ long nextEventId, Random random, GeneratorConfig config) {
+
+ // Choose a random auction for any of those which are likely to still be in flight,
+ // plus a few 'leads'.
+ // Note that ideally we'd track non-expired auctions exactly, but that state
+ // is difficult to split.
+ long minAuction = Math.max(
+ lastBase0AuctionId(nextEventId) - config.getNumInFlightAuctions(), 0);
+ long maxAuction = lastBase0AuctionId(nextEventId);
+ return minAuction + nextLong(random, maxAuction - minAuction + 1 + AUCTION_ID_LEAD);
+ }
+
+ /** Return a random time delay, in milliseconds, to use as the length of an auction. */
+ private static long nextAuctionLengthMs(
+ long eventsCountSoFar, Random random, long timestamp, GeneratorConfig config) {
+
+ // What's our current event number?
+ long currentEventNumber = config.nextAdjustedEventNumber(eventsCountSoFar);
+ // How many events till we've generated numInFlightAuctions?
+ long numEventsForAuctions =
+ (config.configuration.numInFlightAuctions * GeneratorConfig.PROPORTION_DENOMINATOR)
+ / GeneratorConfig.AUCTION_PROPORTION;
+ // When will the auction numInFlightAuctions beyond now be generated?
+ long futureAuction =
+ config.timestampAndInterEventDelayUsForEvent(currentEventNumber + numEventsForAuctions)
+ .getKey();
+ // Length is 1 + nextLong(random, 2 * horizonMs), which averages about horizonMs assuming
+ // nextLong is uniform over [0, n); the bound is clamped to at least 1.
+ // Choose a length with average horizonMs.
+ long horizonMs = futureAuction - timestamp;
+ return 1L + nextLong(random, Math.max(horizonMs * 2, 1L));
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/e895fc82/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java
new file mode 100644
index 0000000..9dae1ca
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.utils;
+
+import java.util.Random;
+
+/**
+ * Generates random prices in [100, 100000000] drawn from a log-uniform distribution.
+ */
+public class PriceGenerator {
+
+ /** Return a log-uniform random price: 100 * 10^u rounded, with u uniform in [0, 6). */
+ public static long nextPrice(Random random) {
+ return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0);
+ }
+}