You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:53 UTC
[45/55] [abbrv] beam git commit: Move module
beam-integration-java-nexmark to beam-sdks-java-nexmark
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
new file mode 100644
index 0000000..550fbd2
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -0,0 +1,1157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.nexmark.model.Person;
+import org.apache.beam.sdk.nexmark.queries.NexmarkQuery;
+import org.apache.beam.sdk.nexmark.queries.NexmarkQueryModel;
+import org.apache.beam.sdk.nexmark.queries.Query0;
+import org.apache.beam.sdk.nexmark.queries.Query0Model;
+import org.apache.beam.sdk.nexmark.queries.Query1;
+import org.apache.beam.sdk.nexmark.queries.Query10;
+import org.apache.beam.sdk.nexmark.queries.Query11;
+import org.apache.beam.sdk.nexmark.queries.Query12;
+import org.apache.beam.sdk.nexmark.queries.Query1Model;
+import org.apache.beam.sdk.nexmark.queries.Query2;
+import org.apache.beam.sdk.nexmark.queries.Query2Model;
+import org.apache.beam.sdk.nexmark.queries.Query3;
+import org.apache.beam.sdk.nexmark.queries.Query3Model;
+import org.apache.beam.sdk.nexmark.queries.Query4;
+import org.apache.beam.sdk.nexmark.queries.Query4Model;
+import org.apache.beam.sdk.nexmark.queries.Query5;
+import org.apache.beam.sdk.nexmark.queries.Query5Model;
+import org.apache.beam.sdk.nexmark.queries.Query6;
+import org.apache.beam.sdk.nexmark.queries.Query6Model;
+import org.apache.beam.sdk.nexmark.queries.Query7;
+import org.apache.beam.sdk.nexmark.queries.Query7Model;
+import org.apache.beam.sdk.nexmark.queries.Query8;
+import org.apache.beam.sdk.nexmark.queries.Query8Model;
+import org.apache.beam.sdk.nexmark.queries.Query9;
+import org.apache.beam.sdk.nexmark.queries.Query9Model;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.joda.time.Duration;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Run a single Nexmark query using a given configuration.
+ */
+public class NexmarkLauncher<OptionT extends NexmarkOptions> {
+ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(NexmarkLauncher.class);
+ /**
+ * Minimum number of samples needed for 'steady-state' rate calculation.
+ */
+ private static final int MIN_SAMPLES = 9;
+ /**
+ * Minimum length of time over which to consider samples for 'steady-state' rate calculation.
+ */
+ private static final Duration MIN_WINDOW = Duration.standardMinutes(2);
+ /**
+ * Delay between perf samples.
+ */
+ private static final Duration PERF_DELAY = Duration.standardSeconds(15);
+ /**
+ * How long to let streaming pipeline run after all events have been generated and we've
+ * seen no activity.
+ */
+ private static final Duration DONE_DELAY = Duration.standardMinutes(1);
+ /**
+ * How long to allow no activity without warning.
+ */
+ private static final Duration STUCK_WARNING_DELAY = Duration.standardMinutes(10);
+ /**
+ * How long to let streaming pipeline run after we've
+ * seen no activity, even if all events have not been generated.
+ */
+ private static final Duration STUCK_TERMINATE_DELAY = Duration.standardDays(3);
+ /**
+ * NexmarkOptions shared by all runs. Set once in the constructor.
+ */
+ private final OptionT options;
+
+ /**
+ * Which configuration we are running.
+ * NOTE(review): not assigned in the constructor; presumably set before a query is run.
+ */
+ @Nullable
+ private NexmarkConfiguration configuration;
+
+ /**
+ * If in --pubsubMode=COMBINED, the event monitor for the publisher pipeline. Otherwise null.
+ */
+ @Nullable
+ private Monitor<Event> publisherMonitor;
+
+ /**
+ * If in --pubsubMode=COMBINED, the pipeline result for the publisher pipeline. Otherwise null.
+ */
+ @Nullable
+ private PipelineResult publisherResult;
+
+ /**
+ * Result for the main pipeline.
+ */
+ @Nullable
+ private PipelineResult mainResult;
+
+ /**
+ * Query name we are running. Used as a prefix for transform names and resource names.
+ */
+ @Nullable
+ private String queryName;
+
+ /**
+  * Create a launcher which reads its settings from {@code options}.
+  *
+  * @param options the options shared by all runs of this launcher
+  */
+ public NexmarkLauncher(OptionT options) {
+   this.options = options;
+ }
+
+
+ /**
+ * Is this query running in streaming mode?
+ */
+ private boolean isStreaming() {
+ return options.isStreaming();
+ }
+
+ /**
+ * Return maximum number of workers.
+ */
+ private int maxNumWorkers() {
+ return 5;
+ }
+
+ /**
+  * Return the attempted value of the first counter matching {@code namespace} and
+  * {@code name} in the metrics of {@code result}, or {@code defaultValue} if the query
+  * yields no counter (an error is logged in that case).
+  * Note this uses only attempted metrics because some runners don't support committed metrics.
+  */
+ private long getCounterMetric(PipelineResult result, String namespace, String name,
+     long defaultValue) {
+   MetricsFilter filter =
+       MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build();
+   MetricQueryResults queried = result.metrics().queryMetrics(filter);
+   Iterable<MetricResult<Long>> matching = queried.counters();
+   try {
+     // Only the first matching counter is consulted.
+     return matching.iterator().next().attempted();
+   } catch (NoSuchElementException e) {
+     LOG.error("Failed to get metric {}, from namespace {}", name, namespace);
+     return defaultValue;
+   }
+ }
+
+ /**
+  * Return the minimum or maximum (selected by {@code distType}) of the first distribution
+  * matching {@code namespace} and {@code name} in the metrics of {@code result}, or
+  * {@code defaultValue} if the query yields no distribution (an error is logged in that case).
+  * Note this uses only attempted metrics because some runners don't support committed metrics.
+  */
+ private long getDistributionMetric(PipelineResult result, String namespace, String name,
+     DistributionType distType, long defaultValue) {
+   MetricsFilter filter =
+       MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build();
+   MetricQueryResults queried = result.metrics().queryMetrics(filter);
+   Iterable<MetricResult<DistributionResult>> matching = queried.distributions();
+   try {
+     // Only the first matching distribution is consulted.
+     MetricResult<DistributionResult> first = matching.iterator().next();
+     switch (distType) {
+       case MIN:
+         return first.attempted().min();
+       case MAX:
+         return first.attempted().max();
+       default:
+         return defaultValue;
+     }
+   } catch (NoSuchElementException e) {
+     LOG.error(
+         "Failed to get distribution metric {} for namespace {}",
+         name,
+         namespace);
+     return defaultValue;
+   }
+ }
+
+ private enum DistributionType {MIN, MAX}
+
+ /**
+  * Return {@code value} if it is a plausible timestamp relative to {@code now}, or -1 if it
+  * is more than 10000 days away from {@code now}.
+  */
+ private long getTimestampMetric(long now, long value) {
+   // Timestamp metrics are used to monitor time of execution of transforms.
+   // A value too far from now is considered erroneous.
+   long maxDistanceMs = Duration.standardDays(10000).getMillis();
+   boolean plausible = Math.abs(value - now) <= maxDistanceMs;
+   return plausible ? value : -1;
+ }
+
+ /**
+  * Find a 'steady state' events/sec from {@code snapshots} and
+  * store it in {@code perf} if found.
+  *
+  * <p>Does nothing unless the options request streaming mode. The rate is fitted over the
+  * middle third of the usable samples; {@code perf} is left untouched when there are fewer
+  * than {@code MIN_SAMPLES} usable samples, when the middle third spans less than
+  * {@code MIN_WINDOW}, or when the sampled runtimes are all zero.
+  *
+  * @param perf the performance record whose {@code eventsPerSec} may be revised
+  * @param snapshots the progress snapshots collected so far, oldest first
+  */
+ private void captureSteadyState(NexmarkPerf perf, List<NexmarkPerf.ProgressSnapshot> snapshots) {
+   if (!options.isStreaming()) {
+     return;
+   }
+
+   // Find the first sample with actual event and result counts.
+   int dataStart = 0;
+   for (; dataStart < snapshots.size(); dataStart++) {
+     if (snapshots.get(dataStart).numEvents >= 0 && snapshots.get(dataStart).numResults >= 0) {
+       break;
+     }
+   }
+
+   // Find the last sample which demonstrated progress.
+   int dataEnd = snapshots.size() - 1;
+   for (; dataEnd > dataStart; dataEnd--) {
+     if (snapshots.get(dataEnd).anyActivity(snapshots.get(dataEnd - 1))) {
+       break;
+     }
+   }
+
+   int numSamples = dataEnd - dataStart + 1;
+   if (numSamples < MIN_SAMPLES) {
+     // Not enough samples.
+     NexmarkUtils.console("%d samples not enough to calculate steady-state event rate",
+         numSamples);
+     return;
+   }
+
+   // We'll look at only the middle third samples.
+   int sampleStart = dataStart + numSamples / 3;
+   int sampleEnd = dataEnd - numSamples / 3;
+
+   double sampleSec =
+       snapshots.get(sampleEnd).secSinceStart - snapshots.get(sampleStart).secSinceStart;
+   if (sampleSec < MIN_WINDOW.getStandardSeconds()) {
+     // Not sampled over enough time.
+     NexmarkUtils.console(
+         "sample of %.1f sec not long enough to calculate steady-state event rate",
+         sampleSec);
+     return;
+   }
+
+   // Find rate with least squares error (line through the origin: rate = sum(xy) / sum(xx)).
+   double sumxx = 0.0;
+   double sumxy = 0.0;
+   long prevNumEvents = -1;
+   for (int i = sampleStart; i <= sampleEnd; i++) {
+     if (prevNumEvents == snapshots.get(i).numEvents) {
+       // Skip samples with no change in number of events since they contribute no data.
+       continue;
+     }
+     // Use the effective runtime instead of wallclock time so we can
+     // insulate ourselves from delays and stutters in the query manager.
+     double x = snapshots.get(i).runtimeSec;
+     prevNumEvents = snapshots.get(i).numEvents;
+     double y = prevNumEvents;
+     sumxx += x * x;
+     sumxy += x * y;
+   }
+   if (sumxx <= 0.0) {
+     // Every sampled runtime was zero, so the fit would divide by zero and store NaN.
+     // Keep the existing estimate instead.
+     NexmarkUtils.console("no usable runtime samples to calculate steady-state event rate");
+     return;
+   }
+   double eventsPerSec = sumxy / sumxx;
+   NexmarkUtils.console("revising events/sec from %.1f to %.1f", perf.eventsPerSec, eventsPerSec);
+   perf.eventsPerSec = eventsPerSec;
+ }
+
+ /**
+ * Return the current performance given {@code eventMonitor} and {@code resultMonitor}.
+ */
+ private NexmarkPerf currentPerf(
+ long startMsSinceEpoch, long now, PipelineResult result,
+ List<NexmarkPerf.ProgressSnapshot> snapshots, Monitor<?> eventMonitor,
+ Monitor<?> resultMonitor) {
+ NexmarkPerf perf = new NexmarkPerf();
+
+ long numEvents =
+ getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".elements", -1);
+ long numEventBytes =
+ getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".bytes", -1);
+ long eventStart =
+ getTimestampMetric(now,
+ getDistributionMetric(result, eventMonitor.name, eventMonitor.prefix + ".startTime",
+ DistributionType.MIN, -1));
+ long eventEnd =
+ getTimestampMetric(now,
+ getDistributionMetric(result, eventMonitor.name, eventMonitor.prefix + ".endTime",
+ DistributionType.MAX, -1));
+
+ long numResults =
+ getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".elements", -1);
+ long numResultBytes =
+ getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".bytes", -1);
+ long resultStart =
+ getTimestampMetric(now,
+ getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".startTime",
+ DistributionType.MIN, -1));
+ long resultEnd =
+ getTimestampMetric(now,
+ getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".endTime",
+ DistributionType.MAX, -1));
+ long timestampStart =
+ getTimestampMetric(now,
+ getDistributionMetric(result,
+ resultMonitor.name, resultMonitor.prefix + ".startTimestamp",
+ DistributionType.MIN, -1));
+ long timestampEnd =
+ getTimestampMetric(now,
+ getDistributionMetric(result,
+ resultMonitor.name, resultMonitor.prefix + ".endTimestamp",
+ DistributionType.MAX, -1));
+
+ long effectiveEnd = -1;
+ if (eventEnd >= 0 && resultEnd >= 0) {
+ // It is possible for events to be generated after the last result was emitted.
+ // (Eg Query 2, which only yields results for a small prefix of the event stream.)
+ // So use the max of last event and last result times.
+ effectiveEnd = Math.max(eventEnd, resultEnd);
+ } else if (resultEnd >= 0) {
+ effectiveEnd = resultEnd;
+ } else if (eventEnd >= 0) {
+ // During startup we may have no result yet, but we would still like to track how
+ // long the pipeline has been running.
+ effectiveEnd = eventEnd;
+ }
+
+ if (effectiveEnd >= 0 && eventStart >= 0 && effectiveEnd >= eventStart) {
+ perf.runtimeSec = (effectiveEnd - eventStart) / 1000.0;
+ }
+
+ if (numEvents >= 0) {
+ perf.numEvents = numEvents;
+ }
+
+ if (numEvents >= 0 && perf.runtimeSec > 0.0) {
+ // For streaming we may later replace this with a 'steady-state' value calculated
+ // from the progress snapshots.
+ perf.eventsPerSec = numEvents / perf.runtimeSec;
+ }
+
+ if (numEventBytes >= 0 && perf.runtimeSec > 0.0) {
+ perf.eventBytesPerSec = numEventBytes / perf.runtimeSec;
+ }
+
+ if (numResults >= 0) {
+ perf.numResults = numResults;
+ }
+
+ if (numResults >= 0 && perf.runtimeSec > 0.0) {
+ perf.resultsPerSec = numResults / perf.runtimeSec;
+ }
+
+ if (numResultBytes >= 0 && perf.runtimeSec > 0.0) {
+ perf.resultBytesPerSec = numResultBytes / perf.runtimeSec;
+ }
+
+ if (eventStart >= 0) {
+ perf.startupDelaySec = (eventStart - startMsSinceEpoch) / 1000.0;
+ }
+
+ if (resultStart >= 0 && eventStart >= 0 && resultStart >= eventStart) {
+ perf.processingDelaySec = (resultStart - eventStart) / 1000.0;
+ }
+
+ if (timestampStart >= 0 && timestampEnd >= 0 && perf.runtimeSec > 0.0) {
+ double eventRuntimeSec = (timestampEnd - timestampStart) / 1000.0;
+ perf.timeDilation = eventRuntimeSec / perf.runtimeSec;
+ }
+
+ if (resultEnd >= 0) {
+ // Fill in the shutdown delay assuming the job has now finished.
+ perf.shutdownDelaySec = (now - resultEnd) / 1000.0;
+ }
+
+ // As soon as available, try to capture cumulative cost at this point too.
+
+ NexmarkPerf.ProgressSnapshot snapshot = new NexmarkPerf.ProgressSnapshot();
+ snapshot.secSinceStart = (now - startMsSinceEpoch) / 1000.0;
+ snapshot.runtimeSec = perf.runtimeSec;
+ snapshot.numEvents = numEvents;
+ snapshot.numResults = numResults;
+ snapshots.add(snapshot);
+
+ captureSteadyState(perf, snapshots);
+
+ return perf;
+ }
+
+ /**
+  * Callback which builds and runs a pipeline using the options it is given.
+  *
+  * @param <OptionT> the type of options handed to the builder
+  */
+ interface PipelineBuilder<OptionT extends NexmarkOptions> {
+   /**
+    * Build and run the pipeline.
+    *
+    * @param publishOnlyOptions the options to build the pipeline with
+    */
+   void build(OptionT publishOnlyOptions);
+ }
+
+ /**
+ * Invoke the builder with options suitable for running a publish-only child pipeline.
+ */
+ private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder<NexmarkOptions> builder) {
+ builder.build(options);
+ }
+
+ /**
+  * Monitor the performance and progress of a running job. Return final performance if
+  * it was measured.
+  *
+  * <p>Polls the main pipeline every {@code PERF_DELAY} until it reaches a terminal state.
+  * The job is cancelled when the configured running time has elapsed and, in streaming mode,
+  * when the fatal counter is positive, when the query appears finished, or when it appears
+  * stuck for longer than {@code STUCK_TERMINATE_DELAY}. A publisher pipeline, if any, is
+  * cancelled before returning.
+  *
+  * @param query the query being run; its monitors and name are used to look up metrics
+  * @return null if job monitoring is disabled in the options; otherwise a performance record
+  *     carrying the collected errors and snapshots. With {@code configuration.debug} false no
+  *     performance is sampled and the record holds only the errors and (empty) snapshots.
+  * @throws RuntimeException if a job cannot be cancelled
+  */
+ @Nullable
+ private NexmarkPerf monitor(NexmarkQuery query) {
+   if (!options.getMonitorJobs()) {
+     return null;
+   }
+
+   if (configuration.debug) {
+     NexmarkUtils.console("Waiting for main pipeline to 'finish'");
+   } else {
+     NexmarkUtils.console("--debug=false, so job will not self-cancel");
+   }
+
+   PipelineResult job = mainResult;
+   PipelineResult publisherJob = publisherResult;
+   List<NexmarkPerf.ProgressSnapshot> snapshots = new ArrayList<>();
+   long startMsSinceEpoch = System.currentTimeMillis();
+   long endMsSinceEpoch = -1;
+   if (options.getRunningTimeMinutes() != null) {
+     endMsSinceEpoch = startMsSinceEpoch
+         + Duration.standardMinutes(options.getRunningTimeMinutes()).getMillis()
+         - Duration.standardSeconds(configuration.preloadSeconds).getMillis();
+   }
+   long lastActivityMsSinceEpoch = -1;
+   NexmarkPerf perf = null;
+   boolean waitingForShutdown = false;
+   boolean publisherCancelled = false;
+   List<String> errors = new ArrayList<>();
+
+   while (true) {
+     long now = System.currentTimeMillis();
+     if (endMsSinceEpoch >= 0 && now > endMsSinceEpoch && !waitingForShutdown) {
+       NexmarkUtils.console("Reached end of test, cancelling job");
+       try {
+         job.cancel();
+       } catch (IOException e) {
+         throw new RuntimeException("Unable to cancel main job: ", e);
+       }
+       if (publisherResult != null) {
+         try {
+           publisherJob.cancel();
+         } catch (IOException e) {
+           throw new RuntimeException("Unable to cancel publisher job: ", e);
+         }
+         publisherCancelled = true;
+       }
+       waitingForShutdown = true;
+     }
+
+     PipelineResult.State state = job.getState();
+     NexmarkUtils.console("%s %s%s", state, queryName,
+         waitingForShutdown ? " (waiting for shutdown)" : "");
+
+     // Performance is only sampled in debug mode; currPerf stays null otherwise.
+     NexmarkPerf currPerf;
+     if (configuration.debug) {
+       currPerf = currentPerf(startMsSinceEpoch, now, job, snapshots,
+           query.eventMonitor, query.resultMonitor);
+     } else {
+       currPerf = null;
+     }
+
+     if (perf == null || perf.anyActivity(currPerf)) {
+       lastActivityMsSinceEpoch = now;
+     }
+
+     if (options.isStreaming() && !waitingForShutdown) {
+       Duration quietFor = new Duration(lastActivityMsSinceEpoch, now);
+       long fatalCount = getCounterMetric(job, query.getName(), "fatal", 0);
+       if (fatalCount > 0) {
+         NexmarkUtils.console("job has fatal errors, cancelling.");
+         errors.add(String.format("Pipeline reported %s fatal errors", fatalCount));
+         waitingForShutdown = true;
+       } else if (configuration.debug && configuration.numEvents > 0
+           && currPerf.numEvents == configuration.numEvents
+           && currPerf.numResults >= 0 && quietFor.isLongerThan(DONE_DELAY)) {
+         NexmarkUtils.console("streaming query appears to have finished, cancelling job.");
+         waitingForShutdown = true;
+       } else if (quietFor.isLongerThan(STUCK_TERMINATE_DELAY)) {
+         NexmarkUtils.console("streaming query appears to have gotten stuck, cancelling job.");
+         errors.add("Streaming job was cancelled since appeared stuck");
+         waitingForShutdown = true;
+       } else if (quietFor.isLongerThan(STUCK_WARNING_DELAY)) {
+         NexmarkUtils.console("WARNING: streaming query appears to have been stuck for %d min.",
+             quietFor.getStandardMinutes());
+         errors.add(
+             String.format("Streaming query was stuck for %d min", quietFor.getStandardMinutes()));
+       }
+
+       if (waitingForShutdown) {
+         try {
+           job.cancel();
+         } catch (IOException e) {
+           throw new RuntimeException("Unable to cancel main job: ", e);
+         }
+       }
+     }
+
+     perf = currPerf;
+
+     boolean running = true;
+     switch (state) {
+       case UNKNOWN:
+       case STOPPED:
+       case RUNNING:
+         // Keep going.
+         break;
+       case DONE:
+         // All done.
+         running = false;
+         break;
+       case CANCELLED:
+         running = false;
+         if (!waitingForShutdown) {
+           errors.add("Job was unexpectedly cancelled");
+         }
+         break;
+       case FAILED:
+         // Abnormal termination. Reported separately from UPDATED so a failure is not
+         // mislabelled as an update.
+         running = false;
+         errors.add("Job failed");
+         break;
+       case UPDATED:
+         // Abnormal termination.
+         running = false;
+         errors.add("Job was unexpectedly updated");
+         break;
+     }
+
+     if (!running) {
+       break;
+     }
+
+     if (lastActivityMsSinceEpoch == now) {
+       NexmarkUtils.console("new perf %s", perf);
+     } else {
+       NexmarkUtils.console("no activity");
+     }
+
+     try {
+       Thread.sleep(PERF_DELAY.getMillis());
+     } catch (InterruptedException e) {
+       // The interrupt is deliberately not re-asserted: doing so would make every
+       // following sleep throw immediately and turn this loop into a busy spin.
+       Thread.interrupted();
+       NexmarkUtils.console("Interrupted: pipeline is still running");
+     }
+   }
+
+   if (perf == null) {
+     // With --debug=false no performance is ever sampled, so there is no record to attach
+     // the errors to. Use an empty record rather than failing with a NullPointerException.
+     perf = new NexmarkPerf();
+   }
+   perf.errors = errors;
+   perf.snapshots = snapshots;
+
+   if (publisherResult != null) {
+     NexmarkUtils.console("Shutting down publisher pipeline.");
+     try {
+       if (!publisherCancelled) {
+         publisherJob.cancel();
+       }
+       publisherJob.waitUntilFinish(Duration.standardMinutes(5));
+     } catch (IOException e) {
+       throw new RuntimeException("Unable to cancel publisher job: ", e);
+     }
+   }
+
+   return perf;
+ }
+
+ // ================================================================================
+ // Basic sources and sinks
+ // ================================================================================
+
+ /**
+ * Return a topic name.
+ */
+ private String shortTopic(long now) {
+ String baseTopic = options.getPubsubTopic();
+ if (Strings.isNullOrEmpty(baseTopic)) {
+ throw new RuntimeException("Missing --pubsubTopic");
+ }
+ switch (options.getResourceNameMode()) {
+ case VERBATIM:
+ return baseTopic;
+ case QUERY:
+ return String.format("%s_%s_source", baseTopic, queryName);
+ case QUERY_AND_SALT:
+ return String.format("%s_%s_%d_source", baseTopic, queryName, now);
+ }
+ throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
+ }
+
+ /**
+ * Return a subscription name.
+ */
+ private String shortSubscription(long now) {
+ String baseSubscription = options.getPubsubSubscription();
+ if (Strings.isNullOrEmpty(baseSubscription)) {
+ throw new RuntimeException("Missing --pubsubSubscription");
+ }
+ switch (options.getResourceNameMode()) {
+ case VERBATIM:
+ return baseSubscription;
+ case QUERY:
+ return String.format("%s_%s_source", baseSubscription, queryName);
+ case QUERY_AND_SALT:
+ return String.format("%s_%s_%d_source", baseSubscription, queryName, now);
+ }
+ throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
+ }
+
+ /**
+ * Return a file name for plain text.
+ */
+ private String textFilename(long now) {
+ String baseFilename = options.getOutputPath();
+ if (Strings.isNullOrEmpty(baseFilename)) {
+ throw new RuntimeException("Missing --outputPath");
+ }
+ switch (options.getResourceNameMode()) {
+ case VERBATIM:
+ return baseFilename;
+ case QUERY:
+ return String.format("%s/nexmark_%s.txt", baseFilename, queryName);
+ case QUERY_AND_SALT:
+ return String.format("%s/nexmark_%s_%d.txt", baseFilename, queryName, now);
+ }
+ throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
+ }
+
+ /**
+ * Return a BigQuery table spec.
+ */
+ private String tableSpec(long now, String version) {
+ String baseTableName = options.getBigQueryTable();
+ if (Strings.isNullOrEmpty(baseTableName)) {
+ throw new RuntimeException("Missing --bigQueryTable");
+ }
+ switch (options.getResourceNameMode()) {
+ case VERBATIM:
+ return String.format("%s:nexmark.%s_%s",
+ options.getProject(), baseTableName, version);
+ case QUERY:
+ return String.format("%s:nexmark.%s_%s_%s",
+ options.getProject(), baseTableName, queryName, version);
+ case QUERY_AND_SALT:
+ return String.format("%s:nexmark.%s_%s_%s_%d",
+ options.getProject(), baseTableName, queryName, version, now);
+ }
+ throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
+ }
+
+ /**
+ * Return a directory for logs.
+ */
+ private String logsDir(long now) {
+ String baseFilename = options.getOutputPath();
+ if (Strings.isNullOrEmpty(baseFilename)) {
+ throw new RuntimeException("Missing --outputPath");
+ }
+ switch (options.getResourceNameMode()) {
+ case VERBATIM:
+ return baseFilename;
+ case QUERY:
+ return String.format("%s/logs_%s", baseFilename, queryName);
+ case QUERY_AND_SALT:
+ return String.format("%s/logs_%s_%d", baseFilename, queryName, now);
+ }
+ throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
+ }
+
+ /**
+  * Return a source of synthetic events: an unbounded read in streaming mode, a bounded
+  * read otherwise.
+  */
+ private PCollection<Event> sourceEventsFromSynthetic(Pipeline p) {
+   if (!isStreaming()) {
+     NexmarkUtils.console("Generating %d events in batch mode", configuration.numEvents);
+     return p.apply(queryName + ".ReadBounded", NexmarkUtils.batchEventsSource(configuration));
+   }
+   NexmarkUtils.console("Generating %d events in streaming mode", configuration.numEvents);
+   return p.apply(queryName + ".ReadUnbounded", NexmarkUtils.streamEventsSource(configuration));
+ }
+
+ /**
+  * Return source of events from Pubsub.
+  *
+  * <p>Messages are read with their attributes from the subscription named by
+  * {@code shortSubscription(now)} and their payloads decoded with {@code Event.CODER}.
+  * A payload which cannot be decoded is logged and dropped.
+  *
+  * @param p the pipeline to read into
+  * @param now salt used when deriving the subscription name
+  */
+ private PCollection<Event> sourceEventsFromPubsub(Pipeline p, long now) {
+   String shortSubscription = shortSubscription(now);
+   NexmarkUtils.console("Reading events from Pubsub %s", shortSubscription);
+
+   PubsubIO.Read<PubsubMessage> io =
+       PubsubIO.readMessagesWithAttributes().fromSubscription(shortSubscription)
+           .withIdAttribute(NexmarkUtils.PUBSUB_ID);
+   if (!configuration.usePubsubPublishTime) {
+     // Take element timestamps from the message attribute rather than the publish time.
+     io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
+   }
+
+   return p
+       .apply(queryName + ".ReadPubsubEvents", io)
+       .apply(queryName + ".PubsubMessageToEvent", ParDo.of(new DoFn<PubsubMessage, Event>() {
+         @ProcessElement
+         public void processElement(ProcessContext c) {
+           byte[] payload = c.element().getPayload();
+           try {
+             Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload);
+             c.output(event);
+           } catch (CoderException e) {
+             // Best effort: skip the undecodable message, but keep the cause in the log.
+             LOG.error("Error while decoding Event from Pubsub message: serialization error", e);
+           }
+         }
+       }));
+ }
+
+ /**
+ * Return Avro source of events from {@code options.getInputFilePrefix}.
+ */
+ private PCollection<Event> sourceEventsFromAvro(Pipeline p) {
+ String filename = options.getInputPath();
+ if (Strings.isNullOrEmpty(filename)) {
+ throw new RuntimeException("Missing --inputPath");
+ }
+ NexmarkUtils.console("Reading events from Avro files at %s", filename);
+ return p
+ .apply(queryName + ".ReadAvroEvents", AvroIO.read(Event.class)
+ .from(filename + "*.avro"))
+ .apply("OutputWithTimestamp", NexmarkQuery.EVENT_TIMESTAMP_FROM_DATA);
+ }
+
+ /**
+  * Send {@code events} to Pubsub.
+  *
+  * <p>Each event is encoded with {@code Event.CODER} into the payload of a message with an
+  * empty attribute map and written to the topic named by {@code shortTopic(now)}.
+  * An event which cannot be encoded is logged and dropped.
+  *
+  * @param events the events to publish
+  * @param now salt used when deriving the topic name
+  */
+ private void sinkEventsToPubsub(PCollection<Event> events, long now) {
+   String shortTopic = shortTopic(now);
+   NexmarkUtils.console("Writing events to Pubsub %s", shortTopic);
+
+   PubsubIO.Write<PubsubMessage> io =
+       PubsubIO.writeMessages().to(shortTopic)
+           .withIdAttribute(NexmarkUtils.PUBSUB_ID);
+   if (!configuration.usePubsubPublishTime) {
+     io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
+   }
+
+   events.apply(queryName + ".EventToPubsubMessage",
+       ParDo.of(new DoFn<Event, PubsubMessage>() {
+         @ProcessElement
+         public void processElement(ProcessContext c) {
+           try {
+             byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element());
+             c.output(new PubsubMessage(payload, new HashMap<String, String>()));
+           } catch (CoderException e1) {
+             // Best effort: skip the unencodable event, but keep the cause in the log.
+             // A trailing Throwable argument is logged by SLF4J as the exception.
+             LOG.error("Error while sending Event {} to Pubsub: serialization error",
+                 c.element().toString(), e1);
+           }
+         }
+       })
+   )
+       .apply(queryName + ".WritePubsubEvents", io);
+ }
+
+ /**
+ * Send {@code formattedResults} to Pubsub.
+ */
+ private void sinkResultsToPubsub(PCollection<String> formattedResults, long now) {
+ String shortTopic = shortTopic(now);
+ NexmarkUtils.console("Writing results to Pubsub %s", shortTopic);
+ PubsubIO.Write<String> io =
+ PubsubIO.writeStrings().to(shortTopic)
+ .withIdAttribute(NexmarkUtils.PUBSUB_ID);
+ if (!configuration.usePubsubPublishTime) {
+ io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
+ }
+ formattedResults.apply(queryName + ".WritePubsubResults", io);
+ }
+
+ /**
+ * Sink all raw Events in {@code source} to {@code options.getOutputPath}.
+ * This will configure the job to write the following files:
+ * <ul>
+ * <li>{@code $outputPath/event*.avro} All Event entities.
+ * <li>{@code $outputPath/auction*.avro} Auction entities.
+ * <li>{@code $outputPath/bid*.avro} Bid entities.
+ * <li>{@code $outputPath/person*.avro} Person entities.
+ * </ul>
+ *
+ * @param source A PCollection of events.
+ */
+ private void sinkEventsToAvro(PCollection<Event> source) {
+ String filename = options.getOutputPath();
+ if (Strings.isNullOrEmpty(filename)) {
+ throw new RuntimeException("Missing --outputPath");
+ }
+ NexmarkUtils.console("Writing events to Avro files at %s", filename);
+ source.apply(queryName + ".WriteAvroEvents",
+ AvroIO.write(Event.class).to(filename + "/event").withSuffix(".avro"));
+ source.apply(NexmarkQuery.JUST_BIDS)
+ .apply(queryName + ".WriteAvroBids",
+ AvroIO.write(Bid.class).to(filename + "/bid").withSuffix(".avro"));
+ source.apply(NexmarkQuery.JUST_NEW_AUCTIONS)
+ .apply(queryName + ".WriteAvroAuctions",
+ AvroIO.write(Auction.class).to(filename + "/auction").withSuffix(".avro"));
+ source.apply(NexmarkQuery.JUST_NEW_PERSONS)
+ .apply(queryName + ".WriteAvroPeople",
+ AvroIO.write(Person.class).to(filename + "/person").withSuffix(".avro"));
+ }
+
+ /**
+ * Send {@code formattedResults} to text files.
+ */
+ private void sinkResultsToText(PCollection<String> formattedResults, long now) {
+ String filename = textFilename(now);
+ NexmarkUtils.console("Writing results to text files at %s", filename);
+ formattedResults.apply(queryName + ".WriteTextResults",
+ TextIO.write().to(filename));
+ }
+
+ private static class StringToTableRow extends DoFn<String, TableRow> {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ int n = ThreadLocalRandom.current().nextInt(10);
+ List<TableRow> records = new ArrayList<>(n);
+ for (int i = 0; i < n; i++) {
+ records.add(new TableRow().set("index", i).set("value", Integer.toString(i)));
+ }
+ c.output(new TableRow().set("result", c.element()).set("records", records));
+ }
+ }
+
+ /**
+ * Send {@code formattedResults} to BigQuery.
+ */
+ private void sinkResultsToBigQuery(
+ PCollection<String> formattedResults, long now,
+ String version) {
+ String tableSpec = tableSpec(now, version);
+ TableSchema tableSchema =
+ new TableSchema().setFields(ImmutableList.of(
+ new TableFieldSchema().setName("result").setType("STRING"),
+ new TableFieldSchema().setName("records").setMode("REPEATED").setType("RECORD")
+ .setFields(ImmutableList.of(
+ new TableFieldSchema().setName("index").setType("INTEGER"),
+ new TableFieldSchema().setName("value").setType("STRING")))));
+ NexmarkUtils.console("Writing results to BigQuery table %s", tableSpec);
+ BigQueryIO.Write io =
+ BigQueryIO.write().to(tableSpec)
+ .withSchema(tableSchema)
+ .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
+ formattedResults
+ .apply(queryName + ".StringToTableRow", ParDo.of(new StringToTableRow()))
+ .apply(queryName + ".WriteBigQueryResults", io);
+ }
+
+ // ================================================================================
+ // Construct overall pipeline
+ // ================================================================================
+
+ /**
+  * Return source of events for this run, or null if we are simply publishing events
+  * to Pubsub.
+  *
+  * <p>For a Pubsub source this may also set up the publishing side: within {@code p} in
+  * PUBLISH_ONLY mode, or as a separate pipeline (which is started here and recorded in
+  * {@code publisherResult}) in COMBINED mode.
+  *
+  * @param p the main pipeline
+  * @param now salt used when deriving Pubsub resource names
+  */
+ private PCollection<Event> createSource(Pipeline p, final long now) {
+   switch (configuration.sourceType) {
+     case DIRECT:
+       return sourceEventsFromSynthetic(p);
+     case AVRO:
+       return sourceEventsFromAvro(p);
+     case PUBSUB:
+       // Handled below.
+       break;
+     default:
+       return null;
+   }
+
+   // Setup the sink for the publisher.
+   switch (configuration.pubSubMode) {
+     case SUBSCRIBE_ONLY:
+       // Nothing to publish.
+       break;
+     case PUBLISH_ONLY:
+       // Send synthesized events to Pubsub in this job.
+       PCollection<Event> snooped = sourceEventsFromSynthetic(p)
+           .apply(queryName + ".Snoop", NexmarkUtils.snoop(queryName));
+       sinkEventsToPubsub(snooped, now);
+       break;
+     case COMBINED:
+       // Send synthesized events to Pubsub in separate publisher job.
+       // We won't start the main pipeline until the publisher has sent the pre-load events.
+       // We'll shutdown the publisher job when we notice the main job has finished.
+       invokeBuilderForPublishOnlyPipeline(new PipelineBuilder<NexmarkOptions>() {
+         @Override
+         public void build(NexmarkOptions publishOnlyOptions) {
+           Pipeline publisherPipeline = Pipeline.create(options);
+           NexmarkUtils.setupPipeline(configuration.coderStrategy, publisherPipeline);
+           publisherMonitor = new Monitor<>(queryName, "publisher");
+           sinkEventsToPubsub(
+               sourceEventsFromSynthetic(publisherPipeline)
+                   .apply(queryName + ".Monitor", publisherMonitor.getTransform()),
+               now);
+           publisherResult = publisherPipeline.run();
+         }
+       });
+       break;
+   }
+
+   // Setup the source for the consumer.
+   switch (configuration.pubSubMode) {
+     case SUBSCRIBE_ONLY:
+     case COMBINED:
+       // Read events from pubsub.
+       return sourceEventsFromPubsub(p, now);
+     case PUBLISH_ONLY:
+     default:
+       // Nothing to consume. There is no source.
+       return null;
+   }
+ }
+
+ private static final TupleTag<String> MAIN = new TupleTag<String>(){};
+ private static final TupleTag<String> SIDE = new TupleTag<String>(){};
+
+ private static class PartitionDoFn extends DoFn<String, String> {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ if (c.element().hashCode() % 2 == 0) {
+ c.output(c.element());
+ } else {
+ c.output(SIDE, c.element());
+ }
+ }
+ }
+
+ /**
+ * Consume {@code results} according to {@code configuration.sinkType}.
+ *
+ * <p>{@code COUNT_ONLY} discards the raw results without formatting them. Every other sink
+ * type first formats the results as strings (optionally logging them when
+ * {@code --logResults} is set) and then hands them to the matching sink.
+ *
+ * @param results the query results to consume
+ * @param now timestamp of this run, passed through to the individual sink helpers
+ */
+ private void sink(PCollection<TimestampedValue<KnownSize>> results, long now) {
+ if (configuration.sinkType == NexmarkUtils.SinkType.COUNT_ONLY) {
+ // Avoid the cost of formatting the results.
+ results.apply(queryName + ".DevNull", NexmarkUtils.devNull(queryName));
+ return;
+ }
+
+ PCollection<String> formattedResults =
+ results.apply(queryName + ".Format", NexmarkUtils.format(queryName));
+ if (options.getLogResults()) {
+ formattedResults = formattedResults.apply(queryName + ".Results.Log",
+ NexmarkUtils.<String>log(queryName + ".Results"));
+ }
+
+ switch (configuration.sinkType) {
+ case DEVNULL:
+ // Discard all results
+ formattedResults.apply(queryName + ".DevNull", NexmarkUtils.devNull(queryName));
+ break;
+ case PUBSUB:
+ sinkResultsToPubsub(formattedResults, now);
+ break;
+ case TEXT:
+ sinkResultsToText(formattedResults, now);
+ break;
+ case AVRO:
+ NexmarkUtils.console(
+ "WARNING: with --sinkType=AVRO, actual query results will be discarded.");
+ break;
+ case BIGQUERY:
+ // Multiple BigQuery backends to mimic what most customers do.
+ PCollectionTuple res = formattedResults.apply(queryName + ".Partition",
+ ParDo.of(new PartitionDoFn()).withOutputTags(MAIN, TupleTagList.of(SIDE)));
+ sinkResultsToBigQuery(res.get(MAIN), now, "main");
+ sinkResultsToBigQuery(res.get(SIDE), now, "side");
+ sinkResultsToBigQuery(formattedResults, now, "copy");
+ break;
+ case COUNT_ONLY:
+ // Short-circuited above. Say so if this invariant is ever broken.
+ throw new RuntimeException(
+ "COUNT_ONLY sink should have been handled before formatting results");
+ }
+ }
+
+ // ================================================================================
+ // Entry point
+ // ================================================================================
+
+ /**
+ * Calculate the distribution of the expected rate of results per minute (in event time, not
+ * wallclock time).
+ *
+ * <p>Sorts the per-window result counts produced by the model's simulator and prints the
+ * sample count, minimum, lower quartile, median, upper quartile and maximum. With fewer
+ * than five samples only the sample count is printed.
+ *
+ * @param model the query model whose simulator supplies the per-window result counts
+ */
+ private void modelResultRates(NexmarkQueryModel model) {
+ List<Long> counts = Lists.newArrayList(model.simulator().resultsPerWindow());
+ Collections.sort(counts);
+ int n = counts.size();
+ if (n < 5) {
+ NexmarkUtils.console("Query%d: only %d samples", model.configuration.query, n);
+ } else {
+ // counts.get(n / 2) is the middle element of the sorted samples, i.e. the median
+ // (it was previously mislabelled as the mean).
+ NexmarkUtils.console("Query%d: N:%d; min:%d; 1st%%:%d; median:%d; 3rd%%:%d; max:%d",
+ model.configuration.query, n, counts.get(0), counts.get(n / 4),
+ counts.get(n / 2),
+ counts.get(n - 1 - n / 4), counts.get(n - 1));
+ }
+ }
+
+ /**
+ * Run {@code configuration} and return its performance if possible.
+ *
+ * <p>Per-run state ({@code configuration} and {@code queryName}) is set on entry and always
+ * cleared again on exit, so the launcher can be reused for the next configuration.
+ *
+ * @param runConfiguration the configuration to run
+ * @return the result of monitoring the run, or null if the configuration is disabled
+ * (negative {@code numEvents}) or only the model result rate was requested
+ * @throws RuntimeException if {@code --manageResources} is used without
+ * {@code --monitorJobs}, or if a model is required but the query has none
+ */
+ @Nullable
+ public NexmarkPerf run(NexmarkConfiguration runConfiguration) {
+ if (options.getManageResources() && !options.getMonitorJobs()) {
+ throw new RuntimeException("If using --manageResources then must also use --monitorJobs.");
+ }
+
+ //
+ // Setup per-run state.
+ //
+ checkState(configuration == null);
+ checkState(queryName == null);
+ configuration = runConfiguration;
+
+ try {
+ NexmarkUtils.console("Running %s", configuration.toShortString());
+
+ if (configuration.numEvents < 0) {
+ NexmarkUtils.console("skipping since configuration is disabled");
+ return null;
+ }
+
+ List<NexmarkQuery> queries = Arrays.asList(new Query0(configuration),
+ new Query1(configuration),
+ new Query2(configuration),
+ new Query3(configuration),
+ new Query4(configuration),
+ new Query5(configuration),
+ new Query6(configuration),
+ new Query7(configuration),
+ new Query8(configuration),
+ new Query9(configuration),
+ new Query10(configuration),
+ new Query11(configuration),
+ new Query12(configuration));
+ NexmarkQuery query = queries.get(configuration.query);
+ queryName = query.getName();
+
+ // Models are indexed by query number; queries 10-12 have no model.
+ List<NexmarkQueryModel> models = Arrays.asList(
+ new Query0Model(configuration),
+ new Query1Model(configuration),
+ new Query2Model(configuration),
+ new Query3Model(configuration),
+ new Query4Model(configuration),
+ new Query5Model(configuration),
+ new Query6Model(configuration),
+ new Query7Model(configuration),
+ new Query8Model(configuration),
+ new Query9Model(configuration),
+ null,
+ null,
+ null);
+ NexmarkQueryModel model = models.get(configuration.query);
+
+ if (options.getJustModelResultRate()) {
+ if (model == null) {
+ throw new RuntimeException(String.format("No model for %s", queryName));
+ }
+ modelResultRates(model);
+ return null;
+ }
+
+ long now = System.currentTimeMillis();
+ Pipeline p = Pipeline.create(options);
+ NexmarkUtils.setupPipeline(configuration.coderStrategy, p);
+
+ // Generate events.
+ PCollection<Event> source = createSource(p, now);
+
+ // Source will be null if source type is PUBSUB and mode is PUBLISH_ONLY.
+ // In that case there's nothing more to add to pipeline.
+ if (source != null) {
+ // Only log events when there is a source: applying the logging transform to a
+ // null source would throw a NullPointerException in publish-only mode.
+ if (options.getLogEvents()) {
+ source = source.apply(queryName + ".Events.Log",
+ NexmarkUtils.<Event>log(queryName + ".Events"));
+ }
+
+ // Optionally sink events in Avro format.
+ // (Query results are ignored).
+ if (configuration.sinkType == NexmarkUtils.SinkType.AVRO) {
+ sinkEventsToAvro(source);
+ }
+
+ // Query 10 logs all events to Google Cloud storage files. It could generate a lot of logs,
+ // so, set parallelism. Also set the output path where to write log files.
+ if (configuration.query == 10) {
+ String path = null;
+ if (options.getOutputPath() != null && !options.getOutputPath().isEmpty()) {
+ path = logsDir(now);
+ }
+ ((Query10) query).setOutputPath(path);
+ ((Query10) query).setMaxNumWorkers(maxNumWorkers());
+ }
+
+ // Apply query.
+ PCollection<TimestampedValue<KnownSize>> results = source.apply(query);
+
+ if (options.getAssertCorrectness()) {
+ if (model == null) {
+ throw new RuntimeException(String.format("No model for %s", queryName));
+ }
+ // We know all our streams have a finite number of elements.
+ results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
+ // If we have a finite number of events then assert our pipeline's
+ // results match those of a model using the same sequence of events.
+ PAssert.that(results).satisfies(model.assertionFor());
+ }
+
+ // Output results.
+ sink(results, now);
+ }
+
+ mainResult = p.run();
+ mainResult.waitUntilFinish(Duration.standardSeconds(configuration.streamTimeout));
+ return monitor(query);
+ } finally {
+ // Always reset per-run state, even when the run failed.
+ configuration = null;
+ queryName = null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
new file mode 100644
index 0000000..2a2a5a7
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+
+/**
+ * Command line flags.
+ */
+public interface NexmarkOptions
+ extends ApplicationNameOptions, GcpOptions, PipelineOptions, StreamingOptions {
+ @Description("Which suite to run. Default is to use command line arguments for one job.")
+ @Default.Enum("DEFAULT")
+ NexmarkSuite getSuite();
+
+ void setSuite(NexmarkSuite suite);
+
+ @Description("If true, monitor the jobs as they run.")
+ @Default.Boolean(false)
+ boolean getMonitorJobs();
+
+ void setMonitorJobs(boolean monitorJobs);
+
+ @Description("Where the events come from.")
+ @Nullable
+ NexmarkUtils.SourceType getSourceType();
+
+ void setSourceType(NexmarkUtils.SourceType sourceType);
+
+ @Description("Prefix for input files if using avro input")
+ @Nullable
+ String getInputPath();
+
+ void setInputPath(String inputPath);
+
+ @Description("Where results go.")
+ @Nullable
+ NexmarkUtils.SinkType getSinkType();
+
+ void setSinkType(NexmarkUtils.SinkType sinkType);
+
+ @Description("Which mode to run in when source is PUBSUB.")
+ @Nullable
+ NexmarkUtils.PubSubMode getPubSubMode();
+
+ void setPubSubMode(NexmarkUtils.PubSubMode pubSubMode);
+
+ @Description("Which query to run.")
+ @Nullable
+ Integer getQuery();
+
+ void setQuery(Integer query);
+
+ @Description("Prefix for output files if using text output for results or running Query 10.")
+ @Nullable
+ String getOutputPath();
+
+ void setOutputPath(String outputPath);
+
+ @Description("Base name of pubsub topic to publish to in streaming mode.")
+ @Nullable
+ @Default.String("nexmark")
+ String getPubsubTopic();
+
+ void setPubsubTopic(String pubsubTopic);
+
+ @Description("Base name of pubsub subscription to read from in streaming mode.")
+ @Nullable
+ @Default.String("nexmark")
+ String getPubsubSubscription();
+
+ void setPubsubSubscription(String pubsubSubscription);
+
+ @Description("Base name of BigQuery table name if using BigQuery output.")
+ @Nullable
+ @Default.String("nexmark")
+ String getBigQueryTable();
+
+ void setBigQueryTable(String bigQueryTable);
+
+ @Description("Approximate number of events to generate. "
+ + "Zero for effectively unlimited in streaming mode.")
+ @Nullable
+ Long getNumEvents();
+
+ void setNumEvents(Long numEvents);
+
+ @Description("Time in seconds to preload the subscription with data, at the initial input rate "
+ + "of the pipeline.")
+ @Nullable
+ Integer getPreloadSeconds();
+
+ void setPreloadSeconds(Integer preloadSeconds);
+
+ @Description(
+ "Time in seconds to wait in pipelineResult.waitUntilFinish(), useful in streaming mode")
+ @Nullable
+ Integer getStreamTimeout();
+
+ void setStreamTimeout(Integer streamTimeout);
+
+ @Description("Number of unbounded sources to create events.")
+ @Nullable
+ Integer getNumEventGenerators();
+
+ void setNumEventGenerators(Integer numEventGenerators);
+
+ @Description("Shape of event rate curve.")
+ @Nullable
+ NexmarkUtils.RateShape getRateShape();
+
+ void setRateShape(NexmarkUtils.RateShape rateShape);
+
+ @Description("Initial overall event rate (in --rateUnit).")
+ @Nullable
+ Integer getFirstEventRate();
+
+ void setFirstEventRate(Integer firstEventRate);
+
+ @Description("Next overall event rate (in --rateUnit).")
+ @Nullable
+ Integer getNextEventRate();
+
+ void setNextEventRate(Integer nextEventRate);
+
+ @Description("Unit for rates.")
+ @Nullable
+ NexmarkUtils.RateUnit getRateUnit();
+
+ void setRateUnit(NexmarkUtils.RateUnit rateUnit);
+
+ @Description("Overall period of rate shape, in seconds.")
+ @Nullable
+ Integer getRatePeriodSec();
+
+ void setRatePeriodSec(Integer ratePeriodSec);
+
+ @Description("If true, relay events in real time in streaming mode.")
+ @Nullable
+ Boolean getIsRateLimited();
+
+ void setIsRateLimited(Boolean isRateLimited);
+
+ @Description("If true, use wallclock time as event time. Otherwise, use a deterministic"
+ + " time in the past so that multiple runs will see exactly the same event streams"
+ + " and should thus have exactly the same results.")
+ @Nullable
+ Boolean getUseWallclockEventTime();
+
+ void setUseWallclockEventTime(Boolean useWallclockEventTime);
+
+ // Primitive boolean can never be null, so declare the default explicitly.
+ @Description("Assert pipeline results match model results.")
+ @Default.Boolean(false)
+ boolean getAssertCorrectness();
+
+ void setAssertCorrectness(boolean assertCorrectness);
+
+ // Primitive boolean can never be null, so declare the default explicitly.
+ @Description("Log all input events.")
+ @Default.Boolean(false)
+ boolean getLogEvents();
+
+ void setLogEvents(boolean logEvents);
+
+ // Primitive boolean can never be null, so declare the default explicitly.
+ @Description("Log all query results.")
+ @Default.Boolean(false)
+ boolean getLogResults();
+
+ void setLogResults(boolean logResults);
+
+ @Description("Average size in bytes for a person record.")
+ @Nullable
+ Integer getAvgPersonByteSize();
+
+ void setAvgPersonByteSize(Integer avgPersonByteSize);
+
+ @Description("Average size in bytes for an auction record.")
+ @Nullable
+ Integer getAvgAuctionByteSize();
+
+ void setAvgAuctionByteSize(Integer avgAuctionByteSize);
+
+ @Description("Average size in bytes for a bid record.")
+ @Nullable
+ Integer getAvgBidByteSize();
+
+ void setAvgBidByteSize(Integer avgBidByteSize);
+
+ @Description("Ratio of bids for 'hot' auctions above the background.")
+ @Nullable
+ Integer getHotAuctionRatio();
+
+ void setHotAuctionRatio(Integer hotAuctionRatio);
+
+ @Description("Ratio of auctions for 'hot' sellers above the background.")
+ @Nullable
+ Integer getHotSellersRatio();
+
+ void setHotSellersRatio(Integer hotSellersRatio);
+
+ @Description("Ratio of auctions for 'hot' bidders above the background.")
+ @Nullable
+ Integer getHotBiddersRatio();
+
+ void setHotBiddersRatio(Integer hotBiddersRatio);
+
+ @Description("Window size in seconds.")
+ @Nullable
+ Long getWindowSizeSec();
+
+ void setWindowSizeSec(Long windowSizeSec);
+
+ @Description("Window period in seconds.")
+ @Nullable
+ Long getWindowPeriodSec();
+
+ void setWindowPeriodSec(Long windowPeriodSec);
+
+ @Description("If in streaming mode, the holdback for watermark in seconds.")
+ @Nullable
+ Long getWatermarkHoldbackSec();
+
+ void setWatermarkHoldbackSec(Long watermarkHoldbackSec);
+
+ @Description("Roughly how many auctions should be in flight for each generator.")
+ @Nullable
+ Integer getNumInFlightAuctions();
+
+ void setNumInFlightAuctions(Integer numInFlightAuctions);
+
+
+ @Description("Maximum number of people to consider as active for placing auctions or bids.")
+ @Nullable
+ Integer getNumActivePeople();
+
+ void setNumActivePeople(Integer numActivePeople);
+
+ @Description("Filename of perf data to append to.")
+ @Nullable
+ String getPerfFilename();
+
+ void setPerfFilename(String perfFilename);
+
+ @Description("Filename of baseline perf data to read from.")
+ @Nullable
+ String getBaselineFilename();
+
+ void setBaselineFilename(String baselineFilename);
+
+ @Description("Filename of summary perf data to append to.")
+ @Nullable
+ String getSummaryFilename();
+
+ void setSummaryFilename(String summaryFilename);
+
+ @Description("Filename for javascript capturing all perf data and any baselines.")
+ @Nullable
+ String getJavascriptFilename();
+
+ void setJavascriptFilename(String javascriptFilename);
+
+ // Primitive boolean can never be null, so declare the default explicitly.
+ @Description("If true, don't run the actual query. Instead, calculate the distribution "
+ + "of number of query results per (event time) minute according to the query model.")
+ @Default.Boolean(false)
+ boolean getJustModelResultRate();
+
+ void setJustModelResultRate(boolean justModelResultRate);
+
+ @Description("Coder strategy to use.")
+ @Nullable
+ NexmarkUtils.CoderStrategy getCoderStrategy();
+
+ void setCoderStrategy(NexmarkUtils.CoderStrategy coderStrategy);
+
+ @Description("Delay, in milliseconds, for each event. We will peg one core for this "
+ + "number of milliseconds to simulate CPU-bound computation.")
+ @Nullable
+ Long getCpuDelayMs();
+
+ void setCpuDelayMs(Long cpuDelayMs);
+
+ @Description("Extra data, in bytes, to save to persistent state for each event. "
+ + "This will force I/O all the way to durable storage to simulate an "
+ + "I/O-bound computation.")
+ @Nullable
+ Long getDiskBusyBytes();
+
+ void setDiskBusyBytes(Long diskBusyBytes);
+
+ @Description("Skip factor for query 2. We select bids for every {@code auctionSkip}'th auction")
+ @Nullable
+ Integer getAuctionSkip();
+
+ void setAuctionSkip(Integer auctionSkip);
+
+ @Description("Fanout for queries 4 (groups by category id) and 7 (finds a global maximum).")
+ @Nullable
+ Integer getFanout();
+
+ void setFanout(Integer fanout);
+
+ @Description("Maximum waiting time to clean personState in query3 "
+ + "(ie maximum waiting of the auctions related to person in state in seconds in event time).")
+ @Nullable
+ Integer getMaxAuctionsWaitingTime();
+
+ // Parameter named after the property it sets (it was previously called "fanout").
+ void setMaxAuctionsWaitingTime(Integer maxAuctionsWaitingTime);
+
+ @Description("Length of occasional delay to impose on events (in seconds).")
+ @Nullable
+ Long getOccasionalDelaySec();
+
+ void setOccasionalDelaySec(Long occasionalDelaySec);
+
+ // The delay length is configured by the occasionalDelaySec option declared above.
+ @Description("Probability that an event will be delayed by --occasionalDelaySec.")
+ @Nullable
+ Double getProbDelayedEvent();
+
+ void setProbDelayedEvent(Double probDelayedEvent);
+
+ @Description("Maximum size of each log file (in events). For Query10 only.")
+ @Nullable
+ Integer getMaxLogEvents();
+
+ void setMaxLogEvents(Integer maxLogEvents);
+
+ @Description("How to derive names of resources.")
+ @Default.Enum("QUERY_AND_SALT")
+ NexmarkUtils.ResourceNameMode getResourceNameMode();
+
+ void setResourceNameMode(NexmarkUtils.ResourceNameMode mode);
+
+ @Description("If true, manage the creation and cleanup of topics, subscriptions and gcs files.")
+ @Default.Boolean(true)
+ boolean getManageResources();
+
+ void setManageResources(boolean manageResources);
+
+ @Description("If true, use pub/sub publish time instead of event time.")
+ @Nullable
+ Boolean getUsePubsubPublishTime();
+
+ void setUsePubsubPublishTime(Boolean usePubsubPublishTime);
+
+ @Description("Number of events in out-of-order groups. 1 implies no out-of-order events. "
+ + "1000 implies every 1000 events per generator are emitted in pseudo-random order.")
+ @Nullable
+ Long getOutOfOrderGroupSize();
+
+ void setOutOfOrderGroupSize(Long outOfOrderGroupSize);
+
+ @Description("If false, do not add the Monitor and Snoop transforms.")
+ @Nullable
+ Boolean getDebug();
+
+ void setDebug(Boolean value);
+
+ @Description("If set, cancel running pipelines after this long")
+ @Nullable
+ Long getRunningTimeMinutes();
+
+ void setRunningTimeMinutes(Long value);
+
+ @Description("If set and --monitorJobs is true, check that the system watermark is never more "
+ + "than this far behind real time")
+ @Nullable
+ Long getMaxSystemLagSeconds();
+
+ void setMaxSystemLagSeconds(Long value);
+
+ @Description("If set and --monitorJobs is true, check that the data watermark is never more "
+ + "than this far behind real time")
+ @Nullable
+ Long getMaxDataLagSeconds();
+
+ void setMaxDataLagSeconds(Long value);
+
+ @Description("Only start validating watermarks after this many seconds")
+ @Nullable
+ Long getWatermarkValidationDelaySeconds();
+
+ void setWatermarkValidationDelaySeconds(Long value);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkPerf.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkPerf.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkPerf.java
new file mode 100644
index 0000000..2edf4e8
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkPerf.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Summary of performance for a particular run of a configuration.
+ */
+public class NexmarkPerf {
+ /**
+ * A sample of the number of events and number of results (if known) generated at
+ * a particular time.
+ */
+ public static class ProgressSnapshot {
+ /** Seconds since job was started (in wallclock time). */
+ @JsonProperty
+ double secSinceStart;
+
+ /** Job runtime in seconds (time from first event to last generated event or output result). */
+ @JsonProperty
+ double runtimeSec;
+
+ /** Cumulative number of events generated. -1 if not known. */
+ @JsonProperty
+ long numEvents;
+
+ /** Cumulative number of results emitted. -1 if not known. */
+ @JsonProperty
+ long numResults;
+
+ /**
+ * Return true if there looks to be activity between {@code this} and {@code that}
+ * snapshots.
+ */
+ public boolean anyActivity(ProgressSnapshot that) {
+ if (runtimeSec != that.runtimeSec) {
+ // An event or result end timestamp looks to have changed.
+ return true;
+ }
+ if (numEvents != that.numEvents) {
+ // Some more events were generated.
+ return true;
+ }
+ if (numResults != that.numResults) {
+ // Some more results were emitted.
+ return true;
+ }
+ return false;
+ }
+ }
+
+ /**
+ * Progress snapshots. Null if not yet calculated.
+ */
+ @JsonProperty
+ @Nullable
+ public List<ProgressSnapshot> snapshots = null;
+
+ /**
+ * Effective runtime, in seconds. Measured from timestamp of first generated event to latest of
+ * timestamp of last generated event and last emitted result. -1 if not known.
+ */
+ @JsonProperty
+ public double runtimeSec = -1.0;
+
+ /**
+ * Number of events generated. -1 if not known.
+ */
+ @JsonProperty
+ public long numEvents = -1;
+
+ /**
+ * Number of events generated per second of runtime. For batch this is number of events
+ * over the above runtime. For streaming this is the 'steady-state' event generation rate sampled
+ * over the lifetime of the job. -1 if not known.
+ */
+ @JsonProperty
+ public double eventsPerSec = -1.0;
+
+ /**
+ * Number of event bytes generated per second of runtime. -1 if not known.
+ */
+ @JsonProperty
+ public double eventBytesPerSec = -1.0;
+
+ /**
+ * Number of results emitted. -1 if not known.
+ */
+ @JsonProperty
+ public long numResults = -1;
+
+ /**
+ * Number of results generated per second of runtime. -1 if not known.
+ */
+ @JsonProperty
+ public double resultsPerSec = -1.0;
+
+ /**
+ * Number of result bytes generated per second of runtime. -1 if not known.
+ */
+ @JsonProperty
+ public double resultBytesPerSec = -1.0;
+
+ /**
+ * Delay between start of job and first event in seconds. -1 if not known.
+ */
+ @JsonProperty
+ public double startupDelaySec = -1.0;
+
+ /**
+ * Delay between first event and first result in seconds. -1 if not known.
+ */
+ @JsonProperty
+ public double processingDelaySec = -1.0;
+
+ /**
+ * Delay between last result and job completion in seconds. -1 if not known.
+ */
+ @JsonProperty
+ public double shutdownDelaySec = -1.0;
+
+ /**
+ * Time-dilation factor. Calculate as event time advancement rate relative to real time.
+ * Greater than one implies we processed events faster than they would have been generated
+ * in real time. Less than one implies we could not keep up with events in real time.
+ * -1 if not known.
+ */
+ @JsonProperty
+ double timeDilation = -1.0;
+
+ /**
+ * List of errors encountered during job execution.
+ */
+ @JsonProperty
+ @Nullable
+ public List<String> errors = null;
+
+ /**
+ * The job id this perf was drawn from. Null if not known.
+ */
+ @JsonProperty
+ @Nullable
+ public String jobId = null;
+
+ /**
+ * Serialize this performance summary to JSON using {@code NexmarkUtils.MAPPER}.
+ * A serialization failure is rethrown wrapped in a {@link RuntimeException}.
+ */
+ @Override
+ public String toString() {
+ final String json;
+ try {
+ json = NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ return json;
+ }
+
+ /**
+ * Deserialize a {@link NexmarkPerf} from the JSON text in {@code string} using
+ * {@code NexmarkUtils.MAPPER}. A read failure is rethrown wrapped in a
+ * {@link RuntimeException}.
+ */
+ public static NexmarkPerf fromString(String string) {
+ final NexmarkPerf parsed;
+ try {
+ parsed = NexmarkUtils.MAPPER.readValue(string, NexmarkPerf.class);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to parse nexmark perf: ", e);
+ }
+ return parsed;
+ }
+
+ /**
+ * Return true if there looks to be activity between {@code this} and {@code that}
+ * perf values.
+ */
+ public boolean anyActivity(NexmarkPerf that) {
+ if (runtimeSec != that.runtimeSec) {
+ // An event or result end timestamp looks to have changed.
+ return true;
+ }
+ if (numEvents != that.numEvents) {
+ // Some more events were generated.
+ return true;
+ }
+ if (numResults != that.numResults) {
+ // Some more results were emitted.
+ return true;
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java
new file mode 100644
index 0000000..d38cb7b
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark;
+
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A set of {@link NexmarkConfiguration}s.
+ */
+public enum NexmarkSuite {
+ /**
+ * The default.
+ */
+ DEFAULT(defaultConf()),
+
+ /**
+ * Sweep through all queries using the default configuration.
+ * 100k/10k events (depending on query).
+ */
+ SMOKE(smoke()),
+
+ /**
+ * As for SMOKE, but with 100m/10m events.
+ */
+ STRESS(stress()),
+
+ /**
+ * As for SMOKE, but with 1b/100m events.
+ */
+ FULL_THROTTLE(fullThrottle());
+
+ /** Return a single freshly constructed configuration. */
+ private static List<NexmarkConfiguration> defaultConf() {
+ List<NexmarkConfiguration> configurations = new ArrayList<>();
+ NexmarkConfiguration configuration = new NexmarkConfiguration();
+ configurations.add(configuration);
+ return configurations;
+ }
+
+ /**
+ * Return one configuration per query (0 to 12), each a copy of the default with
+ * 100k events, or 10k events for queries 4, 6 and 9.
+ */
+ private static List<NexmarkConfiguration> smoke() {
+ List<NexmarkConfiguration> configurations = new ArrayList<>();
+ for (int query = 0; query <= 12; query++) {
+ NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT.copy();
+ configuration.query = query;
+ configuration.numEvents = 100_000;
+ if (query == 4 || query == 6 || query == 9) {
+ // Scale back so overall runtimes are reasonably close across all queries.
+ configuration.numEvents /= 10;
+ }
+ configurations.add(configuration);
+ }
+ return configurations;
+ }
+
+ /**
+ * Return the {@link #smoke()} configurations with every non-negative event count
+ * multiplied by {@code factor}. Negative counts are left untouched.
+ */
+ private static List<NexmarkConfiguration> scaledSmoke(int factor) {
+ List<NexmarkConfiguration> configurations = smoke();
+ for (NexmarkConfiguration configuration : configurations) {
+ if (configuration.numEvents >= 0) {
+ configuration.numEvents *= factor;
+ }
+ }
+ return configurations;
+ }
+
+ /** SMOKE scaled by 1000: 100m/10m events. */
+ private static List<NexmarkConfiguration> stress() {
+ return scaledSmoke(1000);
+ }
+
+ /**
+ * SMOKE scaled by 10000: 1b/100m events. This used the same factor as {@link #stress()},
+ * which made the two suites identical.
+ */
+ private static List<NexmarkConfiguration> fullThrottle() {
+ return scaledSmoke(10_000);
+ }
+
+ private final List<NexmarkConfiguration> configurations;
+
+ NexmarkSuite(List<NexmarkConfiguration> configurations) {
+ this.configurations = configurations;
+ }
+
+ /**
+ * Return the configurations corresponding to this suite. We'll override each configuration
+ * with any set command line flags, except for --isStreaming which is only respected for
+ * the {@link #DEFAULT} suite.
+ *
+ * <p>Each configuration is copied before being overridden, and the copies are collected
+ * into an insertion-ordered set.
+ */
+ public Iterable<NexmarkConfiguration> getConfigurations(NexmarkOptions options) {
+ Set<NexmarkConfiguration> results = new LinkedHashSet<>();
+ for (NexmarkConfiguration configuration : configurations) {
+ NexmarkConfiguration result = configuration.copy();
+ result.overrideFromOptions(options);
+ results.add(result);
+ }
+ return results;
+ }
+}