You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:23 UTC
[15/55] [abbrv] beam git commit: Refactor classes into packages
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Generator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Generator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Generator.java
deleted file mode 100644
index 7adb1b2..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Generator.java
+++ /dev/null
@@ -1,589 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-
-/**
- * A generator for synthetic events. We try to make the data vaguely reasonable. We also ensure
- * most primary key/foreign key relations are correct. Eg: a {@link Bid} event will usually have
- * valid auction and bidder ids which can be joined to already-generated Auction and Person events.
- *
- * <p>To help with testing, we generate timestamps relative to a given {@code baseTime}. Each new
- * event is given a timestamp advanced from the previous timestamp by {@code interEventDelayUs}
- * (in microseconds). The event stream is thus fully deterministic and does not depend on
- * wallclock time.
- *
- * <p>The nested {@link Checkpoint} implements {@link UnboundedSource.CheckpointMark}
- * so that we can resume generating events from a saved snapshot.
- */
-public class Generator implements Iterator<TimestampedValue<Event>>, Serializable {
- /**
- * Keep the number of categories small so the example queries will find results even with
- * a small batch of events.
- */
- private static final int NUM_CATEGORIES = 5;
-
- /** Smallest random string size. */
- private static final int MIN_STRING_LENGTH = 3;
-
- /**
- * Keep the number of states small so that the example queries will find results even with
- * a small batch of events.
- */
- private static final List<String> US_STATES = Arrays.asList(("AZ,CA,ID,OR,WA,WY").split(","));
-
- private static final List<String> US_CITIES =
- Arrays.asList(
- ("Phoenix,Los Angeles,San Francisco,Boise,Portland,Bend,Redmond,Seattle,Kent,Cheyenne")
- .split(","));
-
- private static final List<String> FIRST_NAMES =
- Arrays.asList(("Peter,Paul,Luke,John,Saul,Vicky,Kate,Julie,Sarah,Deiter,Walter").split(","));
-
- private static final List<String> LAST_NAMES =
- Arrays.asList(("Shultz,Abrams,Spencer,White,Bartels,Walton,Smith,Jones,Noris").split(","));
-
- /**
- * Number of yet-to-be-created people and auction ids allowed.
- */
- private static final int PERSON_ID_LEAD = 10;
- private static final int AUCTION_ID_LEAD = 10;
-
- /**
- * The fraction of people/auctions which may be 'hot' sellers/bidders/auctions is 1
- * over these values.
- */
- private static final int HOT_AUCTION_RATIO = 100;
- private static final int HOT_SELLER_RATIO = 100;
- private static final int HOT_BIDDER_RATIO = 100;
-
- /**
- * Just enough state to be able to restore a generator back to where it was checkpointed.
- */
- public static class Checkpoint implements UnboundedSource.CheckpointMark {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-
- /** Coder for this class. */
- public static final Coder<Checkpoint> CODER_INSTANCE =
- new AtomicCoder<Checkpoint>() {
- @Override
- public void encode(
- Checkpoint value,
- OutputStream outStream,
- Coder.Context context)
- throws CoderException, IOException {
- LONG_CODER.encode(value.numEvents, outStream, Context.NESTED);
- LONG_CODER.encode(value.wallclockBaseTime, outStream, Context.NESTED);
- }
-
- @Override
- public Checkpoint decode(
- InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- long numEvents = LONG_CODER.decode(inStream, Context.NESTED);
- long wallclockBaseTime = LONG_CODER.decode(inStream, Context.NESTED);
- return new Checkpoint(numEvents, wallclockBaseTime);
- }
- };
-
- private long numEvents;
- private long wallclockBaseTime;
-
- private Checkpoint(long numEvents, long wallclockBaseTime) {
- this.numEvents = numEvents;
- this.wallclockBaseTime = wallclockBaseTime;
- }
-
- public Generator toGenerator(GeneratorConfig config) {
- return new Generator(config, numEvents, wallclockBaseTime);
- }
-
- @Override
- public void finalizeCheckpoint() throws IOException {
- // Nothing to finalize.
- }
-
- @Override
- public String toString() {
- return String.format("Generator.Checkpoint{numEvents:%d;wallclockBaseTime:%d}",
- numEvents, wallclockBaseTime);
- }
- }
-
- /**
- * The next event and its various timestamps. Ordered by increasing wallclock timestamp, then
- * (arbitrary but stable) event hash order.
- */
- public static class NextEvent implements Comparable<NextEvent> {
- /** When, in wallclock time, should this event be emitted? */
- public final long wallclockTimestamp;
-
- /** When, in event time, should this event be considered to have occurred? */
- public final long eventTimestamp;
-
- /** The event itself. */
- public final Event event;
-
- /** The minimum of this and all future event timestamps. */
- public final long watermark;
-
- public NextEvent(long wallclockTimestamp, long eventTimestamp, Event event, long watermark) {
- this.wallclockTimestamp = wallclockTimestamp;
- this.eventTimestamp = eventTimestamp;
- this.event = event;
- this.watermark = watermark;
- }
-
- /**
- * Return a deep clone of next event with delay added to wallclock timestamp and
- * event annotated as 'LATE'.
- */
- public NextEvent withDelay(long delayMs) {
- return new NextEvent(
- wallclockTimestamp + delayMs, eventTimestamp, event.withAnnotation("LATE"), watermark);
- }
-
- @Override
- public int compareTo(NextEvent other) {
- int i = Long.compare(wallclockTimestamp, other.wallclockTimestamp);
- if (i != 0) {
- return i;
- }
- return Integer.compare(event.hashCode(), other.event.hashCode());
- }
- }
-
- /**
- * Configuration to generate events against. Note that it may be replaced by a call to
- * {@link #splitAtEventId}.
- */
- private GeneratorConfig config;
-
- /** Number of events generated by this generator. */
- private long numEvents;
-
- /**
- * Wallclock time at which we emitted the first event (ms since epoch). Initially -1.
- */
- private long wallclockBaseTime;
-
- private Generator(GeneratorConfig config, long numEvents, long wallclockBaseTime) {
- checkNotNull(config);
- this.config = config;
- this.numEvents = numEvents;
- this.wallclockBaseTime = wallclockBaseTime;
- }
-
- /**
- * Create a fresh generator according to {@code config}.
- */
- public Generator(GeneratorConfig config) {
- this(config, 0, -1);
- }
-
- /**
- * Return a checkpoint for the current generator.
- */
- public Checkpoint toCheckpoint() {
- return new Checkpoint(numEvents, wallclockBaseTime);
- }
-
- /**
- * Return a deep clone of this generator.
- */
- @Override
- public Generator clone() {
- return new Generator(config.clone(), numEvents, wallclockBaseTime);
- }
-
- /**
- * Return the current config for this generator. Note that configs may be replaced by {@link
- * #splitAtEventId}.
- */
- public GeneratorConfig getCurrentConfig() {
- return config;
- }
-
- /**
- * Mutate this generator so that it will only generate events up to but not including
- * {@code eventId}. Return a config to represent the events this generator will no longer yield.
- * The generators will run on a serial timeline.
- */
- public GeneratorConfig splitAtEventId(long eventId) {
- long newMaxEvents = eventId - (config.firstEventId + config.firstEventNumber);
- GeneratorConfig remainConfig = config.cloneWith(config.firstEventId,
- config.maxEvents - newMaxEvents, config.firstEventNumber + newMaxEvents);
- config = config.cloneWith(config.firstEventId, newMaxEvents, config.firstEventNumber);
- return remainConfig;
- }
-
- /**
- * Return the next 'event id'. Though events don't have ids we can simulate them to
- * help with bookkeeping.
- */
- public long getNextEventId() {
- return config.firstEventId + config.nextAdjustedEventNumber(numEvents);
- }
-
- /**
- * Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if
- * due to generate a person.
- */
- private long lastBase0PersonId() {
- long eventId = getNextEventId();
- long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
- long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
- if (offset >= GeneratorConfig.PERSON_PROPORTION) {
- // About to generate an auction or bid.
- // Go back to the last person generated in this epoch.
- offset = GeneratorConfig.PERSON_PROPORTION - 1;
- }
- // About to generate a person.
- return epoch * GeneratorConfig.PERSON_PROPORTION + offset;
- }
-
- /**
- * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if
- * due to generate an auction.
- */
- private long lastBase0AuctionId() {
- long eventId = getNextEventId();
- long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
- long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
- if (offset < GeneratorConfig.PERSON_PROPORTION) {
- // About to generate a person.
- // Go back to the last auction in the last epoch.
- epoch--;
- offset = GeneratorConfig.AUCTION_PROPORTION - 1;
- } else if (offset >= GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
- // About to generate a bid.
- // Go back to the last auction generated in this epoch.
- offset = GeneratorConfig.AUCTION_PROPORTION - 1;
- } else {
- // About to generate an auction.
- offset -= GeneratorConfig.PERSON_PROPORTION;
- }
- return epoch * GeneratorConfig.AUCTION_PROPORTION + offset;
- }
-
- /** Return a random US state. */
- private static String nextUSState(Random random) {
- return US_STATES.get(random.nextInt(US_STATES.size()));
- }
-
- /** Return a random US city. */
- private static String nextUSCity(Random random) {
- return US_CITIES.get(random.nextInt(US_CITIES.size()));
- }
-
- /** Return a random person name. */
- private static String nextPersonName(Random random) {
- return FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size())) + " "
- + LAST_NAMES.get(random.nextInt(LAST_NAMES.size()));
- }
-
- /** Return a random string of up to {@code maxLength}. */
- private static String nextString(Random random, int maxLength) {
- int len = MIN_STRING_LENGTH + random.nextInt(maxLength - MIN_STRING_LENGTH);
- StringBuilder sb = new StringBuilder();
- while (len-- > 0) {
- if (random.nextInt(13) == 0) {
- sb.append(' ');
- } else {
- sb.append((char) ('a' + random.nextInt(26)));
- }
- }
- return sb.toString().trim();
- }
-
- /** Return a random string of exactly {@code length}. */
- private static String nextExactString(Random random, int length) {
- StringBuilder sb = new StringBuilder();
- while (length-- > 0) {
- sb.append((char) ('a' + random.nextInt(26)));
- }
- return sb.toString();
- }
-
- /** Return a random email address. */
- private static String nextEmail(Random random) {
- return nextString(random, 7) + "@" + nextString(random, 5) + ".com";
- }
-
- /** Return a random credit card number. */
- private static String nextCreditCard(Random random) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < 4; i++) {
- if (i > 0) {
- sb.append(' ');
- }
- sb.append(String.format("%04d", random.nextInt(10000)));
- }
- return sb.toString();
- }
-
- /** Return a random price. */
- private static long nextPrice(Random random) {
- return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0);
- }
-
- /** Return a random time delay, in milliseconds, for length of auctions. */
- private long nextAuctionLengthMs(Random random, long timestamp) {
- // What's our current event number?
- long currentEventNumber = config.nextAdjustedEventNumber(numEvents);
- // How many events till we've generated numInFlightAuctions?
- long numEventsForAuctions =
- (config.configuration.numInFlightAuctions * GeneratorConfig.PROPORTION_DENOMINATOR)
- / GeneratorConfig.AUCTION_PROPORTION;
- // When will the auction numInFlightAuctions beyond now be generated?
- long futureAuction =
- config.timestampAndInterEventDelayUsForEvent(currentEventNumber + numEventsForAuctions)
- .getKey();
- // System.out.printf("*** auction will be for %dms (%d events ahead) ***\n",
- // futureAuction - timestamp, numEventsForAuctions);
- // Choose a length with average horizonMs.
- long horizonMs = futureAuction - timestamp;
- return 1L + nextLong(random, Math.max(horizonMs * 2, 1L));
- }
-
- /**
- * Return a random {@code string} such that {@code currentSize + string.length()} is on average
- * {@code desiredAverageSize}.
- */
- private static String nextExtra(Random random, int currentSize, int desiredAverageSize) {
- if (currentSize > desiredAverageSize) {
- return "";
- }
- desiredAverageSize -= currentSize;
- int delta = (int) Math.round(desiredAverageSize * 0.2);
- int minSize = desiredAverageSize - delta;
- int desiredSize = minSize + (delta == 0 ? 0 : random.nextInt(2 * delta));
- return nextExactString(random, desiredSize);
- }
-
- /** Return a random long from {@code [0, n)}. */
- private static long nextLong(Random random, long n) {
- if (n < Integer.MAX_VALUE) {
- return random.nextInt((int) n);
- } else {
- // TODO: Very skewed distribution! Bad!
- return Math.abs(random.nextLong()) % n;
- }
- }
-
- /**
- * Generate and return a random person with next available id.
- */
- private Person nextPerson(Random random, long timestamp) {
- long id = lastBase0PersonId() + GeneratorConfig.FIRST_PERSON_ID;
- String name = nextPersonName(random);
- String email = nextEmail(random);
- String creditCard = nextCreditCard(random);
- String city = nextUSCity(random);
- String state = nextUSState(random);
- int currentSize =
- 8 + name.length() + email.length() + creditCard.length() + city.length() + state.length();
- String extra = nextExtra(random, currentSize, config.configuration.avgPersonByteSize);
- return new Person(id, name, email, creditCard, city, state, timestamp, extra);
- }
-
- /**
- * Return a random person id (base 0).
- */
- private long nextBase0PersonId(Random random) {
- // Choose a random person from any of the 'active' people, plus a few 'leads'.
- // By limiting to 'active' we ensure the density of bids or auctions per person
- // does not decrease over time for long running jobs.
- // By choosing a person id ahead of the last valid person id we will make
- // newPerson and newAuction events appear to have been swapped in time.
- long numPeople = lastBase0PersonId() + 1;
- long activePeople = Math.min(numPeople, config.configuration.numActivePeople);
- long n = nextLong(random, activePeople + PERSON_ID_LEAD);
- return numPeople - activePeople + n;
- }
-
- /**
- * Return a random auction id (base 0).
- */
- private long nextBase0AuctionId(Random random) {
- // Choose a random auction for any of those which are likely to still be in flight,
- // plus a few 'leads'.
- // Note that ideally we'd track non-expired auctions exactly, but that state
- // is difficult to split.
- long minAuction = Math.max(lastBase0AuctionId() - config.configuration.numInFlightAuctions, 0);
- long maxAuction = lastBase0AuctionId();
- return minAuction + nextLong(random, maxAuction - minAuction + 1 + AUCTION_ID_LEAD);
- }
-
- /**
- * Generate and return a random auction with next available id.
- */
- private Auction nextAuction(Random random, long timestamp) {
- long id = lastBase0AuctionId() + GeneratorConfig.FIRST_AUCTION_ID;
-
- long seller;
- // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio.
- if (random.nextInt(config.configuration.hotSellersRatio) > 0) {
- // Choose the first person in the batch of last HOT_SELLER_RATIO people.
- seller = (lastBase0PersonId() / HOT_SELLER_RATIO) * HOT_SELLER_RATIO;
- } else {
- seller = nextBase0PersonId(random);
- }
- seller += GeneratorConfig.FIRST_PERSON_ID;
-
- long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES);
- long initialBid = nextPrice(random);
- long dateTime = timestamp;
- long expires = timestamp + nextAuctionLengthMs(random, timestamp);
- String name = nextString(random, 20);
- String desc = nextString(random, 100);
- long reserve = initialBid + nextPrice(random);
- int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8;
- String extra = nextExtra(random, currentSize, config.configuration.avgAuctionByteSize);
- return new Auction(id, name, desc, initialBid, reserve, dateTime, expires, seller, category,
- extra);
- }
-
- /**
- * Generate and return a random bid with next available id.
- */
- private Bid nextBid(Random random, long timestamp) {
- long auction;
- // Here P(bid will be for a hot auction) = 1 - 1/hotAuctionRatio.
- if (random.nextInt(config.configuration.hotAuctionRatio) > 0) {
- // Choose the first auction in the batch of last HOT_AUCTION_RATIO auctions.
- auction = (lastBase0AuctionId() / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO;
- } else {
- auction = nextBase0AuctionId(random);
- }
- auction += GeneratorConfig.FIRST_AUCTION_ID;
-
- long bidder;
- // Here P(bid will be by a hot bidder) = 1 - 1/hotBiddersRatio
- if (random.nextInt(config.configuration.hotBiddersRatio) > 0) {
- // Choose the second person (so hot bidders and hot sellers don't collide) in the batch of
- // last HOT_BIDDER_RATIO people.
- bidder = (lastBase0PersonId() / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1;
- } else {
- bidder = nextBase0PersonId(random);
- }
- bidder += GeneratorConfig.FIRST_PERSON_ID;
-
- long price = nextPrice(random);
- int currentSize = 8 + 8 + 8 + 8;
- String extra = nextExtra(random, currentSize, config.configuration.avgBidByteSize);
- return new Bid(auction, bidder, price, timestamp, extra);
- }
-
- @Override
- public boolean hasNext() {
- return numEvents < config.maxEvents;
- }
-
- /**
- * Return the next event. The outer timestamp is in wallclock time and corresponds to
- * when the event should fire. The inner timestamp is in event-time and represents the
- * time the event is purported to have taken place in the simulation.
- */
- public NextEvent nextEvent() {
- if (wallclockBaseTime < 0) {
- wallclockBaseTime = System.currentTimeMillis();
- }
- // When, in event time, we should generate the event. Monotonic.
- long eventTimestamp =
- config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents)).getKey();
- // When, in event time, the event should say it was generated. Depending on outOfOrderGroupSize
- // may have local jitter.
- long adjustedEventTimestamp =
- config.timestampAndInterEventDelayUsForEvent(config.nextAdjustedEventNumber(numEvents))
- .getKey();
- // The minimum of this and all future adjusted event timestamps. Accounts for jitter in
- // the event timestamp.
- long watermark =
- config.timestampAndInterEventDelayUsForEvent(config.nextEventNumberForWatermark(numEvents))
- .getKey();
- // When, in wallclock time, we should emit the event.
- long wallclockTimestamp = wallclockBaseTime + (eventTimestamp - getCurrentConfig().baseTime);
-
- // Seed the random number generator with the next 'event id'.
- Random random = new Random(getNextEventId());
- long rem = getNextEventId() % GeneratorConfig.PROPORTION_DENOMINATOR;
-
- Event event;
- if (rem < GeneratorConfig.PERSON_PROPORTION) {
- event = new Event(nextPerson(random, adjustedEventTimestamp));
- } else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
- event = new Event(nextAuction(random, adjustedEventTimestamp));
- } else {
- event = new Event(nextBid(random, adjustedEventTimestamp));
- }
-
- numEvents++;
- return new NextEvent(wallclockTimestamp, adjustedEventTimestamp, event, watermark);
- }
-
- @Override
- public TimestampedValue<Event> next() {
- NextEvent next = nextEvent();
- return TimestampedValue.of(next.event, new Instant(next.eventTimestamp));
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Return how many microseconds till we emit the next event.
- */
- public long currentInterEventDelayUs() {
- return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents))
- .getValue();
- }
-
- /**
- * Return an estimate of fraction of output consumed.
- */
- public double getFractionConsumed() {
- return (double) numEvents / config.maxEvents;
- }
-
- @Override
- public String toString() {
- return String.format("Generator{config:%s; numEvents:%d; wallclockBaseTime:%d}", config,
- numEvents, wallclockBaseTime);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/GeneratorConfig.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/GeneratorConfig.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/GeneratorConfig.java
deleted file mode 100644
index dceff4f..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/GeneratorConfig.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.beam.sdk.values.KV;
-
-/**
- * Parameters controlling how {@link Generator} synthesizes {@link Event} elements.
- */
-class GeneratorConfig implements Serializable {
- /**
- * We start the ids at specific values to help ensure the queries find a match even on
- * small synthesized dataset sizes.
- */
- public static final long FIRST_AUCTION_ID = 1000L;
- public static final long FIRST_PERSON_ID = 1000L;
- public static final long FIRST_CATEGORY_ID = 10L;
-
- /**
- * Proportions of people/auctions/bids to synthesize.
- */
- public static final int PERSON_PROPORTION = 1;
- public static final int AUCTION_PROPORTION = 3;
- public static final int BID_PROPORTION = 46;
- public static final int PROPORTION_DENOMINATOR =
- PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION;
-
- /**
- * Environment options.
- */
- public final NexmarkConfiguration configuration;
-
- /**
- * Delay between events, in microseconds. If the array has more than one entry then
- * the rate is changed every {@link #stepLengthSec}, and wraps around.
- */
- public final long[] interEventDelayUs;
-
- /**
- * Delay before changing the current inter-event delay.
- */
- public final long stepLengthSec;
-
- /**
- * Time for first event (ms since epoch).
- */
- public final long baseTime;
-
- /**
- * Event id of first event to be generated. Event ids are unique over all generators, and
- * are used as a seed to generate each event's data.
- */
- public final long firstEventId;
-
- /**
- * Maximum number of events to generate.
- */
- public final long maxEvents;
-
- /**
- * First event number. Generators running in parallel time may share the same event number,
- * and the event number is used to determine the event timestamp.
- */
- public final long firstEventNumber;
-
- /**
- * True period of epoch in milliseconds. Derived from above.
- * (I.e. time to run through cycle for all interEventDelayUs entries).
- */
- public final long epochPeriodMs;
-
- /**
- * Number of events per epoch. Derived from above.
- * (I.e. number of events to run through cycle for all interEventDelayUs entries).
- */
- public final long eventsPerEpoch;
-
- public GeneratorConfig(
- NexmarkConfiguration configuration, long baseTime, long firstEventId,
- long maxEventsOrZero, long firstEventNumber) {
- this.configuration = configuration;
- this.interEventDelayUs = configuration.rateShape.interEventDelayUs(
- configuration.firstEventRate, configuration.nextEventRate,
- configuration.rateUnit, configuration.numEventGenerators);
- this.stepLengthSec = configuration.rateShape.stepLengthSec(configuration.ratePeriodSec);
- this.baseTime = baseTime;
- this.firstEventId = firstEventId;
- if (maxEventsOrZero == 0) {
- // Scale maximum down to avoid overflow in getEstimatedSizeBytes.
- this.maxEvents =
- Long.MAX_VALUE / (PROPORTION_DENOMINATOR
- * Math.max(
- Math.max(configuration.avgPersonByteSize, configuration.avgAuctionByteSize),
- configuration.avgBidByteSize));
- } else {
- this.maxEvents = maxEventsOrZero;
- }
- this.firstEventNumber = firstEventNumber;
-
- long eventsPerEpoch = 0;
- long epochPeriodMs = 0;
- if (interEventDelayUs.length > 1) {
- for (int i = 0; i < interEventDelayUs.length; i++) {
- long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayUs[i];
- eventsPerEpoch += numEventsForThisCycle;
- epochPeriodMs += (numEventsForThisCycle * interEventDelayUs[i]) / 1000L;
- }
- }
- this.eventsPerEpoch = eventsPerEpoch;
- this.epochPeriodMs = epochPeriodMs;
- }
-
- /**
- * Return a clone of this config.
- */
- @Override
- public GeneratorConfig clone() {
- return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
- }
-
- /**
- * Return clone of this config except with given parameters.
- */
- public GeneratorConfig cloneWith(long firstEventId, long maxEvents, long firstEventNumber) {
- return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
- }
-
- /**
- * Split this config into {@code n} sub-configs with roughly equal number of
- * possible events, but distinct value spaces. The generators will run on parallel timelines.
- * This config should no longer be used.
- */
- public List<GeneratorConfig> split(int n) {
- List<GeneratorConfig> results = new ArrayList<>();
- if (n == 1) {
- // No split required.
- results.add(this);
- } else {
- long subMaxEvents = maxEvents / n;
- long subFirstEventId = firstEventId;
- for (int i = 0; i < n; i++) {
- if (i == n - 1) {
- // Don't lose any events to round-down.
- subMaxEvents = maxEvents - subMaxEvents * (n - 1);
- }
- results.add(cloneWith(subFirstEventId, subMaxEvents, firstEventNumber));
- subFirstEventId += subMaxEvents;
- }
- }
- return results;
- }
-
- /**
- * Return an estimate of the bytes needed by {@code numEvents}.
- */
- public long estimatedBytesForEvents(long numEvents) {
- long numPersons =
- (numEvents * GeneratorConfig.PERSON_PROPORTION) / GeneratorConfig.PROPORTION_DENOMINATOR;
- long numAuctions = (numEvents * AUCTION_PROPORTION) / PROPORTION_DENOMINATOR;
- long numBids = (numEvents * BID_PROPORTION) / PROPORTION_DENOMINATOR;
- return numPersons * configuration.avgPersonByteSize
- + numAuctions * configuration.avgAuctionByteSize
- + numBids * configuration.avgBidByteSize;
- }
-
- /**
- * Return an estimate of the byte-size of all events a generator for this config would yield.
- */
- public long getEstimatedSizeBytes() {
- return estimatedBytesForEvents(maxEvents);
- }
-
- /**
- * Return the first 'event id' which could be generated from this config. Though events don't
- * have ids we can simulate them to help bookkeeping.
- */
- public long getStartEventId() {
- return firstEventId + firstEventNumber;
- }
-
- /**
- * Return one past the last 'event id' which could be generated from this config.
- */
- public long getStopEventId() {
- return firstEventId + firstEventNumber + maxEvents;
- }
-
- /**
- * Return the next event number for a generator which has so far emitted {@code numEvents}.
- */
- public long nextEventNumber(long numEvents) {
- return firstEventNumber + numEvents;
- }
-
- /**
- * Return the next event number for a generator which has so far emitted {@code numEvents},
- * but adjusted to account for {@code outOfOrderGroupSize}.
- */
- public long nextAdjustedEventNumber(long numEvents) {
- long n = configuration.outOfOrderGroupSize;
- long eventNumber = nextEventNumber(numEvents);
- long base = (eventNumber / n) * n;
- long offset = (eventNumber * 953) % n;
- return base + offset;
- }
-
- /**
- * Return the event number whose event time will be a suitable watermark for
- * a generator which has so far emitted {@code numEvents}.
- */
- public long nextEventNumberForWatermark(long numEvents) {
- long n = configuration.outOfOrderGroupSize;
- long eventNumber = nextEventNumber(numEvents);
- return (eventNumber / n) * n;
- }
-
- /**
- * What timestamp should the event with {@code eventNumber} have for this generator? And
- * what inter-event delay (in microseconds) is current?
- */
- public KV<Long, Long> timestampAndInterEventDelayUsForEvent(long eventNumber) {
- if (interEventDelayUs.length == 1) {
- long timestamp = baseTime + (eventNumber * interEventDelayUs[0]) / 1000L;
- return KV.of(timestamp, interEventDelayUs[0]);
- }
-
- long epoch = eventNumber / eventsPerEpoch;
- long n = eventNumber % eventsPerEpoch;
- long offsetInEpochMs = 0;
- for (int i = 0; i < interEventDelayUs.length; i++) {
- long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayUs[i];
- if (n < numEventsForThisCycle) {
- long offsetInCycleUs = n * interEventDelayUs[i];
- long timestamp =
- baseTime + epoch * epochPeriodMs + offsetInEpochMs + (offsetInCycleUs / 1000L);
- return KV.of(timestamp, interEventDelayUs[i]);
- }
- n -= numEventsForThisCycle;
- offsetInEpochMs += (numEventsForThisCycle * interEventDelayUs[i]) / 1000L;
- }
- throw new RuntimeException("internal eventsPerEpoch incorrect"); // can't reach
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("GeneratorConfig");
- sb.append("{configuration:");
- sb.append(configuration.toString());
- sb.append(";interEventDelayUs=[");
- for (int i = 0; i < interEventDelayUs.length; i++) {
- if (i > 0) {
- sb.append(",");
- }
- sb.append(interEventDelayUs[i]);
- }
- sb.append("]");
- sb.append(";stepLengthSec:");
- sb.append(stepLengthSec);
- sb.append(";baseTime:");
- sb.append(baseTime);
- sb.append(";firstEventId:");
- sb.append(firstEventId);
- sb.append(";maxEvents:");
- sb.append(maxEvents);
- sb.append(";firstEventNumber:");
- sb.append(firstEventNumber);
- sb.append(";epochPeriodMs:");
- sb.append(epochPeriodMs);
- sb.append(";eventsPerEpoch:");
- sb.append(eventsPerEpoch);
- sb.append("}");
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/IdNameReserve.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/IdNameReserve.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/IdNameReserve.java
deleted file mode 100644
index 21fa3f4..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/IdNameReserve.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * Result type of {@link Query8}.
- */
-public class IdNameReserve implements KnownSize, Serializable {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
- private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
-
- public static final Coder<IdNameReserve> CODER = new AtomicCoder<IdNameReserve>() {
- @Override
- public void encode(IdNameReserve value, OutputStream outStream,
- Coder.Context context)
- throws CoderException, IOException {
- LONG_CODER.encode(value.id, outStream, Context.NESTED);
- STRING_CODER.encode(value.name, outStream, Context.NESTED);
- LONG_CODER.encode(value.reserve, outStream, Context.NESTED);
- }
-
- @Override
- public IdNameReserve decode(
- InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- long id = LONG_CODER.decode(inStream, Context.NESTED);
- String name = STRING_CODER.decode(inStream, Context.NESTED);
- long reserve = LONG_CODER.decode(inStream, Context.NESTED);
- return new IdNameReserve(id, name, reserve);
- }
- };
-
- @JsonProperty
- public final long id;
-
- @JsonProperty
- public final String name;
-
- /** Reserve price in cents. */
- @JsonProperty
- public final long reserve;
-
- // For Avro only.
- @SuppressWarnings("unused")
- private IdNameReserve() {
- id = 0;
- name = null;
- reserve = 0;
- }
-
- public IdNameReserve(long id, String name, long reserve) {
- this.id = id;
- this.name = name;
- this.reserve = reserve;
- }
-
- @Override
- public long sizeInBytes() {
- return 8 + name.length() + 1 + 8;
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/KnownSize.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/KnownSize.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/KnownSize.java
deleted file mode 100644
index 2093c48..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/KnownSize.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-/**
- * Interface for elements which can quickly estimate their encoded byte size.
- */
-public interface KnownSize {
- long sizeInBytes();
-}
-
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
index 02660bf..6370e41 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
@@ -19,6 +19,7 @@ package org.apache.beam.integration.nexmark;
import java.io.Serializable;
+import org.apache.beam.integration.nexmark.model.KnownSize;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NameCityStateId.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NameCityStateId.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NameCityStateId.java
deleted file mode 100644
index fe4687b..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NameCityStateId.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * Result of {@link Query3}.
- */
-public class NameCityStateId implements KnownSize, Serializable {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
- private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
-
- public static final Coder<NameCityStateId> CODER = new AtomicCoder<NameCityStateId>() {
- @Override
- public void encode(NameCityStateId value, OutputStream outStream,
- Coder.Context context)
- throws CoderException, IOException {
- STRING_CODER.encode(value.name, outStream, Context.NESTED);
- STRING_CODER.encode(value.city, outStream, Context.NESTED);
- STRING_CODER.encode(value.state, outStream, Context.NESTED);
- LONG_CODER.encode(value.id, outStream, Context.NESTED);
- }
-
- @Override
- public NameCityStateId decode(
- InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- String name = STRING_CODER.decode(inStream, Context.NESTED);
- String city = STRING_CODER.decode(inStream, Context.NESTED);
- String state = STRING_CODER.decode(inStream, Context.NESTED);
- long id = LONG_CODER.decode(inStream, Context.NESTED);
- return new NameCityStateId(name, city, state, id);
- }
- };
-
- @JsonProperty
- public final String name;
-
- @JsonProperty
- public final String city;
-
- @JsonProperty
- public final String state;
-
- @JsonProperty
- public final long id;
-
- // For Avro only.
- @SuppressWarnings("unused")
- private NameCityStateId() {
- name = null;
- city = null;
- state = null;
- id = 0;
- }
-
- public NameCityStateId(String name, String city, String state, long id) {
- this.name = name;
- this.city = city;
- this.state = state;
- this.id = id;
- }
-
- @Override
- public long sizeInBytes() {
- return name.length() + 1 + city.length() + 1 + state.length() + 1 + 8;
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexDriver.java
deleted file mode 100644
index 4c2721e..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexDriver.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunner;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-/**
- * Run NexMark queries using the Apex runner.
- */
-public class NexmarkApexDriver extends NexmarkDriver<NexmarkApexDriver.NexmarkApexOptions> {
- /**
- * Command line flags.
- */
- public interface NexmarkApexOptions extends Options, ApexPipelineOptions {
- }
-
- /**
- * Entry point.
- */
- public static void main(String[] args) {
- // Gather command line args, baseline, configurations, etc.
- NexmarkApexOptions options = PipelineOptionsFactory.fromArgs(args)
- .withValidation()
- .as(NexmarkApexOptions.class);
- options.setRunner(ApexRunner.class);
- NexmarkApexRunner runner = new NexmarkApexRunner(options);
- new NexmarkApexDriver().runAll(options, runner);
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
deleted file mode 100644
index 3b8993a..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import javax.annotation.Nullable;
-
-/**
- * Run a query using the Apex runner.
- */
-public class NexmarkApexRunner extends NexmarkRunner<NexmarkApexDriver.NexmarkApexOptions> {
- @Override
- protected boolean isStreaming() {
- return options.isStreaming();
- }
-
- @Override
- protected int coresPerWorker() {
- return 4;
- }
-
- @Override
- protected int maxNumWorkers() {
- return 5;
- }
-
- @Override
- protected void invokeBuilderForPublishOnlyPipeline(
- PipelineBuilder builder) {
- builder.build(options);
- }
-
- @Override
- protected void waitForPublisherPreload() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- @Nullable
- protected NexmarkPerf monitor(NexmarkQuery query) {
- return null;
- }
-
- public NexmarkApexRunner(NexmarkApexDriver.NexmarkApexOptions options) {
- super(options);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
index 0943664..e2890ed 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
@@ -29,7 +29,7 @@ import java.util.Objects;
* programmatically. We only capture properties which may influence the resulting
* pipeline performance, as captured by {@link NexmarkPerf}.
*/
-class NexmarkConfiguration implements Serializable {
+public class NexmarkConfiguration implements Serializable {
public static final NexmarkConfiguration DEFAULT = new NexmarkConfiguration();
/** If {@literal true}, include additional debugging and monitoring stats. */
@@ -228,7 +228,7 @@ class NexmarkConfiguration implements Serializable {
/**
* Replace any properties of this configuration which have been supplied by the command line.
*/
- public void overrideFromOptions(Options options) {
+ public void overrideFromOptions(NexmarkOptions options) {
if (options.getDebug() != null) {
debug = options.getDebug();
}
@@ -511,8 +511,6 @@ class NexmarkConfiguration implements Serializable {
/**
* Parse an object from {@code string}.
- *
- * @throws IOException
*/
public static NexmarkConfiguration fromString(String string) {
try {
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectDriver.java
deleted file mode 100644
index 24fcc01..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectDriver.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.runners.direct.DirectOptions;
-import org.apache.beam.runners.direct.DirectRunner;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-/**
- * An implementation of the 'NEXMark queries' using the Direct Runner.
- */
-class NexmarkDirectDriver extends NexmarkDriver<NexmarkDirectDriver.NexmarkDirectOptions> {
- /**
- * Command line flags.
- */
- public interface NexmarkDirectOptions extends Options, DirectOptions {
- }
-
- /**
- * Entry point.
- */
- public static void main(String[] args) {
- NexmarkDirectOptions options =
- PipelineOptionsFactory.fromArgs(args)
- .withValidation()
- .as(NexmarkDirectOptions.class);
- options.setRunner(DirectRunner.class);
- NexmarkDirectRunner runner = new NexmarkDirectRunner(options);
- new NexmarkDirectDriver().runAll(options, runner);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
deleted file mode 100644
index 0119bbc..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-/**
- * Run a single query using the Direct Runner.
- */
-class NexmarkDirectRunner extends NexmarkRunner<NexmarkDirectDriver.NexmarkDirectOptions> {
- public NexmarkDirectRunner(NexmarkDirectDriver.NexmarkDirectOptions options) {
- super(options);
- }
-
- @Override
- protected boolean isStreaming() {
- return options.isStreaming();
- }
-
- @Override
- protected int coresPerWorker() {
- return 4;
- }
-
- @Override
- protected int maxNumWorkers() {
- return 1;
- }
-
- @Override
- protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) {
- throw new UnsupportedOperationException(
- "Cannot use --pubSubMode=COMBINED with DirectRunner");
- }
-
- /**
- * Monitor the progress of the publisher job. Return when it has been generating events for
- * at least {@code configuration.preloadSeconds}.
- */
- @Override
- protected void waitForPublisherPreload() {
- throw new UnsupportedOperationException(
- "Cannot use --pubSubMode=COMBINED with DirectRunner");
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
index e6a7b0b..4714124 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
@@ -28,6 +28,9 @@ import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Person;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -48,7 +51,7 @@ import org.joda.time.Instant;
* <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/">
* http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a>
*/
-public class NexmarkDriver<OptionT extends Options> {
+public class NexmarkDriver<OptionT extends NexmarkOptions> {
/**
* Entry point.
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkDriver.java
deleted file mode 100644
index 61a7d29..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkDriver.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.runners.flink.FlinkPipelineOptions;
-import org.apache.beam.runners.flink.FlinkRunner;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-/**
- * Run NexMark queries using the Flink runner.
- */
-public class NexmarkFlinkDriver extends NexmarkDriver<NexmarkFlinkDriver.NexmarkFlinkOptions> {
- /**
- * Command line flags.
- */
- public interface NexmarkFlinkOptions extends Options, FlinkPipelineOptions {
- }
-
- /**
- * Entry point.
- */
- public static void main(String[] args) {
- // Gather command line args, baseline, configurations, etc.
- NexmarkFlinkOptions options = PipelineOptionsFactory.fromArgs(args)
- .withValidation()
- .as(NexmarkFlinkOptions.class);
- options.setRunner(FlinkRunner.class);
- NexmarkFlinkRunner runner = new NexmarkFlinkRunner(options);
- new NexmarkFlinkDriver().runAll(options, runner);
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
deleted file mode 100644
index 95ab1ad..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-/**
- * Run a query using the Flink runner.
- */
-public class NexmarkFlinkRunner extends NexmarkRunner<NexmarkFlinkDriver.NexmarkFlinkOptions> {
- @Override
- protected boolean isStreaming() {
- return options.isStreaming();
- }
-
- @Override
- protected int coresPerWorker() {
- return 4;
- }
-
- @Override
- protected int maxNumWorkers() {
- return 5;
- }
-
- @Override
- protected void invokeBuilderForPublishOnlyPipeline(
- PipelineBuilder builder) {
- builder.build(options);
- }
-
- @Override
- protected void waitForPublisherPreload() {
- throw new UnsupportedOperationException();
- }
-
- public NexmarkFlinkRunner(NexmarkFlinkDriver.NexmarkFlinkOptions options) {
- super(options);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java
deleted file mode 100644
index 50c2a7c..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-/**
- * An implementation of the 'NEXMark queries' for Google Dataflow.
- * These are multiple queries over a three table schema representing an online auction system:
- * <ul>
- * <li>{@link Person} represents a person submitting an item for auction and/or making a bid
- * on an auction.
- * <li>{@link Auction} represents an item under auction.
- * <li>{@link Bid} represents a bid for an item under auction.
- * </ul>
- * The queries exercise many aspects of streaming dataflow.
- *
- * <p>We synthesize the creation of people, auctions and bids in real-time. The data is not
- * particularly sensible.
- *
- * <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/">
- * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a>
- */
-class NexmarkGoogleDriver extends NexmarkDriver<NexmarkGoogleDriver.NexmarkGoogleOptions> {
- /**
- * Command line flags.
- */
- public interface NexmarkGoogleOptions extends Options, DataflowPipelineOptions {
-
- }
-
- /**
- * Entry point.
- */
- public static void main(String[] args) {
- // Gather command line args, baseline, configurations, etc.
- NexmarkGoogleOptions options = PipelineOptionsFactory.fromArgs(args)
- .withValidation()
- .as(NexmarkGoogleOptions.class);
- options.setRunner(DataflowRunner.class);
- NexmarkGoogleRunner runner = new NexmarkGoogleRunner(options);
- new NexmarkGoogleDriver().runAll(options, runner);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
deleted file mode 100644
index f4bfb1e..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.sdk.PipelineResult;
-import org.joda.time.Duration;
-
-/**
- * Run a single Nexmark query using a given configuration on Google Dataflow.
- */
-class NexmarkGoogleRunner extends NexmarkRunner<NexmarkGoogleDriver.NexmarkGoogleOptions> {
-
-  public NexmarkGoogleRunner(NexmarkGoogleDriver.NexmarkGoogleOptions options) {
-    super(options);
-  }
-
-  @Override
-  protected boolean isStreaming() {
-    return options.isStreaming();
-  }
-
-  /**
-   * Return the number of cores per worker, taken from the third {@code '-'}-separated
-   * component of the worker machine type (for example {@code "n1-standard-4"} gives 4).
-   *
-   * <p>Falls back to 1 when the machine type is unset or empty, does not have exactly
-   * three components, or its last component is not an integer.
-   */
-  @Override
-  protected int coresPerWorker() {
-    String machineType = options.getWorkerMachineType();
-    if (machineType == null || machineType.isEmpty()) {
-      return 1;
-    }
-    String[] split = machineType.split("-");
-    if (split.length != 3) {
-      return 1;
-    }
-    try {
-      return Integer.parseInt(split[2]);
-    } catch (NumberFormatException ex) {
-      // Unrecognized machine type naming: assume a single core.
-      return 1;
-    }
-  }
-
-  /**
-   * Return the larger of the configured initial and maximum worker counts.
-   */
-  @Override
-  protected int maxNumWorkers() {
-    return Math.max(options.getNumWorkers(), options.getMaxNumWorkers());
-  }
-
-  /**
-   * Return the Dataflow job id of {@code job}, which must be a {@link DataflowPipelineJob}.
-   */
-  @Override
-  protected String getJobId(PipelineResult job) {
-    return ((DataflowPipelineJob) job).getJobId();
-  }
-
-  /**
-   * Invoke {@code builder} with options temporarily adjusted for a publish-only pipeline:
-   * the job and app names gain a {@code "p-"} prefix and both worker counts are capped at
-   * the number of workers needed to host {@code configuration.numEventGenerators} generators.
-   *
-   * <p>All four adjusted options are restored to their original values afterwards, even if
-   * the builder throws.
-   */
-  @Override
-  protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) {
-    // Capture every option we are about to change so it can be restored in the finally block.
-    String jobName = options.getJobName();
-    String appName = options.getAppName();
-    int maxNumWorkers = options.getMaxNumWorkers();
-    int numWorkers = options.getNumWorkers();
-
-    options.setJobName("p-" + jobName);
-    options.setAppName("p-" + appName);
-    int coresPerWorker = coresPerWorker();
-    // Ceiling division: enough workers to give every event generator its own core.
-    int eventGeneratorWorkers = (configuration.numEventGenerators + coresPerWorker - 1)
-        / coresPerWorker;
-    options.setMaxNumWorkers(Math.min(maxNumWorkers, eventGeneratorWorkers));
-    options.setNumWorkers(Math.min(numWorkers, eventGeneratorWorkers));
-    publisherMonitor = new Monitor<Event>(queryName, "publisher");
-    try {
-      builder.build(options);
-    } finally {
-      options.setJobName(jobName);
-      options.setAppName(appName);
-      // Restore the saved worker counts; re-reading them from options here would only
-      // write the reduced values back to themselves.
-      options.setMaxNumWorkers(maxNumWorkers);
-      options.setNumWorkers(numWorkers);
-    }
-  }
-
-  /**
-   * Monitor the progress of the publisher job. Return when it has been generating events for
-   * at least {@code configuration.preloadSeconds}.
-   *
-   * <p>Returns immediately when job monitoring is disabled, the publisher result is not a
-   * {@link DataflowPipelineJob}, or no preload time is configured. Also returns as soon as
-   * the publisher job reaches a terminal state.
-   *
-   * @throws RuntimeException if the thread is interrupted while waiting; the thread's
-   *     interrupt status is set again before throwing
-   */
-  @Override
-  protected void waitForPublisherPreload() {
-    checkNotNull(publisherMonitor);
-    checkNotNull(publisherResult);
-    if (!options.getMonitorJobs()) {
-      return;
-    }
-    if (!(publisherResult instanceof DataflowPipelineJob)) {
-      return;
-    }
-    if (configuration.preloadSeconds <= 0) {
-      return;
-    }
-
-    NexmarkUtils.console("waiting for publisher to pre-load");
-
-    DataflowPipelineJob job = (DataflowPipelineJob) publisherResult;
-
-    long numEvents = 0;
-    // Both stay negative until the first event has been observed.
-    long startMsSinceEpoch = -1;
-    long endMsSinceEpoch = -1;
-    while (true) {
-      PipelineResult.State state = job.getState();
-      switch (state) {
-        case UNKNOWN:
-          // Keep waiting.
-          NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
-          break;
-        case STOPPED:
-        case DONE:
-        case CANCELLED:
-        case FAILED:
-        case UPDATED:
-          // The publisher is no longer running, so there is nothing left to wait for.
-          NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
-          return;
-        case RUNNING:
-          numEvents = getLong(job, publisherMonitor.getElementCounter());
-          if (startMsSinceEpoch < 0 && numEvents > 0) {
-            // First events seen: the preload interval starts now.
-            startMsSinceEpoch = System.currentTimeMillis();
-            endMsSinceEpoch = startMsSinceEpoch
-                + Duration.standardSeconds(configuration.preloadSeconds).getMillis();
-          }
-          if (endMsSinceEpoch < 0) {
-            NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
-          } else {
-            long remainMs = endMsSinceEpoch - System.currentTimeMillis();
-            if (remainMs > 0) {
-              NexmarkUtils.console("%s publisher (%d events, waiting for %ds)", state, numEvents,
-                  remainMs / 1000);
-            } else {
-              NexmarkUtils.console("publisher preloaded %d events", numEvents);
-              return;
-            }
-          }
-          break;
-      }
-
-      try {
-        Thread.sleep(PERF_DELAY.getMillis());
-      } catch (InterruptedException e) {
-        // Catching InterruptedException clears the flag; set it again so callers further
-        // up the stack can still observe the interruption.
-        Thread.currentThread().interrupt();
-        throw new RuntimeException("Interrupted: publisher still running.", e);
-      }
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
new file mode 100644
index 0000000..1be974f
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PubsubOptions;
+
+/**
+ * Command line flags for a Nexmark run.
+ *
+ * <p>Getters annotated {@link Nullable} declare no default in this interface and may return
+ * {@code null} when the flag is not supplied. Getters returning a primitive always carry an
+ * explicit {@code @Default}, since a primitive can never be {@code null}.
+ */
+public interface NexmarkOptions extends PubsubOptions {
+  @Description("Which suite to run. Default is to use command line arguments for one job.")
+  @Default.Enum("DEFAULT")
+  NexmarkSuite getSuite();
+
+  void setSuite(NexmarkSuite suite);
+
+  @Description("If true, and using the DataflowPipelineRunner, monitor the jobs as they run.")
+  @Default.Boolean(false)
+  boolean getMonitorJobs();
+
+  void setMonitorJobs(boolean monitorJobs);
+
+  @Description("Where the events come from.")
+  @Nullable
+  NexmarkUtils.SourceType getSourceType();
+
+  void setSourceType(NexmarkUtils.SourceType sourceType);
+
+  @Description("Prefix for input files if using avro input")
+  @Nullable
+  String getInputPath();
+
+  void setInputPath(String inputPath);
+
+  @Description("Where results go.")
+  @Nullable
+  NexmarkUtils.SinkType getSinkType();
+
+  void setSinkType(NexmarkUtils.SinkType sinkType);
+
+  @Description("Which mode to run in when source is PUBSUB.")
+  @Nullable
+  NexmarkUtils.PubSubMode getPubSubMode();
+
+  void setPubSubMode(NexmarkUtils.PubSubMode pubSubMode);
+
+  @Description("Which query to run.")
+  @Nullable
+  Integer getQuery();
+
+  void setQuery(Integer query);
+
+  @Description("Prefix for output files if using text output for results or running Query 10.")
+  @Nullable
+  String getOutputPath();
+
+  void setOutputPath(String outputPath);
+
+  @Description("Base name of pubsub topic to publish to in streaming mode.")
+  @Nullable
+  @Default.String("nexmark")
+  String getPubsubTopic();
+
+  void setPubsubTopic(String pubsubTopic);
+
+  @Description("Base name of pubsub subscription to read from in streaming mode.")
+  @Nullable
+  @Default.String("nexmark")
+  String getPubsubSubscription();
+
+  void setPubsubSubscription(String pubsubSubscription);
+
+  @Description("Base name of BigQuery table name if using BigQuery output.")
+  @Nullable
+  @Default.String("nexmark")
+  String getBigQueryTable();
+
+  void setBigQueryTable(String bigQueryTable);
+
+  @Description("Approximate number of events to generate. "
+      + "Zero for effectively unlimited in streaming mode.")
+  @Nullable
+  Long getNumEvents();
+
+  void setNumEvents(Long numEvents);
+
+  @Description("Time in seconds to preload the subscription with data, at the initial input rate "
+      + "of the pipeline.")
+  @Nullable
+  Integer getPreloadSeconds();
+
+  void setPreloadSeconds(Integer preloadSeconds);
+
+  @Description("Number of unbounded sources to create events.")
+  @Nullable
+  Integer getNumEventGenerators();
+
+  void setNumEventGenerators(Integer numEventGenerators);
+
+  @Description("Shape of event rate curve.")
+  @Nullable
+  NexmarkUtils.RateShape getRateShape();
+
+  void setRateShape(NexmarkUtils.RateShape rateShape);
+
+  @Description("Initial overall event rate (in --rateUnit).")
+  @Nullable
+  Integer getFirstEventRate();
+
+  void setFirstEventRate(Integer firstEventRate);
+
+  @Description("Next overall event rate (in --rateUnit).")
+  @Nullable
+  Integer getNextEventRate();
+
+  void setNextEventRate(Integer nextEventRate);
+
+  @Description("Unit for rates.")
+  @Nullable
+  NexmarkUtils.RateUnit getRateUnit();
+
+  void setRateUnit(NexmarkUtils.RateUnit rateUnit);
+
+  @Description("Overall period of rate shape, in seconds.")
+  @Nullable
+  Integer getRatePeriodSec();
+
+  void setRatePeriodSec(Integer ratePeriodSec);
+
+  @Description("If true, relay events in real time in streaming mode.")
+  @Nullable
+  Boolean getIsRateLimited();
+
+  void setIsRateLimited(Boolean isRateLimited);
+
+  @Description("If true, use wallclock time as event time. Otherwise, use a deterministic"
+      + " time in the past so that multiple runs will see exactly the same event streams"
+      + " and should thus have exactly the same results.")
+  @Nullable
+  Boolean getUseWallclockEventTime();
+
+  void setUseWallclockEventTime(Boolean useWallclockEventTime);
+
+  // Primitive boolean: cannot be null, so declare the default explicitly.
+  @Description("Assert pipeline results match model results.")
+  @Default.Boolean(false)
+  boolean getAssertCorrectness();
+
+  void setAssertCorrectness(boolean assertCorrectness);
+
+  // Primitive boolean: cannot be null, so declare the default explicitly.
+  @Description("Log all input events.")
+  @Default.Boolean(false)
+  boolean getLogEvents();
+
+  void setLogEvents(boolean logEvents);
+
+  // Primitive boolean: cannot be null, so declare the default explicitly.
+  @Description("Log all query results.")
+  @Default.Boolean(false)
+  boolean getLogResults();
+
+  void setLogResults(boolean logResults);
+
+  @Description("Average size in bytes for a person record.")
+  @Nullable
+  Integer getAvgPersonByteSize();
+
+  void setAvgPersonByteSize(Integer avgPersonByteSize);
+
+  @Description("Average size in bytes for an auction record.")
+  @Nullable
+  Integer getAvgAuctionByteSize();
+
+  void setAvgAuctionByteSize(Integer avgAuctionByteSize);
+
+  @Description("Average size in bytes for a bid record.")
+  @Nullable
+  Integer getAvgBidByteSize();
+
+  void setAvgBidByteSize(Integer avgBidByteSize);
+
+  @Description("Ratio of bids for 'hot' auctions above the background.")
+  @Nullable
+  Integer getHotAuctionRatio();
+
+  void setHotAuctionRatio(Integer hotAuctionRatio);
+
+  @Description("Ratio of auctions for 'hot' sellers above the background.")
+  @Nullable
+  Integer getHotSellersRatio();
+
+  void setHotSellersRatio(Integer hotSellersRatio);
+
+  @Description("Ratio of auctions for 'hot' bidders above the background.")
+  @Nullable
+  Integer getHotBiddersRatio();
+
+  void setHotBiddersRatio(Integer hotBiddersRatio);
+
+  @Description("Window size in seconds.")
+  @Nullable
+  Long getWindowSizeSec();
+
+  void setWindowSizeSec(Long windowSizeSec);
+
+  @Description("Window period in seconds.")
+  @Nullable
+  Long getWindowPeriodSec();
+
+  void setWindowPeriodSec(Long windowPeriodSec);
+
+  @Description("If in streaming mode, the holdback for watermark in seconds.")
+  @Nullable
+  Long getWatermarkHoldbackSec();
+
+  void setWatermarkHoldbackSec(Long watermarkHoldbackSec);
+
+  @Description("Roughly how many auctions should be in flight for each generator.")
+  @Nullable
+  Integer getNumInFlightAuctions();
+
+  void setNumInFlightAuctions(Integer numInFlightAuctions);
+
+  @Description("Maximum number of people to consider as active for placing auctions or bids.")
+  @Nullable
+  Integer getNumActivePeople();
+
+  void setNumActivePeople(Integer numActivePeople);
+
+  @Description("Filename of perf data to append to.")
+  @Nullable
+  String getPerfFilename();
+
+  void setPerfFilename(String perfFilename);
+
+  @Description("Filename of baseline perf data to read from.")
+  @Nullable
+  String getBaselineFilename();
+
+  void setBaselineFilename(String baselineFilename);
+
+  @Description("Filename of summary perf data to append to.")
+  @Nullable
+  String getSummaryFilename();
+
+  void setSummaryFilename(String summaryFilename);
+
+  @Description("Filename for javascript capturing all perf data and any baselines.")
+  @Nullable
+  String getJavascriptFilename();
+
+  void setJavascriptFilename(String javascriptFilename);
+
+  // Primitive boolean: cannot be null, so declare the default explicitly.
+  @Description("If true, don't run the actual query. Instead, calculate the distribution "
+      + "of number of query results per (event time) minute according to the query model.")
+  @Default.Boolean(false)
+  boolean getJustModelResultRate();
+
+  void setJustModelResultRate(boolean justModelResultRate);
+
+  @Description("Coder strategy to use.")
+  @Nullable
+  NexmarkUtils.CoderStrategy getCoderStrategy();
+
+  void setCoderStrategy(NexmarkUtils.CoderStrategy coderStrategy);
+
+  @Description("Delay, in milliseconds, for each event. We will peg one core for this "
+      + "number of milliseconds to simulate CPU-bound computation.")
+  @Nullable
+  Long getCpuDelayMs();
+
+  void setCpuDelayMs(Long cpuDelayMs);
+
+  @Description("Extra data, in bytes, to save to persistent state for each event. "
+      + "This will force I/O all the way to durable storage to simulate an "
+      + "I/O-bound computation.")
+  @Nullable
+  Long getDiskBusyBytes();
+
+  void setDiskBusyBytes(Long diskBusyBytes);
+
+  @Description("Skip factor for query 2. We select bids for every {@code auctionSkip}'th auction")
+  @Nullable
+  Integer getAuctionSkip();
+
+  void setAuctionSkip(Integer auctionSkip);
+
+  @Description("Fanout for queries 4 (groups by category id) and 7 (finds a global maximum).")
+  @Nullable
+  Integer getFanout();
+
+  void setFanout(Integer fanout);
+
+  @Description("Length of occasional delay to impose on events (in seconds).")
+  @Nullable
+  Long getOccasionalDelaySec();
+
+  void setOccasionalDelaySec(Long occasionalDelaySec);
+
+  @Description("Probability that an event will be delayed by delayS.")
+  @Nullable
+  Double getProbDelayedEvent();
+
+  void setProbDelayedEvent(Double probDelayedEvent);
+
+  @Description("Maximum size of each log file (in events). For Query10 only.")
+  @Nullable
+  Integer getMaxLogEvents();
+
+  void setMaxLogEvents(Integer maxLogEvents);
+
+  @Description("How to derive names of resources.")
+  @Default.Enum("QUERY_AND_SALT")
+  NexmarkUtils.ResourceNameMode getResourceNameMode();
+
+  void setResourceNameMode(NexmarkUtils.ResourceNameMode mode);
+
+  @Description("If true, manage the creation and cleanup of topics, subscriptions and gcs files.")
+  @Default.Boolean(true)
+  boolean getManageResources();
+
+  void setManageResources(boolean manageResources);
+
+  @Description("If true, use pub/sub publish time instead of event time.")
+  @Nullable
+  Boolean getUsePubsubPublishTime();
+
+  void setUsePubsubPublishTime(Boolean usePubsubPublishTime);
+
+  @Description("Number of events in out-of-order groups. 1 implies no out-of-order events. "
+      + "1000 implies every 1000 events per generator are emitted in pseudo-random order.")
+  @Nullable
+  Long getOutOfOrderGroupSize();
+
+  void setOutOfOrderGroupSize(Long outOfOrderGroupSize);
+
+  @Description("If false, do not add the Monitor and Snoop transforms.")
+  @Nullable
+  Boolean getDebug();
+
+  void setDebug(Boolean value);
+
+  @Description("If set, cancel running pipelines after this long")
+  @Nullable
+  Long getRunningTimeMinutes();
+
+  void setRunningTimeMinutes(Long value);
+
+  @Description("If set and --monitorJobs is true, check that the system watermark is never more "
+      + "than this far behind real time")
+  @Nullable
+  Long getMaxSystemLagSeconds();
+
+  void setMaxSystemLagSeconds(Long value);
+
+  @Description("If set and --monitorJobs is true, check that the data watermark is never more "
+      + "than this far behind real time")
+  @Nullable
+  Long getMaxDataLagSeconds();
+
+  void setMaxDataLagSeconds(Long value);
+
+  @Description("Only start validating watermarks after this many seconds")
+  @Nullable
+  Long getWatermarkValidationDelaySeconds();
+
+  void setWatermarkValidationDelaySeconds(Long value);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java
index 37b6213..e7f59c8 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java
@@ -27,7 +27,7 @@ import javax.annotation.Nullable;
/**
* Summary of performance for a particular run of a configuration.
*/
-class NexmarkPerf {
+public class NexmarkPerf {
/**
* A sample of the number of events and number of results (if known) generated at
* a particular time.
@@ -177,8 +177,6 @@ class NexmarkPerf {
/**
* Parse a {@link NexmarkPerf} object from JSON {@code string}.
- *
- * @throws IOException
*/
public static NexmarkPerf fromString(String string) {
try {
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
index 5ef4191..c268a3b 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
@@ -18,7 +18,11 @@
package org.apache.beam.integration.nexmark;
import javax.annotation.Nullable;
-
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.integration.nexmark.model.Person;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
@@ -29,7 +33,6 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
-
import org.joda.time.Instant;
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java
index f265e0d..b2b1826 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java
@@ -17,11 +17,6 @@
*/
package org.apache.beam.integration.nexmark;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.hasItems;
-import org.hamcrest.collection.IsIterableContainingInAnyOrder;
-
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
@@ -30,10 +25,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import org.apache.beam.integration.nexmark.model.KnownSize;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TimestampedValue;
-import org.hamcrest.core.IsCollectionContaining;
+
import org.hamcrest.core.IsEqual;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -107,15 +103,18 @@ public abstract class NexmarkQueryModel implements Serializable {
/** Return assertion to use on results of pipeline for this query. */
public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> assertionFor() {
final Collection<String> expectedStrings = toCollection(simulator().results());
- final String[] expectedStringsArray = expectedStrings.toArray(new String[expectedStrings.size()]);
+ final String[] expectedStringsArray =
+ expectedStrings.toArray(new String[expectedStrings.size()]);
return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() {
@Override
public Void apply(Iterable<TimestampedValue<KnownSize>> actual) {
- Collection<String> actualStrings = toCollection(relevantResults(actual).iterator());
- Assert.assertThat("wrong pipeline output", actualStrings, IsEqual.equalTo(expectedStrings));
+ Collection<String> actualStrings = toCollection(relevantResults(actual).iterator());
+ Assert.assertThat("wrong pipeline output", actualStrings,
+ IsEqual.equalTo(expectedStrings));
//compare without order
-// Assert.assertThat("wrong pipeline output", actualStrings, IsIterableContainingInAnyOrder.containsInAnyOrder(expectedStringsArray));
+// Assert.assertThat("wrong pipeline output", actualStrings,
+// IsIterableContainingInAnyOrder.containsInAnyOrder(expectedStringsArray));
return null;
}
};