You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:59 UTC

[51/55] [abbrv] beam git commit: Move module beam-integration-java-nexmark to beam-sdks-java-nexmark

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
deleted file mode 100644
index a609975..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
+++ /dev/null
@@ -1,1158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ThreadLocalRandom;
-import javax.annotation.Nullable;
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.integration.nexmark.model.Person;
-import org.apache.beam.integration.nexmark.queries.NexmarkQuery;
-import org.apache.beam.integration.nexmark.queries.NexmarkQueryModel;
-import org.apache.beam.integration.nexmark.queries.Query0;
-import org.apache.beam.integration.nexmark.queries.Query0Model;
-import org.apache.beam.integration.nexmark.queries.Query1;
-import org.apache.beam.integration.nexmark.queries.Query10;
-import org.apache.beam.integration.nexmark.queries.Query11;
-import org.apache.beam.integration.nexmark.queries.Query12;
-import org.apache.beam.integration.nexmark.queries.Query1Model;
-import org.apache.beam.integration.nexmark.queries.Query2;
-import org.apache.beam.integration.nexmark.queries.Query2Model;
-import org.apache.beam.integration.nexmark.queries.Query3;
-import org.apache.beam.integration.nexmark.queries.Query3Model;
-import org.apache.beam.integration.nexmark.queries.Query4;
-import org.apache.beam.integration.nexmark.queries.Query4Model;
-import org.apache.beam.integration.nexmark.queries.Query5;
-import org.apache.beam.integration.nexmark.queries.Query5Model;
-import org.apache.beam.integration.nexmark.queries.Query6;
-import org.apache.beam.integration.nexmark.queries.Query6Model;
-import org.apache.beam.integration.nexmark.queries.Query7;
-import org.apache.beam.integration.nexmark.queries.Query7Model;
-import org.apache.beam.integration.nexmark.queries.Query8;
-import org.apache.beam.integration.nexmark.queries.Query8Model;
-import org.apache.beam.integration.nexmark.queries.Query9;
-import org.apache.beam.integration.nexmark.queries.Query9Model;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
-import org.apache.beam.sdk.metrics.DistributionResult;
-import org.apache.beam.sdk.metrics.MetricNameFilter;
-import org.apache.beam.sdk.metrics.MetricQueryResults;
-import org.apache.beam.sdk.metrics.MetricResult;
-import org.apache.beam.sdk.metrics.MetricsFilter;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.joda.time.Duration;
-import org.slf4j.LoggerFactory;
-
-/**
- * Run a single Nexmark query using a given configuration.
- */
-public class NexmarkLauncher<OptionT extends NexmarkOptions> {
-  /** Logger for this class. */
-  private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(NexmarkLauncher.class);
-  /**
-   * Minimum number of samples needed for 'steady-state' rate calculation.
-   */
-  private static final int MIN_SAMPLES = 9;
-  /**
-   * Minimum length of time over which to consider samples for 'steady-state' rate calculation.
-   */
-  private static final Duration MIN_WINDOW = Duration.standardMinutes(2);
-  /**
-   * Delay between perf samples.
-   */
-  private static final Duration PERF_DELAY = Duration.standardSeconds(15);
-  /**
-   * How long to let streaming pipeline run after all events have been generated and we've
-   * seen no activity.
-   */
-  private static final Duration DONE_DELAY = Duration.standardMinutes(1);
-  /**
-   * How long to allow no activity without warning.
-   */
-  private static final Duration STUCK_WARNING_DELAY = Duration.standardMinutes(10);
-  /**
-   * How long to let streaming pipeline run after we've
-   * seen no activity, even if all events have not been generated.
-   */
-  private static final Duration STUCK_TERMINATE_DELAY = Duration.standardDays(3);
-  /**
-   * NexmarkOptions shared by all runs.
-   */
-  private final OptionT options;
-
-  /**
-   * Which configuration we are running.
-   */
-  @Nullable
-  private NexmarkConfiguration configuration;
-
-  /**
-   * If in --pubsubMode=COMBINED, the event monitor for the publisher pipeline. Otherwise null.
-   */
-  @Nullable
-  private Monitor<Event> publisherMonitor;
-
-  /**
-   * If in --pubsubMode=COMBINED, the pipeline result for the publisher pipeline. Otherwise null.
-   */
-  @Nullable
-  private PipelineResult publisherResult;
-
-  /**
-   * Result for the main pipeline.
-   */
-  @Nullable
-  private PipelineResult mainResult;
-
-  /**
-   * Query name we are running.
-   */
-  @Nullable
-  private String queryName;
-
-  /**
-   * Create a launcher that runs queries using the given {@code options}.
-   *
-   * @param options options shared by all runs of this launcher
-   */
-  public NexmarkLauncher(OptionT options) {
-    this.options = options;
-  }
-
-
-  /**
-   * Report whether the launcher is configured for streaming execution.
-   *
-   * @return the value of {@code options.isStreaming()}
-   */
-  private boolean isStreaming() {
-    boolean streaming = options.isStreaming();
-    return streaming;
-  }
-
-  /**
-   * Return maximum number of workers.
-   */
-  private int maxNumWorkers() {
-    return 5;
-  }
-
-  /**
-   * Return the current value for a long counter, or a default value if it can't be retrieved.
-   * Note this uses only attempted metrics because some runners don't support committed metrics.
-   *
-   * @param result the pipeline whose metrics are queried
-   * @param namespace metric namespace
-   * @param name metric name within {@code namespace}
-   * @param defaultValue value returned when no matching counter is reported
-   * @return the attempted value of the first matching counter, or {@code defaultValue}
-   */
-  private long getCounterMetric(PipelineResult result, String namespace, String name,
-      long defaultValue) {
-    MetricQueryResults metrics = result.metrics().queryMetrics(
-        MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build());
-    // Use the first matching counter, if any; checking for presence avoids using
-    // NoSuchElementException for control flow.
-    for (MetricResult<Long> counter : metrics.counters()) {
-      return counter.attempted();
-    }
-    LOG.error("Failed to get metric {}, from namespace {}", name, namespace);
-    return defaultValue;
-  }
-
-  /**
-   * Return the current min or max of a distribution metric, or a default value if it can't be
-   * retrieved. Note this uses only attempted metrics because some runners don't support
-   * committed metrics.
-   *
-   * @param result the pipeline whose metrics are queried
-   * @param namespace metric namespace
-   * @param name metric name within {@code namespace}
-   * @param distType which statistic of the distribution to return
-   * @param defaultValue value returned when no matching distribution is reported
-   * @return the requested attempted statistic of the first matching distribution, or
-   *     {@code defaultValue}
-   */
-  private long getDistributionMetric(PipelineResult result, String namespace, String name,
-      DistributionType distType, long defaultValue) {
-    MetricQueryResults metrics = result.metrics().queryMetrics(
-        MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build());
-    // Use the first matching distribution, if any.
-    for (MetricResult<DistributionResult> distribution : metrics.distributions()) {
-      switch (distType) {
-        case MIN:
-          return distribution.attempted().min();
-        case MAX:
-          return distribution.attempted().max();
-        default:
-          return defaultValue;
-      }
-    }
-    LOG.error(
-        "Failed to get distribution metric {} for namespace {}",
-        name,
-        namespace);
-    return defaultValue;
-  }
-
-  private enum DistributionType {MIN, MAX}
-
-  /**
-   * Sanity-check a timestamp metric against the current time.
-   *
-   * <p>Timestamp metrics are used to monitor the time of execution of transforms. A value more
-   * than 10000 days away from {@code now} is considered erroneous.
-   *
-   * @param now current wall-clock time in milliseconds since the epoch
-   * @param value candidate timestamp metric, in milliseconds since the epoch
-   * @return {@code value} if it is within the tolerance of {@code now}, otherwise -1
-   */
-  private long getTimestampMetric(long now, long value) {
-    long tolerance = Duration.standardDays(10000).getMillis();
-    // Compare against explicit bounds rather than computing |value - now|: the subtraction can
-    // overflow when value is an extreme sentinel such as Long.MIN_VALUE.
-    if (value < now - tolerance || value > now + tolerance) {
-      return -1;
-    }
-    return value;
-  }
-
-  /**
-   * Find a 'steady state' events/sec from {@code snapshots} and
-   * store it in {@code perf} if found.
-   *
-   * <p>Only applies to streaming runs. Samples before the first one with known event and result
-   * counts, and trailing samples without activity, are ignored. The remaining samples are trimmed
-   * by a third at each end, and a least-squares line through the origin
-   * (numEvents = rate * runtimeSec) is fitted to them. The slope replaces
-   * {@code perf.eventsPerSec}. Nothing is changed if there are too few samples, if they span too
-   * little time, or if no usable data point remains.
-   */
-  private void captureSteadyState(NexmarkPerf perf, List<NexmarkPerf.ProgressSnapshot> snapshots) {
-    if (!isStreaming()) {
-      return;
-    }
-
-    // Find the first sample with actual event and result counts.
-    int dataStart = 0;
-    for (; dataStart < snapshots.size(); dataStart++) {
-      if (snapshots.get(dataStart).numEvents >= 0 && snapshots.get(dataStart).numResults >= 0) {
-        break;
-      }
-    }
-
-    // Find the last sample which demonstrated progress.
-    int dataEnd = snapshots.size() - 1;
-    for (; dataEnd > dataStart; dataEnd--) {
-      if (snapshots.get(dataEnd).anyActivity(snapshots.get(dataEnd - 1))) {
-        break;
-      }
-    }
-
-    int numSamples = dataEnd - dataStart + 1;
-    if (numSamples < MIN_SAMPLES) {
-      // Not enough samples.
-      NexmarkUtils.console("%d samples not enough to calculate steady-state event rate",
-          numSamples);
-      return;
-    }
-
-    // We'll look at only the middle third samples.
-    int sampleStart = dataStart + numSamples / 3;
-    int sampleEnd = dataEnd - numSamples / 3;
-
-    double sampleSec =
-        snapshots.get(sampleEnd).secSinceStart - snapshots.get(sampleStart).secSinceStart;
-    if (sampleSec < MIN_WINDOW.getStandardSeconds()) {
-      // Not sampled over enough time.
-      NexmarkUtils.console(
-          "sample of %.1f sec not long enough to calculate steady-state event rate",
-          sampleSec);
-      return;
-    }
-
-    // Find rate with least squares error.
-    double sumxx = 0.0;
-    double sumxy = 0.0;
-    long prevNumEvents = -1;
-    for (int i = sampleStart; i <= sampleEnd; i++) {
-      long numEvents = snapshots.get(i).numEvents;
-      if (numEvents < 0 || numEvents == prevNumEvents) {
-        // Skip samples whose event count could not be read (negative sentinel) or did not change
-        // since the last used sample: they contribute no data.
-        continue;
-      }
-      // Use the effective runtime instead of wallclock time so we can
-      // insulate ourselves from delays and stutters in the query manager.
-      double x = snapshots.get(i).runtimeSec;
-      prevNumEvents = numEvents;
-      double y = numEvents;
-      sumxx += x * x;
-      sumxy += x * y;
-    }
-    if (sumxx <= 0.0) {
-      // Dividing by zero would store NaN/Infinity in perf.eventsPerSec; keep the old value.
-      NexmarkUtils.console("no usable samples to calculate steady-state event rate");
-      return;
-    }
-    double eventsPerSec = sumxy / sumxx;
-    NexmarkUtils.console("revising events/sec from %.1f to %.1f", perf.eventsPerSec, eventsPerSec);
-    perf.eventsPerSec = eventsPerSec;
-  }
-
-  /**
-   * Compute the current performance of the job from its event and result monitors.
-   *
-   * <p>Counters and timestamp distributions that cannot be read are treated as missing (-1), and
-   * the {@link NexmarkPerf} fields derived from them keep their default values. A
-   * {@link NexmarkPerf.ProgressSnapshot} is appended to {@code snapshots} on every call, after
-   * which the steady-state event rate is re-estimated from all snapshots so far.
-   *
-   * @param startMsSinceEpoch wall-clock time at which monitoring started
-   * @param now current wall-clock time
-   * @param result the pipeline whose metrics are read
-   * @param snapshots accumulated progress snapshots; one is appended per call
-   * @param eventMonitor monitor attached to the event stream
-   * @param resultMonitor monitor attached to the query results
-   * @return the computed performance
-   */
-  private NexmarkPerf currentPerf(
-      long startMsSinceEpoch, long now, PipelineResult result,
-      List<NexmarkPerf.ProgressSnapshot> snapshots, Monitor<?> eventMonitor,
-      Monitor<?> resultMonitor) {
-    NexmarkPerf perf = new NexmarkPerf();
-
-    // Event-side measurements (-1 means "not available").
-    long numEvents =
-        getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".elements", -1);
-    long numEventBytes =
-        getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".bytes", -1);
-    long eventStart = timestampMetricOrMissing(result, now,
-        eventMonitor.name, eventMonitor.prefix + ".startTime", DistributionType.MIN);
-    long eventEnd = timestampMetricOrMissing(result, now,
-        eventMonitor.name, eventMonitor.prefix + ".endTime", DistributionType.MAX);
-
-    // Result-side measurements (-1 means "not available").
-    long numResults =
-        getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".elements", -1);
-    long numResultBytes =
-        getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".bytes", -1);
-    long resultStart = timestampMetricOrMissing(result, now,
-        resultMonitor.name, resultMonitor.prefix + ".startTime", DistributionType.MIN);
-    long resultEnd = timestampMetricOrMissing(result, now,
-        resultMonitor.name, resultMonitor.prefix + ".endTime", DistributionType.MAX);
-    long timestampStart = timestampMetricOrMissing(result, now,
-        resultMonitor.name, resultMonitor.prefix + ".startTimestamp", DistributionType.MIN);
-    long timestampEnd = timestampMetricOrMissing(result, now,
-        resultMonitor.name, resultMonitor.prefix + ".endTimestamp", DistributionType.MAX);
-
-    // Events may keep arriving after the last result (e.g. Query 2 only yields results for a
-    // small prefix of the event stream), and during startup there may be no result yet. Taking
-    // the max covers every case: when only one end is known the other is negative, and when
-    // neither is known the max is negative and therefore treated as missing below.
-    long effectiveEnd = Math.max(eventEnd, resultEnd);
-    if (effectiveEnd >= 0 && eventStart >= 0 && effectiveEnd >= eventStart) {
-      perf.runtimeSec = (effectiveEnd - eventStart) / 1000.0;
-    }
-    boolean haveRuntime = perf.runtimeSec > 0.0;
-
-    if (numEvents >= 0) {
-      perf.numEvents = numEvents;
-      if (haveRuntime) {
-        // For streaming this may be replaced below by a 'steady-state' rate computed from the
-        // progress snapshots.
-        perf.eventsPerSec = numEvents / perf.runtimeSec;
-      }
-    }
-    if (numEventBytes >= 0 && haveRuntime) {
-      perf.eventBytesPerSec = numEventBytes / perf.runtimeSec;
-    }
-    if (numResults >= 0) {
-      perf.numResults = numResults;
-      if (haveRuntime) {
-        perf.resultsPerSec = numResults / perf.runtimeSec;
-      }
-    }
-    if (numResultBytes >= 0 && haveRuntime) {
-      perf.resultBytesPerSec = numResultBytes / perf.runtimeSec;
-    }
-    if (eventStart >= 0) {
-      perf.startupDelaySec = (eventStart - startMsSinceEpoch) / 1000.0;
-    }
-    if (resultStart >= 0 && eventStart >= 0 && resultStart >= eventStart) {
-      perf.processingDelaySec = (resultStart - eventStart) / 1000.0;
-    }
-    if (timestampStart >= 0 && timestampEnd >= 0 && haveRuntime) {
-      perf.timeDilation = ((timestampEnd - timestampStart) / 1000.0) / perf.runtimeSec;
-    }
-    if (resultEnd >= 0) {
-      // Shutdown delay is computed as if the job had finished by now.
-      perf.shutdownDelaySec = (now - resultEnd) / 1000.0;
-    }
-
-    NexmarkPerf.ProgressSnapshot snapshot = new NexmarkPerf.ProgressSnapshot();
-    snapshot.secSinceStart = (now - startMsSinceEpoch) / 1000.0;
-    snapshot.runtimeSec = perf.runtimeSec;
-    snapshot.numEvents = numEvents;
-    snapshot.numResults = numResults;
-    snapshots.add(snapshot);
-
-    captureSteadyState(perf, snapshots);
-    return perf;
-  }
-
-  /**
-   * Read the {@code distType} statistic of a timestamp distribution metric, returning -1 when it
-   * is missing or rejected by {@link #getTimestampMetric}.
-   */
-  private long timestampMetricOrMissing(PipelineResult result, long now, String namespace,
-      String name, DistributionType distType) {
-    return getTimestampMetric(now, getDistributionMetric(result, namespace, name, distType, -1));
-  }
-
-  /**
-   * Build and run a pipeline using specified options.
-   *
-   * @param <OptionT> the options type handed to {@link #build}
-   */
-  interface PipelineBuilder<OptionT extends NexmarkOptions> {
-    /**
-     * Build (and presumably run — confirm against implementations) a pipeline using
-     * {@code publishOnlyOptions}.
-     */
-    void build(OptionT publishOnlyOptions);
-  }
-
-  /**
-   * Hand this launcher's options to {@code builder} for a publish-only child pipeline.
-   *
-   * <p>The options are passed through unchanged.
-   */
-  private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder<NexmarkOptions> builder) {
-    NexmarkOptions publishOnlyOptions = options;
-    builder.build(publishOnlyOptions);
-  }
-
-  /**
-   * Monitor the performance and progress of a running job. Return final performance if
-   * it was measured.
-   *
-   * <p>Polls the main pipeline every {@code PERF_DELAY} until it reaches a terminal state.
-   * Performance is only sampled when {@code configuration.debug} is set. In streaming mode the
-   * job is cancelled if it reports fatal errors, appears finished (debug only) or appears stuck.
-   * In any mode it is cancelled once {@code --runningTimeMinutes} (minus the preload time) has
-   * elapsed. Any publisher pipeline is cancelled and awaited before returning.
-   *
-   * @param query the query whose monitors and fatal-error counter are read
-   * @return the final performance with errors and snapshots attached, or {@code null} if
-   *     monitoring is disabled or no performance was sampled (errors are then printed instead)
-   */
-  @Nullable
-  private NexmarkPerf monitor(NexmarkQuery query) {
-    if (!options.getMonitorJobs()) {
-      return null;
-    }
-
-    if (configuration.debug) {
-      NexmarkUtils.console("Waiting for main pipeline to 'finish'");
-    } else {
-      NexmarkUtils.console("--debug=false, so job will not self-cancel");
-    }
-
-    PipelineResult job = mainResult;
-    PipelineResult publisherJob = publisherResult;
-    List<NexmarkPerf.ProgressSnapshot> snapshots = new ArrayList<>();
-    long startMsSinceEpoch = System.currentTimeMillis();
-    long endMsSinceEpoch = -1;
-    if (options.getRunningTimeMinutes() != null) {
-      // The preload period is subtracted from the requested running time.
-      endMsSinceEpoch = startMsSinceEpoch
-                        + Duration.standardMinutes(options.getRunningTimeMinutes()).getMillis()
-                        - Duration.standardSeconds(configuration.preloadSeconds).getMillis();
-    }
-    long lastActivityMsSinceEpoch = -1;
-    NexmarkPerf perf = null;
-    boolean waitingForShutdown = false;
-    boolean publisherCancelled = false;
-    List<String> errors = new ArrayList<>();
-
-    while (true) {
-      long now = System.currentTimeMillis();
-      if (endMsSinceEpoch >= 0 && now > endMsSinceEpoch && !waitingForShutdown) {
-        NexmarkUtils.console("Reached end of test, cancelling job");
-        try {
-          job.cancel();
-        } catch (IOException e) {
-          throw new RuntimeException("Unable to cancel main job: ", e);
-        }
-        if (publisherResult != null) {
-          try {
-            publisherJob.cancel();
-          } catch (IOException e) {
-            throw new RuntimeException("Unable to cancel publisher job: ", e);
-          }
-          publisherCancelled = true;
-        }
-        waitingForShutdown = true;
-      }
-
-      PipelineResult.State state = job.getState();
-      NexmarkUtils.console("%s %s%s", state, queryName,
-          waitingForShutdown ? " (waiting for shutdown)" : "");
-
-      // Performance is only sampled in debug mode; otherwise currPerf (and hence perf) is null.
-      NexmarkPerf currPerf;
-      if (configuration.debug) {
-        currPerf = currentPerf(startMsSinceEpoch, now, job, snapshots,
-                               query.eventMonitor, query.resultMonitor);
-      } else {
-        currPerf = null;
-      }
-
-      if (perf == null || perf.anyActivity(currPerf)) {
-        lastActivityMsSinceEpoch = now;
-      }
-
-      if (options.isStreaming() && !waitingForShutdown) {
-        Duration quietFor = new Duration(lastActivityMsSinceEpoch, now);
-        long fatalCount = getCounterMetric(job, query.getName(), "fatal", 0);
-        if (fatalCount > 0) {
-          NexmarkUtils.console("job has fatal errors, cancelling.");
-          errors.add(String.format("Pipeline reported %s fatal errors", fatalCount));
-          waitingForShutdown = true;
-        } else if (configuration.debug && configuration.numEvents > 0
-                   && currPerf.numEvents == configuration.numEvents
-                   && currPerf.numResults >= 0 && quietFor.isLongerThan(DONE_DELAY)) {
-          NexmarkUtils.console("streaming query appears to have finished, cancelling job.");
-          waitingForShutdown = true;
-        } else if (quietFor.isLongerThan(STUCK_TERMINATE_DELAY)) {
-          NexmarkUtils.console("streaming query appears to have gotten stuck, cancelling job.");
-          errors.add("Streaming job was cancelled since appeared stuck");
-          waitingForShutdown = true;
-        } else if (quietFor.isLongerThan(STUCK_WARNING_DELAY)) {
-          NexmarkUtils.console("WARNING: streaming query appears to have been stuck for %d min.",
-              quietFor.getStandardMinutes());
-          errors.add(
-              String.format("Streaming query was stuck for %d min", quietFor.getStandardMinutes()));
-        }
-
-        if (waitingForShutdown) {
-          try {
-            job.cancel();
-          } catch (IOException e) {
-            throw new RuntimeException("Unable to cancel main job: ", e);
-          }
-        }
-      }
-
-      perf = currPerf;
-
-      boolean running = true;
-      switch (state) {
-        case UNKNOWN:
-        case STOPPED:
-        case RUNNING:
-          // Keep going.
-          break;
-        case DONE:
-          // All done.
-          running = false;
-          break;
-        case CANCELLED:
-          running = false;
-          if (!waitingForShutdown) {
-            errors.add("Job was unexpectedly cancelled");
-          }
-          break;
-        case FAILED:
-          // Abnormal termination.
-          running = false;
-          errors.add("Job failed");
-          break;
-        case UPDATED:
-          // Abnormal termination.
-          running = false;
-          errors.add("Job was unexpectedly updated");
-          break;
-      }
-
-      if (!running) {
-        break;
-      }
-
-      if (lastActivityMsSinceEpoch == now) {
-        NexmarkUtils.console("new perf %s", perf);
-      } else {
-        NexmarkUtils.console("no activity");
-      }
-
-      try {
-        Thread.sleep(PERF_DELAY.getMillis());
-      } catch (InterruptedException e) {
-        // Make sure the interrupt flag is clear and keep polling the still-running pipeline.
-        Thread.interrupted();
-        NexmarkUtils.console("Interrupted: pipeline is still running");
-      }
-    }
-
-    // Shut down the publisher first so it is stopped even if no performance was measured.
-    if (publisherResult != null) {
-      NexmarkUtils.console("Shutting down publisher pipeline.");
-      try {
-        if (!publisherCancelled) {
-          publisherJob.cancel();
-        }
-        publisherJob.waitUntilFinish(Duration.standardMinutes(5));
-      } catch (IOException e) {
-        throw new RuntimeException("Unable to cancel publisher job: ", e);
-      }
-    }
-
-    if (perf == null) {
-      // Without --debug no NexmarkPerf was ever sampled, so there is nothing to attach the
-      // errors to; report them directly instead of dereferencing null.
-      for (String error : errors) {
-        NexmarkUtils.console("ERROR: %s", error);
-      }
-      return null;
-    }
-    perf.errors = errors;
-    perf.snapshots = snapshots;
-    return perf;
-  }
-
-  // ================================================================================
-  // Basic sources and sinks
-  // ================================================================================
-
-  /**
-   * Return the Pubsub topic name to use, derived from {@code --pubsubTopic} according to
-   * {@code options.getResourceNameMode()}.
-   *
-   * @param now value appended as a salt in {@code QUERY_AND_SALT} mode
-   * @throws RuntimeException if {@code --pubsubTopic} is missing or the mode is unrecognized
-   */
-  private String shortTopic(long now) {
-    String topic = options.getPubsubTopic();
-    if (topic == null || topic.isEmpty()) {
-      throw new RuntimeException("Missing --pubsubTopic");
-    }
-    String resolved;
-    switch (options.getResourceNameMode()) {
-      case VERBATIM:
-        resolved = topic;
-        break;
-      case QUERY:
-        resolved = String.format("%s_%s_source", topic, queryName);
-        break;
-      case QUERY_AND_SALT:
-        resolved = String.format("%s_%s_%d_source", topic, queryName, now);
-        break;
-      default:
-        throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
-    }
-    return resolved;
-  }
-
-  /**
-   * Return the Pubsub subscription name to use, derived from {@code --pubsubSubscription}
-   * according to {@code options.getResourceNameMode()}.
-   *
-   * @param now value appended as a salt in {@code QUERY_AND_SALT} mode
-   * @throws RuntimeException if {@code --pubsubSubscription} is missing or the mode is
-   *     unrecognized
-   */
-  private String shortSubscription(long now) {
-    String subscription = options.getPubsubSubscription();
-    if (subscription == null || subscription.isEmpty()) {
-      throw new RuntimeException("Missing --pubsubSubscription");
-    }
-    String resolved;
-    switch (options.getResourceNameMode()) {
-      case VERBATIM:
-        resolved = subscription;
-        break;
-      case QUERY:
-        resolved = String.format("%s_%s_source", subscription, queryName);
-        break;
-      case QUERY_AND_SALT:
-        resolved = String.format("%s_%s_%d_source", subscription, queryName, now);
-        break;
-      default:
-        throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
-    }
-    return resolved;
-  }
-
-  /**
-   * Return the plain-text output file name, derived from {@code --outputPath} according to
-   * {@code options.getResourceNameMode()}.
-   *
-   * @param now value appended as a salt in {@code QUERY_AND_SALT} mode
-   * @throws RuntimeException if {@code --outputPath} is missing or the mode is unrecognized
-   */
-  private String textFilename(long now) {
-    String outputPath = options.getOutputPath();
-    if (outputPath == null || outputPath.isEmpty()) {
-      throw new RuntimeException("Missing --outputPath");
-    }
-    String filename;
-    switch (options.getResourceNameMode()) {
-      case VERBATIM:
-        filename = outputPath;
-        break;
-      case QUERY:
-        filename = String.format("%s/nexmark_%s.txt", outputPath, queryName);
-        break;
-      case QUERY_AND_SALT:
-        filename = String.format("%s/nexmark_%s_%d.txt", outputPath, queryName, now);
-        break;
-      default:
-        throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
-    }
-    return filename;
-  }
-
-  /**
-   * Return the BigQuery table spec ({@code project:nexmark.table}) for the given
-   * {@code version}, derived from {@code --bigQueryTable} according to
-   * {@code options.getResourceNameMode()}.
-   *
-   * @param now value appended as a salt in {@code QUERY_AND_SALT} mode
-   * @param version suffix identifying the table variant
-   * @throws RuntimeException if {@code --bigQueryTable} is missing or the mode is unrecognized
-   */
-  private String tableSpec(long now, String version) {
-    String table = options.getBigQueryTable();
-    if (table == null || table.isEmpty()) {
-      throw new RuntimeException("Missing --bigQueryTable");
-    }
-    String spec;
-    switch (options.getResourceNameMode()) {
-      case VERBATIM:
-        spec = String.format("%s:nexmark.%s_%s", options.getProject(), table, version);
-        break;
-      case QUERY:
-        spec = String.format(
-            "%s:nexmark.%s_%s_%s", options.getProject(), table, queryName, version);
-        break;
-      case QUERY_AND_SALT:
-        spec = String.format(
-            "%s:nexmark.%s_%s_%s_%d", options.getProject(), table, queryName, version, now);
-        break;
-      default:
-        throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
-    }
-    return spec;
-  }
-
-  /**
-   * Return the directory for logs, derived from {@code --outputPath} according to
-   * {@code options.getResourceNameMode()}.
-   *
-   * @param now value appended as a salt in {@code QUERY_AND_SALT} mode
-   * @throws RuntimeException if {@code --outputPath} is missing or the mode is unrecognized
-   */
-  private String logsDir(long now) {
-    String outputPath = options.getOutputPath();
-    if (outputPath == null || outputPath.isEmpty()) {
-      throw new RuntimeException("Missing --outputPath");
-    }
-    String dir;
-    switch (options.getResourceNameMode()) {
-      case VERBATIM:
-        dir = outputPath;
-        break;
-      case QUERY:
-        dir = String.format("%s/logs_%s", outputPath, queryName);
-        break;
-      case QUERY_AND_SALT:
-        dir = String.format("%s/logs_%s_%d", outputPath, queryName, now);
-        break;
-      default:
-        throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
-    }
-    return dir;
-  }
-
-  /**
-   * Return a source of synthetic events: the batch source from
-   * {@code NexmarkUtils.batchEventsSource} unless running in streaming mode, in which case the
-   * source from {@code NexmarkUtils.streamEventsSource} is used.
-   */
-  private PCollection<Event> sourceEventsFromSynthetic(Pipeline p) {
-    if (!isStreaming()) {
-      NexmarkUtils.console("Generating %d events in batch mode", configuration.numEvents);
-      return p.apply(queryName + ".ReadBounded", NexmarkUtils.batchEventsSource(configuration));
-    }
-    NexmarkUtils.console("Generating %d events in streaming mode", configuration.numEvents);
-    return p.apply(queryName + ".ReadUnbounded", NexmarkUtils.streamEventsSource(configuration));
-  }
-
-  /**
-   * Return source of events from Pubsub.
-   */
-  private PCollection<Event> sourceEventsFromPubsub(Pipeline p, long now) {
-    String shortSubscription = shortSubscription(now);
-    NexmarkUtils.console("Reading events from Pubsub %s", shortSubscription);
-
-    PubsubIO.Read<PubsubMessage> io =
-        PubsubIO.readMessagesWithAttributes().fromSubscription(shortSubscription)
-            .withIdAttribute(NexmarkUtils.PUBSUB_ID);
-    if (!configuration.usePubsubPublishTime) {
-      io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
-    }
-
-    return p
-      .apply(queryName + ".ReadPubsubEvents", io)
-      .apply(queryName + ".PubsubMessageToEvent", ParDo.of(new DoFn<PubsubMessage, Event>() {
-        @ProcessElement
-        public void processElement(ProcessContext c) {
-          byte[] payload = c.element().getPayload();
-          try {
-            Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload);
-            c.output(event);
-          } catch (CoderException e) {
-            LOG.error("Error while decoding Event from pusbSub message: serialization error");
-          }
-        }
-      }));
-  }
-
-  /**
-   * Return Avro source of events read from the files matching
-   * {@code options.getInputPath() + "*.avro"}.
-   *
-   * <p>After reading, {@code NexmarkQuery.EVENT_TIMESTAMP_FROM_DATA} is applied to the events.
-   *
-   * @throws RuntimeException if {@code --inputPath} is missing
-   */
-  private PCollection<Event> sourceEventsFromAvro(Pipeline p) {
-    String filename = options.getInputPath();
-    if (Strings.isNullOrEmpty(filename)) {
-      throw new RuntimeException("Missing --inputPath");
-    }
-    NexmarkUtils.console("Reading events from Avro files at %s", filename);
-    return p
-        .apply(queryName + ".ReadAvroEvents", AvroIO.read(Event.class).from(filename + "*.avro"))
-        .apply("OutputWithTimestamp", NexmarkQuery.EVENT_TIMESTAMP_FROM_DATA);
-  }
-
-  /**
-   * Send {@code events} to Pubsub.
-   */
-  private void sinkEventsToPubsub(PCollection<Event> events, long now) {
-    String shortTopic = shortTopic(now);
-    NexmarkUtils.console("Writing events to Pubsub %s", shortTopic);
-
-    PubsubIO.Write<PubsubMessage> io =
-        PubsubIO.writeMessages().to(shortTopic)
-            .withIdAttribute(NexmarkUtils.PUBSUB_ID);
-    if (!configuration.usePubsubPublishTime) {
-      io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
-    }
-
-    events.apply(queryName + ".EventToPubsubMessage",
-            ParDo.of(new DoFn<Event, PubsubMessage>() {
-              @ProcessElement
-              public void processElement(ProcessContext c) {
-                try {
-                  byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element());
-                  c.output(new PubsubMessage(payload, new HashMap<String, String>()));
-                } catch (CoderException e1) {
-                  LOG.error("Error while sending Event {} to pusbSub: serialization error",
-                      c.element().toString());
-                }
-              }
-            })
-        )
-        .apply(queryName + ".WritePubsubEvents", io);
-  }
-
-  /**
-   * Publish {@code formattedResults} as strings to the Pubsub topic given by
-   * {@link #shortTopic}. {@code NexmarkUtils.PUBSUB_ID} is used as the id attribute and, unless
-   * {@code configuration.usePubsubPublishTime} is set, {@code NexmarkUtils.PUBSUB_TIMESTAMP} as
-   * the timestamp attribute.
-   */
-  private void sinkResultsToPubsub(PCollection<String> formattedResults, long now) {
-    String topic = shortTopic(now);
-    NexmarkUtils.console("Writing results to Pubsub %s", topic);
-    PubsubIO.Write<String> write = PubsubIO.writeStrings()
-        .to(topic)
-        .withIdAttribute(NexmarkUtils.PUBSUB_ID);
-    PubsubIO.Write<String> configured = configuration.usePubsubPublishTime
-        ? write
-        : write.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
-    formattedResults.apply(queryName + ".WritePubsubResults", configured);
-  }
-
-  /**
-   * Write every raw {@link Event} in {@code source}, plus the bids, new auctions and new persons
-   * selected from it, as Avro files under {@code options.getOutputPath()}:
-   * <ul>
-   * <li>{@code $outputPath/event*.avro} All Event entities.
-   * <li>{@code $outputPath/bid*.avro} Bid entities.
-   * <li>{@code $outputPath/auction*.avro} Auction entities.
-   * <li>{@code $outputPath/person*.avro} Person entities.
-   * </ul>
-   *
-   * @param source A PCollection of events.
-   * @throws RuntimeException if {@code --outputPath} is missing
-   */
-  private void sinkEventsToAvro(PCollection<Event> source) {
-    String outputPath = options.getOutputPath();
-    if (outputPath == null || outputPath.isEmpty()) {
-      throw new RuntimeException("Missing --outputPath");
-    }
-    NexmarkUtils.console("Writing events to Avro files at %s", outputPath);
-    String dir = outputPath + "/";
-
-    source.apply(
-        queryName + ".WriteAvroEvents",
-        AvroIO.write(Event.class).to(dir + "event").withSuffix(".avro"));
-    source
-        .apply(NexmarkQuery.JUST_BIDS)
-        .apply(
-            queryName + ".WriteAvroBids",
-            AvroIO.write(Bid.class).to(dir + "bid").withSuffix(".avro"));
-    source
-        .apply(NexmarkQuery.JUST_NEW_AUCTIONS)
-        .apply(
-            queryName + ".WriteAvroAuctions",
-            AvroIO.write(Auction.class).to(dir + "auction").withSuffix(".avro"));
-    source
-        .apply(NexmarkQuery.JUST_NEW_PERSONS)
-        .apply(
-            queryName + ".WriteAvroPeople",
-            AvroIO.write(Person.class).to(dir + "person").withSuffix(".avro"));
-  }
-
-  /**
-   * Send {@code formattedResults} to text files.
-   */
-  private void sinkResultsToText(PCollection<String> formattedResults, long now) {
-    String filename = textFilename(now);
-    NexmarkUtils.console("Writing results to text files at %s", filename);
-    formattedResults.apply(queryName + ".WriteTextResults",
-        TextIO.write().to(filename));
-  }
-
-  private static class StringToTableRow extends DoFn<String, TableRow> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      int n = ThreadLocalRandom.current().nextInt(10);
-      List<TableRow> records = new ArrayList<>(n);
-      for (int i = 0; i < n; i++) {
-        records.add(new TableRow().set("index", i).set("value", Integer.toString(i)));
-      }
-      c.output(new TableRow().set("result", c.element()).set("records", records));
-    }
-  }
-
-  /**
-   * Send {@code formattedResults} to BigQuery.
-   *
-   * <p>Each string is converted to a {@link TableRow} by {@link StringToTableRow}. The rows are
-   * appended to the table named by {@code tableSpec(now, version)}, which is created with the
-   * schema below if it does not already exist.
-   *
-   * @param formattedResults query results already rendered as strings
-   * @param now timestamp forwarded to {@code tableSpec} to name the table
-   * @param version forwarded to {@code tableSpec}; callers in this file pass "main", "side"
-   *     and "copy" to write several tables from one run
-   */
-  private void sinkResultsToBigQuery(
-      PCollection<String> formattedResults, long now,
-      String version) {
-    String tableSpec = tableSpec(now, version);
-    TableSchema tableSchema =
-        new TableSchema().setFields(ImmutableList.of(
-            new TableFieldSchema().setName("result").setType("STRING"),
-            new TableFieldSchema().setName("records").setMode("REPEATED").setType("RECORD")
-                                  .setFields(ImmutableList.of(
-                                      new TableFieldSchema().setName("index").setType("INTEGER"),
-                                      new TableFieldSchema().setName("value").setType("STRING")))));
-    NexmarkUtils.console("Writing results to BigQuery table %s", tableSpec);
-    // Parameterize the sink so that it type-checks against PCollection<TableRow> rather than
-    // going through a raw type and an unchecked conversion.
-    BigQueryIO.Write<TableRow> io =
-        BigQueryIO.<TableRow>write().to(tableSpec)
-                        .withSchema(tableSchema)
-                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
-                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
-    formattedResults
-        .apply(queryName + ".StringToTableRow", ParDo.of(new StringToTableRow()))
-        .apply(queryName + ".WriteBigQueryResults", io);
-  }
-
-  // ================================================================================
-  // Construct overall pipeline
-  // ================================================================================
-
-  /**
-   * Return source of events for this run, or null if we are simply publishing events
-   * to Pubsub.
-   *
-   * <p>For a {@code PUBSUB} source, the publisher side is set up first:
-   * <ul>
-   * <li>{@code SUBSCRIBE_ONLY}: nothing is published.
-   * <li>{@code PUBLISH_ONLY}: synthetic events are published from this pipeline.
-   * <li>{@code COMBINED}: a separate publisher pipeline is built and run.
-   * </ul>
-   * The consumer side then reads from Pubsub in {@code SUBSCRIBE_ONLY} and {@code COMBINED}
-   * modes.
-   *
-   * @param p the main pipeline to attach the source to
-   * @param now timestamp forwarded to the Pubsub helpers to name resources
-   * @return the event source; {@code null} in {@code PUBSUB}/{@code PUBLISH_ONLY} mode (or for
-   *     any source type not handled below)
-   */
-  private PCollection<Event> createSource(Pipeline p, final long now) {
-    PCollection<Event> source = null;
-    switch (configuration.sourceType) {
-      case DIRECT:
-        source = sourceEventsFromSynthetic(p);
-        break;
-      case AVRO:
-        source = sourceEventsFromAvro(p);
-        break;
-      case PUBSUB:
-        // Setup the sink for the publisher.
-        switch (configuration.pubSubMode) {
-          case SUBSCRIBE_ONLY:
-            // Nothing to publish.
-            break;
-          case PUBLISH_ONLY:
-            // Send synthesized events to Pubsub in this job.
-            sinkEventsToPubsub(sourceEventsFromSynthetic(p).apply(queryName + ".Snoop",
-                    NexmarkUtils.snoop(queryName)), now);
-            break;
-          case COMBINED:
-            // Send synthesized events to Pubsub in separate publisher job.
-            // We won't start the main pipeline until the publisher has sent the pre-load events.
-            // We'll shutdown the publisher job when we notice the main job has finished.
-            invokeBuilderForPublishOnlyPipeline(new PipelineBuilder<NexmarkOptions>() {
-              @Override
-              public void build(NexmarkOptions publishOnlyOptions) {
-                // Build the publisher from the options handed to this builder, not from the
-                // main job's options. Otherwise any publisher-specific settings prepared by
-                // invokeBuilderForPublishOnlyPipeline would be ignored.
-                Pipeline sp = Pipeline.create(publishOnlyOptions);
-                NexmarkUtils.setupPipeline(configuration.coderStrategy, sp);
-                publisherMonitor = new Monitor<>(queryName, "publisher");
-                sinkEventsToPubsub(
-                    sourceEventsFromSynthetic(sp)
-                            .apply(queryName + ".Monitor", publisherMonitor.getTransform()),
-                    now);
-                publisherResult = sp.run();
-              }
-            });
-            break;
-        }
-
-        // Setup the source for the consumer.
-        switch (configuration.pubSubMode) {
-          case PUBLISH_ONLY:
-            // Nothing to consume. Leave source null.
-            break;
-          case SUBSCRIBE_ONLY:
-          case COMBINED:
-            // Read events from pubsub.
-            source = sourceEventsFromPubsub(p, now);
-            break;
-        }
-        break;
-    }
-    return source;
-  }
-
-  /** Main-output tag: results whose string hash code is even. */
-  private static final TupleTag<String> MAIN = new TupleTag<String>(){};
-  /** Side-output tag: results whose string hash code is odd. */
-  private static final TupleTag<String> SIDE = new TupleTag<String>(){};
-
-  /**
-   * Splits results by the parity of {@link String#hashCode()}. Even hashes go to the main
-   * output and odd hashes go to {@code SIDE}.
-   */
-  private static class PartitionDoFn extends DoFn<String, String> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      final String element = c.element();
-      final boolean evenHash = (element.hashCode() & 1) == 0;
-      if (!evenHash) {
-        c.output(SIDE, element);
-        return;
-      }
-      c.output(element);
-    }
-  }
-
-  /**
-   * Consume {@code results} according to {@code configuration.sinkType}.
-   *
-   * <p>{@code COUNT_ONLY} discards the results without formatting them. Every other sink type
-   * first formats each result as a string, and also logs it when {@code --logResults} is set.
-   * The strings are then:
-   * <ul>
-   * <li>discarded ({@code DEVNULL}, {@code AVRO}), or
-   * <li>written to Pubsub, text files, or BigQuery.
-   * </ul>
-   *
-   * @param results the query output
-   * @param now timestamp forwarded to the sink helpers to name output resources
-   * @throws IllegalStateException if the {@code COUNT_ONLY} short-circuit is bypassed
-   */
-  private void sink(PCollection<TimestampedValue<KnownSize>> results, long now) {
-    if (configuration.sinkType == NexmarkUtils.SinkType.COUNT_ONLY) {
-      // Avoid the cost of formatting the results.
-      results.apply(queryName + ".DevNull", NexmarkUtils.devNull(queryName));
-      return;
-    }
-
-    PCollection<String> formattedResults =
-      results.apply(queryName + ".Format", NexmarkUtils.format(queryName));
-    if (options.getLogResults()) {
-      formattedResults = formattedResults.apply(queryName + ".Results.Log",
-              NexmarkUtils.<String>log(queryName + ".Results"));
-    }
-
-    switch (configuration.sinkType) {
-      case DEVNULL:
-        // Discard all results
-        formattedResults.apply(queryName + ".DevNull", NexmarkUtils.devNull(queryName));
-        break;
-      case PUBSUB:
-        sinkResultsToPubsub(formattedResults, now);
-        break;
-      case TEXT:
-        sinkResultsToText(formattedResults, now);
-        break;
-      case AVRO:
-        NexmarkUtils.console(
-            "WARNING: with --sinkType=AVRO, actual query results will be discarded.");
-        break;
-      case BIGQUERY:
-        // Multiple BigQuery backends to mimic what most customers do. The even-hash partition,
-        // the odd-hash partition and an unpartitioned copy each go to their own table.
-        PCollectionTuple res = formattedResults.apply(queryName + ".Partition",
-            ParDo.of(new PartitionDoFn()).withOutputTags(MAIN, TupleTagList.of(SIDE)));
-        sinkResultsToBigQuery(res.get(MAIN), now, "main");
-        sinkResultsToBigQuery(res.get(SIDE), now, "side");
-        sinkResultsToBigQuery(formattedResults, now, "copy");
-        break;
-      case COUNT_ONLY:
-        // Short-circuited above; reaching this point means that invariant was broken.
-        throw new IllegalStateException(
-            "COUNT_ONLY sink should have been handled before formatting results");
-    }
-  }
-
-  // ================================================================================
-  // Entry point
-  // ================================================================================
-
-  /**
-   * Calculate the distribution of the expected number of results per window (in event time,
-   * not wallclock time), as produced by the model's simulator, and print a summary to the
-   * console.
-   *
-   * <p>With fewer than five samples, only the sample count is printed. Otherwise the summary
-   * shows the minimum, the values at the 25%, 50% and 75% positions of the sorted counts, and
-   * the maximum.
-   *
-   * @param model the reference model whose simulator supplies the per-window counts
-   */
-  private void modelResultRates(NexmarkQueryModel model) {
-    List<Long> counts = Lists.newArrayList(model.simulator().resultsPerWindow());
-    Collections.sort(counts);
-    int n = counts.size();
-    if (n < 5) {
-      NexmarkUtils.console("Query%d: only %d samples", model.configuration.query, n);
-    } else {
-      // counts is sorted, so these indices select order statistics: the middle one is the
-      // median (not the mean), flanked by the first and third quartiles.
-      NexmarkUtils.console("Query%d: N:%d; min:%d; p25:%d; median:%d; p75:%d; max:%d",
-                           model.configuration.query, n, counts.get(0), counts.get(n / 4),
-                           counts.get(n / 2),
-                           counts.get(n - 1 - n / 4), counts.get(n - 1));
-    }
-  }
-
-  /**
-   * Run {@code configuration} and return its performance if possible.
-   */
-  @Nullable
-  public NexmarkPerf run(NexmarkConfiguration runConfiguration) {
-    if (options.getManageResources() && !options.getMonitorJobs()) {
-      throw new RuntimeException("If using --manageResources then must also use --monitorJobs.");
-    }
-
-    //
-    // Setup per-run state.
-    //
-    checkState(configuration == null);
-    checkState(queryName == null);
-    configuration = runConfiguration;
-
-    try {
-      NexmarkUtils.console("Running %s", configuration.toShortString());
-
-      if (configuration.numEvents < 0) {
-        NexmarkUtils.console("skipping since configuration is disabled");
-        return null;
-      }
-
-      List<NexmarkQuery> queries = Arrays.asList(new Query0(configuration),
-                                                 new Query1(configuration),
-                                                 new Query2(configuration),
-                                                 new Query3(configuration),
-                                                 new Query4(configuration),
-                                                 new Query5(configuration),
-                                                 new Query6(configuration),
-                                                 new Query7(configuration),
-                                                 new Query8(configuration),
-                                                 new Query9(configuration),
-                                                 new Query10(configuration),
-                                                 new Query11(configuration),
-                                                 new Query12(configuration));
-      NexmarkQuery query = queries.get(configuration.query);
-      queryName = query.getName();
-
-      List<NexmarkQueryModel> models = Arrays.asList(
-          new Query0Model(configuration),
-          new Query1Model(configuration),
-          new Query2Model(configuration),
-          new Query3Model(configuration),
-          new Query4Model(configuration),
-          new Query5Model(configuration),
-          new Query6Model(configuration),
-          new Query7Model(configuration),
-          new Query8Model(configuration),
-          new Query9Model(configuration),
-          null,
-          null,
-          null);
-      NexmarkQueryModel model = models.get(configuration.query);
-
-      if (options.getJustModelResultRate()) {
-        if (model == null) {
-          throw new RuntimeException(String.format("No model for %s", queryName));
-        }
-        modelResultRates(model);
-        return null;
-      }
-
-      long now = System.currentTimeMillis();
-      Pipeline p = Pipeline.create(options);
-      NexmarkUtils.setupPipeline(configuration.coderStrategy, p);
-
-      // Generate events.
-      PCollection<Event> source = createSource(p, now);
-
-      if (options.getLogEvents()) {
-        source = source.apply(queryName + ".Events.Log",
-                NexmarkUtils.<Event>log(queryName + ".Events"));
-      }
-
-      // Source will be null if source type is PUBSUB and mode is PUBLISH_ONLY.
-      // In that case there's nothing more to add to pipeline.
-      if (source != null) {
-        // Optionally sink events in Avro format.
-        // (Query results are ignored).
-        if (configuration.sinkType == NexmarkUtils.SinkType.AVRO) {
-          sinkEventsToAvro(source);
-        }
-
-        // Query 10 logs all events to Google Cloud storage files. It could generate a lot of logs,
-        // so, set parallelism. Also set the output path where to write log files.
-        if (configuration.query == 10) {
-          String path = null;
-          if (options.getOutputPath() != null && !options.getOutputPath().isEmpty()) {
-            path = logsDir(now);
-          }
-          ((Query10) query).setOutputPath(path);
-          ((Query10) query).setMaxNumWorkers(maxNumWorkers());
-        }
-
-        // Apply query.
-        PCollection<TimestampedValue<KnownSize>> results = source.apply(query);
-
-        if (options.getAssertCorrectness()) {
-          if (model == null) {
-            throw new RuntimeException(String.format("No model for %s", queryName));
-          }
-          // We know all our streams have a finite number of elements.
-          results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
-          // If we have a finite number of events then assert our pipeline's
-          // results match those of a model using the same sequence of events.
-          PAssert.that(results).satisfies(model.assertionFor());
-        }
-
-        // Output results.
-        sink(results, now);
-      }
-
-      mainResult = p.run();
-      mainResult.waitUntilFinish(Duration.standardSeconds(configuration.streamTimeout));
-      return monitor(query);
-    } finally {
-      configuration = null;
-      queryName = null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
deleted file mode 100644
index fbd3e74..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
+++ /dev/null
@@ -1,403 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.options.ApplicationNameOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
-
-/**
- * Command line flags for the Nexmark launcher.
- *
- * <p>Most flags use boxed types and are {@link Nullable}, so that an unset flag can be told
- * apart from an explicit value. Primitive {@code boolean} flags cannot be null; they carry an
- * explicit {@link Default.Boolean} instead.
- */
-public interface NexmarkOptions
-    extends ApplicationNameOptions, GcpOptions, PipelineOptions, StreamingOptions {
-  @Description("Which suite to run. Default is to use command line arguments for one job.")
-  @Default.Enum("DEFAULT")
-  NexmarkSuite getSuite();
-
-  void setSuite(NexmarkSuite suite);
-
-  @Description("If true, monitor the jobs as they run.")
-  @Default.Boolean(false)
-  boolean getMonitorJobs();
-
-  void setMonitorJobs(boolean monitorJobs);
-
-  @Description("Where the events come from.")
-  @Nullable
-  NexmarkUtils.SourceType getSourceType();
-
-  void setSourceType(NexmarkUtils.SourceType sourceType);
-
-  @Description("Prefix for input files if using avro input")
-  @Nullable
-  String getInputPath();
-
-  void setInputPath(String inputPath);
-
-  @Description("Where results go.")
-  @Nullable
-  NexmarkUtils.SinkType getSinkType();
-
-  void setSinkType(NexmarkUtils.SinkType sinkType);
-
-  @Description("Which mode to run in when source is PUBSUB.")
-  @Nullable
-  NexmarkUtils.PubSubMode getPubSubMode();
-
-  void setPubSubMode(NexmarkUtils.PubSubMode pubSubMode);
-
-  @Description("Which query to run.")
-  @Nullable
-  Integer getQuery();
-
-  void setQuery(Integer query);
-
-  @Description("Prefix for output files if using text output for results or running Query 10.")
-  @Nullable
-  String getOutputPath();
-
-  void setOutputPath(String outputPath);
-
-  @Description("Base name of pubsub topic to publish to in streaming mode.")
-  @Nullable
-  @Default.String("nexmark")
-  String getPubsubTopic();
-
-  void setPubsubTopic(String pubsubTopic);
-
-  @Description("Base name of pubsub subscription to read from in streaming mode.")
-  @Nullable
-  @Default.String("nexmark")
-  String getPubsubSubscription();
-
-  void setPubsubSubscription(String pubsubSubscription);
-
-  @Description("Base name of BigQuery table name if using BigQuery output.")
-  @Nullable
-  @Default.String("nexmark")
-  String getBigQueryTable();
-
-  void setBigQueryTable(String bigQueryTable);
-
-  @Description("Approximate number of events to generate. "
-               + "Zero for effectively unlimited in streaming mode.")
-  @Nullable
-  Long getNumEvents();
-
-  void setNumEvents(Long numEvents);
-
-  @Description("Time in seconds to preload the subscription with data, at the initial input rate "
-               + "of the pipeline.")
-  @Nullable
-  Integer getPreloadSeconds();
-
-  void setPreloadSeconds(Integer preloadSeconds);
-
-  @Description(
-      "Time in seconds to wait in pipelineResult.waitUntilFinish(), useful in streaming mode")
-  @Nullable
-  Integer getStreamTimeout();
-
-  void setStreamTimeout(Integer streamTimeout);
-
-  @Description("Number of unbounded sources to create events.")
-  @Nullable
-  Integer getNumEventGenerators();
-
-  void setNumEventGenerators(Integer numEventGenerators);
-
-  @Description("Shape of event rate curve.")
-  @Nullable
-  NexmarkUtils.RateShape getRateShape();
-
-  void setRateShape(NexmarkUtils.RateShape rateShape);
-
-  @Description("Initial overall event rate (in --rateUnit).")
-  @Nullable
-  Integer getFirstEventRate();
-
-  void setFirstEventRate(Integer firstEventRate);
-
-  @Description("Next overall event rate (in --rateUnit).")
-  @Nullable
-  Integer getNextEventRate();
-
-  void setNextEventRate(Integer nextEventRate);
-
-  @Description("Unit for rates.")
-  @Nullable
-  NexmarkUtils.RateUnit getRateUnit();
-
-  void setRateUnit(NexmarkUtils.RateUnit rateUnit);
-
-  @Description("Overall period of rate shape, in seconds.")
-  @Nullable
-  Integer getRatePeriodSec();
-
-  void setRatePeriodSec(Integer ratePeriodSec);
-
-  @Description("If true, relay events in real time in streaming mode.")
-  @Nullable
-  Boolean getIsRateLimited();
-
-  void setIsRateLimited(Boolean isRateLimited);
-
-  @Description("If true, use wallclock time as event time. Otherwise, use a deterministic"
-               + " time in the past so that multiple runs will see exactly the same event streams"
-               + " and should thus have exactly the same results.")
-  @Nullable
-  Boolean getUseWallclockEventTime();
-
-  void setUseWallclockEventTime(Boolean useWallclockEventTime);
-
-  // Primitive boolean flags cannot be null; state their default explicitly instead.
-  @Description("Assert pipeline results match model results.")
-  @Default.Boolean(false)
-  boolean getAssertCorrectness();
-
-  void setAssertCorrectness(boolean assertCorrectness);
-
-  @Description("Log all input events.")
-  @Default.Boolean(false)
-  boolean getLogEvents();
-
-  void setLogEvents(boolean logEvents);
-
-  @Description("Log all query results.")
-  @Default.Boolean(false)
-  boolean getLogResults();
-
-  void setLogResults(boolean logResults);
-
-  @Description("Average size in bytes for a person record.")
-  @Nullable
-  Integer getAvgPersonByteSize();
-
-  void setAvgPersonByteSize(Integer avgPersonByteSize);
-
-  @Description("Average size in bytes for an auction record.")
-  @Nullable
-  Integer getAvgAuctionByteSize();
-
-  void setAvgAuctionByteSize(Integer avgAuctionByteSize);
-
-  @Description("Average size in bytes for a bid record.")
-  @Nullable
-  Integer getAvgBidByteSize();
-
-  void setAvgBidByteSize(Integer avgBidByteSize);
-
-  @Description("Ratio of bids for 'hot' auctions above the background.")
-  @Nullable
-  Integer getHotAuctionRatio();
-
-  void setHotAuctionRatio(Integer hotAuctionRatio);
-
-  @Description("Ratio of auctions for 'hot' sellers above the background.")
-  @Nullable
-  Integer getHotSellersRatio();
-
-  void setHotSellersRatio(Integer hotSellersRatio);
-
-  @Description("Ratio of auctions for 'hot' bidders above the background.")
-  @Nullable
-  Integer getHotBiddersRatio();
-
-  void setHotBiddersRatio(Integer hotBiddersRatio);
-
-  @Description("Window size in seconds.")
-  @Nullable
-  Long getWindowSizeSec();
-
-  void setWindowSizeSec(Long windowSizeSec);
-
-  @Description("Window period in seconds.")
-  @Nullable
-  Long getWindowPeriodSec();
-
-  void setWindowPeriodSec(Long windowPeriodSec);
-
-  @Description("If in streaming mode, the holdback for watermark in seconds.")
-  @Nullable
-  Long getWatermarkHoldbackSec();
-
-  void setWatermarkHoldbackSec(Long watermarkHoldbackSec);
-
-  @Description("Roughly how many auctions should be in flight for each generator.")
-  @Nullable
-  Integer getNumInFlightAuctions();
-
-  void setNumInFlightAuctions(Integer numInFlightAuctions);
-
-  @Description("Maximum number of people to consider as active for placing auctions or bids.")
-  @Nullable
-  Integer getNumActivePeople();
-
-  void setNumActivePeople(Integer numActivePeople);
-
-  @Description("Filename of perf data to append to.")
-  @Nullable
-  String getPerfFilename();
-
-  void setPerfFilename(String perfFilename);
-
-  @Description("Filename of baseline perf data to read from.")
-  @Nullable
-  String getBaselineFilename();
-
-  void setBaselineFilename(String baselineFilename);
-
-  @Description("Filename of summary perf data to append to.")
-  @Nullable
-  String getSummaryFilename();
-
-  void setSummaryFilename(String summaryFilename);
-
-  @Description("Filename for javascript capturing all perf data and any baselines.")
-  @Nullable
-  String getJavascriptFilename();
-
-  void setJavascriptFilename(String javascriptFilename);
-
-  @Description("If true, don't run the actual query. Instead, calculate the distribution "
-               + "of number of query results per (event time) minute according to the query model.")
-  @Default.Boolean(false)
-  boolean getJustModelResultRate();
-
-  void setJustModelResultRate(boolean justModelResultRate);
-
-  @Description("Coder strategy to use.")
-  @Nullable
-  NexmarkUtils.CoderStrategy getCoderStrategy();
-
-  void setCoderStrategy(NexmarkUtils.CoderStrategy coderStrategy);
-
-  @Description("Delay, in milliseconds, for each event. We will peg one core for this "
-               + "number of milliseconds to simulate CPU-bound computation.")
-  @Nullable
-  Long getCpuDelayMs();
-
-  void setCpuDelayMs(Long cpuDelayMs);
-
-  @Description("Extra data, in bytes, to save to persistent state for each event. "
-               + "This will force I/O all the way to durable storage to simulate an "
-               + "I/O-bound computation.")
-  @Nullable
-  Long getDiskBusyBytes();
-
-  void setDiskBusyBytes(Long diskBusyBytes);
-
-  @Description("Skip factor for query 2. We select bids for every auctionSkip'th auction")
-  @Nullable
-  Integer getAuctionSkip();
-
-  void setAuctionSkip(Integer auctionSkip);
-
-  @Description("Fanout for queries 4 (groups by category id) and 7 (finds a global maximum).")
-  @Nullable
-  Integer getFanout();
-
-  void setFanout(Integer fanout);
-
-  @Description("Maximum waiting time to clean personState in query3 "
-      + "(i.e. maximum waiting of the auctions related to person in state in seconds in event "
-      + "time).")
-  @Nullable
-  Integer getMaxAuctionsWaitingTime();
-
-  void setMaxAuctionsWaitingTime(Integer maxAuctionsWaitingTime);
-
-  @Description("Length of occasional delay to impose on events (in seconds).")
-  @Nullable
-  Long getOccasionalDelaySec();
-
-  void setOccasionalDelaySec(Long occasionalDelaySec);
-
-  @Description("Probability that an event will be delayed by --occasionalDelaySec.")
-  @Nullable
-  Double getProbDelayedEvent();
-
-  void setProbDelayedEvent(Double probDelayedEvent);
-
-  @Description("Maximum size of each log file (in events). For Query10 only.")
-  @Nullable
-  Integer getMaxLogEvents();
-
-  void setMaxLogEvents(Integer maxLogEvents);
-
-  @Description("How to derive names of resources.")
-  @Default.Enum("QUERY_AND_SALT")
-  NexmarkUtils.ResourceNameMode getResourceNameMode();
-
-  void setResourceNameMode(NexmarkUtils.ResourceNameMode mode);
-
-  @Description("If true, manage the creation and cleanup of topics, subscriptions and gcs files.")
-  @Default.Boolean(true)
-  boolean getManageResources();
-
-  void setManageResources(boolean manageResources);
-
-  @Description("If true, use pub/sub publish time instead of event time.")
-  @Nullable
-  Boolean getUsePubsubPublishTime();
-
-  void setUsePubsubPublishTime(Boolean usePubsubPublishTime);
-
-  @Description("Number of events in out-of-order groups. 1 implies no out-of-order events. "
-               + "1000 implies every 1000 events per generator are emitted in pseudo-random order.")
-  @Nullable
-  Long getOutOfOrderGroupSize();
-
-  void setOutOfOrderGroupSize(Long outOfOrderGroupSize);
-
-  @Description("If false, do not add the Monitor and Snoop transforms.")
-  @Nullable
-  Boolean getDebug();
-
-  void setDebug(Boolean value);
-
-  @Description("If set, cancel running pipelines after this long")
-  @Nullable
-  Long getRunningTimeMinutes();
-
-  void setRunningTimeMinutes(Long value);
-
-  @Description("If set and --monitorJobs is true, check that the system watermark is never more "
-               + "than this far behind real time")
-  @Nullable
-  Long getMaxSystemLagSeconds();
-
-  void setMaxSystemLagSeconds(Long value);
-
-  @Description("If set and --monitorJobs is true, check that the data watermark is never more "
-               + "than this far behind real time")
-  @Nullable
-  Long getMaxDataLagSeconds();
-
-  void setMaxDataLagSeconds(Long value);
-
-  @Description("Only start validating watermarks after this many seconds")
-  @Nullable
-  Long getWatermarkValidationDelaySeconds();
-
-  void setWatermarkValidationDelaySeconds(Long value);
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java
deleted file mode 100644
index e7f59c8..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.util.List;
-import javax.annotation.Nullable;
-
-/**
- * Performance summary for one run of a configuration, serialized to and from JSON via
- * {@code NexmarkUtils.MAPPER}.
- */
-public class NexmarkPerf {
-  /**
-   * Counts of events and (when known) results observed at one point in wallclock time.
-   */
-  public static class ProgressSnapshot {
-    /** Wallclock seconds elapsed since the job started. */
-    @JsonProperty
-    double secSinceStart;
-
-    /** Runtime in seconds, from the first event to the last generated event or output result. */
-    @JsonProperty
-    double runtimeSec;
-
-    /** Running total of generated events, or -1 when unknown. */
-    @JsonProperty
-    long numEvents;
-
-    /** Running total of emitted results, or -1 when unknown. */
-    @JsonProperty
-    long numResults;
-
-    /**
-     * Reports whether anything appears to have happened between {@code this} snapshot and
-     * {@code that} one.
-     *
-     * @param that the snapshot to compare against
-     * @return true if the runtime, event count or result count differs
-     */
-    public boolean anyActivity(ProgressSnapshot that) {
-      // A changed runtime means an event or result end timestamp moved. Changed counts mean
-      // more events were generated or more results were emitted.
-      return runtimeSec != that.runtimeSec
-          || numEvents != that.numEvents
-          || numResults != that.numResults;
-    }
-  }
-
-  /** Progress snapshots, or null until they have been calculated. */
-  @JsonProperty
-  @Nullable
-  public List<ProgressSnapshot> snapshots = null;
-
-  /**
-   * Effective runtime in seconds. It runs from the timestamp of the first generated event to
-   * the later of the last generated event and the last emitted result. -1 when unknown.
-   */
-  @JsonProperty
-  public double runtimeSec = -1.0;
-
-  /** Count of generated events, or -1 when unknown. */
-  @JsonProperty
-  public long numEvents = -1;
-
-  /**
-   * Events generated per second of runtime, or -1 when unknown. For batch jobs this is the
-   * event count over the runtime above. For streaming jobs it is the steady-state generation
-   * rate sampled over the lifetime of the job.
-   */
-  @JsonProperty
-  public double eventsPerSec = -1.0;
-
-  /** Event bytes generated per second of runtime, or -1 when unknown. */
-  @JsonProperty
-  public double eventBytesPerSec = -1.0;
-
-  /** Count of emitted results, or -1 when unknown. */
-  @JsonProperty
-  public long numResults = -1;
-
-  /** Results generated per second of runtime, or -1 when unknown. */
-  @JsonProperty
-  public double resultsPerSec = -1.0;
-
-  /** Result bytes generated per second of runtime, or -1 when unknown. */
-  @JsonProperty
-  public double resultBytesPerSec = -1.0;
-
-  /** Seconds between job start and the first event, or -1 when unknown. */
-  @JsonProperty
-  public double startupDelaySec = -1.0;
-
-  /** Seconds between the first event and the first result, or -1 when unknown. */
-  @JsonProperty
-  public double processingDelaySec = -1.0;
-
-  /** Seconds between the last result and job completion, or -1 when unknown. */
-  @JsonProperty
-  public double shutdownDelaySec = -1.0;
-
-  /**
-   * Time-dilation factor: the rate at which event time advanced relative to real time.
-   * Values above one mean events were processed faster than they would have been generated in
-   * real time. Values below one mean processing could not keep up. -1 when unknown.
-   */
-  @JsonProperty
-  double timeDilation = -1.0;
-
-  /** Errors encountered while the job executed, or null. */
-  @JsonProperty
-  @Nullable
-  public List<String> errors = null;
-
-  /** Id of the job these numbers were drawn from, or null when unknown. */
-  @JsonProperty
-  @Nullable
-  public String jobId = null;
-
-  /**
-   * Serializes this summary to JSON.
-   *
-   * @throws RuntimeException wrapping any {@link JsonProcessingException}
-   */
-  @Override
-  public String toString() {
-    final String json;
-    try {
-      json = NexmarkUtils.MAPPER.writeValueAsString(this);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
-    return json;
-  }
-
-  /**
-   * Parses a {@link NexmarkPerf} from its JSON form.
-   *
-   * @param string JSON text, as produced by {@link #toString()}
-   * @throws RuntimeException wrapping any {@link IOException} raised while parsing
-   */
-  public static NexmarkPerf fromString(String string) {
-    final NexmarkPerf parsed;
-    try {
-      parsed = NexmarkUtils.MAPPER.readValue(string, NexmarkPerf.class);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to parse nexmark perf: ", e);
-    }
-    return parsed;
-  }
-
-  /**
-   * Reports whether anything appears to have happened between {@code this} and {@code that}
-   * perf values.
-   *
-   * @param that the perf values to compare against
-   * @return true if the runtime, event count or result count differs
-   */
-  public boolean anyActivity(NexmarkPerf that) {
-    // Same comparison as ProgressSnapshot#anyActivity, over the overall totals.
-    return runtimeSec != that.runtimeSec
-        || numEvents != that.numEvents
-        || numResults != that.numResults;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
deleted file mode 100644
index 0d98a5d..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.util.ArrayList;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * A set of {@link NexmarkConfiguration}s.
- */
-public enum NexmarkSuite {
-  /**
-   * The default.
-   */
-  DEFAULT(defaultConf()),
-
-  /**
-   * Sweep through all queries using the default configuration.
-   * 100k/10k events (depending on query).
-   */
-  SMOKE(smoke()),
-
-  /**
-   * As for SMOKE, but with 100m/10m events (every SMOKE event count scaled by 1,000).
-   */
-  STRESS(stress()),
-
-  /**
-   * As for SMOKE, but with 1b/100m events (every SMOKE event count scaled by 10,000).
-   */
-  FULL_THROTTLE(fullThrottle());
-
-  /** Returns a list holding a single freshly constructed {@link NexmarkConfiguration}. */
-  private static List<NexmarkConfiguration> defaultConf() {
-    List<NexmarkConfiguration> configurations = new ArrayList<>();
-    NexmarkConfiguration configuration = new NexmarkConfiguration();
-    configurations.add(configuration);
-    return configurations;
-  }
-
-  /**
-   * Returns one copy of {@code NexmarkConfiguration.DEFAULT} per query 0..12, each with 100k
-   * events, except queries 4, 6 and 9 which get 10k.
-   */
-  private static List<NexmarkConfiguration> smoke() {
-    List<NexmarkConfiguration> configurations = new ArrayList<>();
-    for (int query = 0; query <= 12; query++) {
-      NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT.copy();
-      configuration.query = query;
-      configuration.numEvents = 100_000;
-      if (query == 4 || query == 6 || query == 9) {
-        // Scale back so overall runtimes are reasonably close across all queries.
-        configuration.numEvents /= 10;
-      }
-      configurations.add(configuration);
-    }
-    return configurations;
-  }
-
-  /** Returns the SMOKE configurations with every event count scaled by 1,000 (100m/10m). */
-  private static List<NexmarkConfiguration> stress() {
-    return scaledSmoke(1_000);
-  }
-
-  /**
-   * Returns the SMOKE configurations with every event count scaled by 10,000 (1b/100m).
-   * Previously this used the same factor as {@link #stress()}, which made FULL_THROTTLE
-   * identical to STRESS.
-   */
-  private static List<NexmarkConfiguration> fullThrottle() {
-    return scaledSmoke(10_000);
-  }
-
-  /**
-   * Builds a fresh set of SMOKE configurations and multiplies each non-negative
-   * {@code numEvents} by {@code factor}.
-   *
-   * @param factor multiplier applied to each configuration's event count
-   * @return the scaled configurations
-   */
-  private static List<NexmarkConfiguration> scaledSmoke(long factor) {
-    List<NexmarkConfiguration> configurations = smoke();
-    for (NexmarkConfiguration configuration : configurations) {
-      // NOTE(review): negative counts are left untouched, presumably a sentinel; confirm in
-      // NexmarkConfiguration.
-      if (configuration.numEvents >= 0) {
-        configuration.numEvents *= factor;
-      }
-    }
-    return configurations;
-  }
-
-  private final List<NexmarkConfiguration> configurations;
-
-  NexmarkSuite(List<NexmarkConfiguration> configurations) {
-    this.configurations = configurations;
-  }
-
-  /**
-   * Return the configurations corresponding to this suite. Each configuration is copied and then
-   * overridden with any set command line flags via {@code overrideFromOptions}; the results are
-   * collected into an insertion-ordered set.
-   *
-   * <p>NOTE(review): earlier docs stated that --isStreaming is only respected for the
-   * {@link #DEFAULT} suite, but this method applies the same override to every suite; confirm
-   * whether {@code overrideFromOptions} enforces that.
-   *
-   * @param options command line options used to override each configuration
-   * @return the overridden configurations, in suite order
-   */
-  public Iterable<NexmarkConfiguration> getConfigurations(NexmarkOptions options) {
-    Set<NexmarkConfiguration> results = new LinkedHashSet<>();
-    for (NexmarkConfiguration configuration : configurations) {
-      NexmarkConfiguration result = configuration.copy();
-      result.overrideFromOptions(options);
-      results.add(result);
-    }
-    return results;
-  }
-}