You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:12 UTC

[04/55] [abbrv] beam git commit: NexMark

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java
new file mode 100644
index 0000000..2534155
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import javax.annotation.Nullable;
+
+/**
+ * An implementation of the 'NEXMark queries' for Google Dataflow.
+ * These are 11 queries over a three table schema representing an online auction system:
+ * <ul>
+ * <li>{@link Person} represents a person submitting an item for auction and/or making a bid
+ * on an auction.
+ * <li>{@link Auction} represents an item under auction.
+ * <li>{@link Bid} represents a bid for an item under auction.
+ * </ul>
+ * The queries exercise many aspects of streaming dataflow.
+ * <p>
+ * <p>We synthesize the creation of people, auctions and bids in real-time. The data is not
+ * particularly sensible.
+ * <p>
+ * <p>See
+ * <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/">
+ * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a>
+ */
+class NexmarkGoogleDriver extends NexmarkDriver<NexmarkGoogleDriver.NexmarkGoogleOptions> {
+  /**
+   * Flags understood by this driver, on top of those inherited from {@code Options} and
+   * {@link DataflowPipelineOptions}. Every getter declared here is marked {@link Nullable}.
+   */
+  public interface NexmarkGoogleOptions extends Options, DataflowPipelineOptions {
+    /** See the {@link Description} text; the value is a number of minutes. */
+    @Description("If set, cancel running pipelines after this long")
+    @Nullable
+    Long getRunningTimeMinutes();
+
+    void setRunningTimeMinutes(Long value);
+
+    /** See the {@link Description} text; the value is a number of seconds. */
+    @Description("If set and --monitorJobs is true, check that the system watermark is never more "
+                 + "than this far behind real time")
+    @Nullable
+    Long getMaxSystemLagSeconds();
+
+    void setMaxSystemLagSeconds(Long value);
+
+    /** See the {@link Description} text; the value is a number of seconds. */
+    @Description("If set and --monitorJobs is true, check that the data watermark is never more "
+                 + "than this far behind real time")
+    @Nullable
+    Long getMaxDataLagSeconds();
+
+    void setMaxDataLagSeconds(Long value);
+
+    /** See the {@link Description} text; the value is a number of seconds. */
+    @Description("Only start validating watermarks after this many seconds")
+    @Nullable
+    Long getWatermarkValidationDelaySeconds();
+
+    void setWatermarkValidationDelaySeconds(Long value);
+  }
+
+  /**
+   * Program entry point: parse and validate {@code args} into {@link NexmarkGoogleOptions},
+   * select {@link DataflowPipelineRunner} as the runner, and pass the options together with a
+   * fresh {@link NexmarkGoogleRunner} to {@code runAll}.
+   */
+  public static void main(String[] args) {
+    // Turn the raw command line into validated, typed options.
+    NexmarkGoogleOptions flags =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(NexmarkGoogleOptions.class);
+    flags.setRunner(DataflowPipelineRunner.class);
+
+    NexmarkGoogleRunner queryRunner = new NexmarkGoogleRunner(flags);
+    NexmarkGoogleDriver driver = new NexmarkGoogleDriver();
+    driver.runAll(flags, queryRunner);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
new file mode 100644
index 0000000..4b73592
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
@@ -0,0 +1,660 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineJob;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.runners.AggregatorRetrievalException;
+import org.apache.beam.sdk.transforms.Aggregator;
+
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+/**
+ * Run a single Nexmark query using a given configuration on Google Dataflow.
+ */
+class NexmarkGoogleRunner extends NexmarkRunner<NexmarkGoogleDriver.NexmarkGoogleOptions> {
+  /**
+   * How long to let streaming pipeline run after all events have been generated and we've
+   * seen no activity.
+   */
+  private static final Duration DONE_DELAY = Duration.standardMinutes(1);
+
+  /**
+   * How long to allow no activity without warning.
+   */
+  private static final Duration STUCK_WARNING_DELAY = Duration.standardMinutes(10);
+
+  /**
+   * How long to let streaming pipeline run after we've
+   * seen no activity, even if all events have not been generated.
+   */
+  private static final Duration STUCK_TERMINATE_DELAY = Duration.standardDays(3);
+
+  /**
+   * Delay between perf samples.
+   */
+  private static final Duration PERF_DELAY = Duration.standardSeconds(15);
+
+  /**
+   * Minimum number of samples needed for 'steady-state' rate calculation.
+   */
+  private static final int MIN_SAMPLES = 9;
+
+  /**
+   * Minimum length of time over which to consider samples for 'steady-state' rate calculation.
+   */
+  private static final Duration MIN_WINDOW = Duration.standardMinutes(2);
+
+  /**
+   * Create a runner for the given options; they are passed unchanged to the superclass
+   * constructor.
+   */
+  public NexmarkGoogleRunner(NexmarkGoogleDriver.NexmarkGoogleOptions options) {
+    super(options);
+  }
+
+  /**
+   * Return whatever {@code options.isStreaming()} reports.
+   */
+  @Override
+  protected boolean isStreaming() {
+    return options.isStreaming();
+  }
+
+  /**
+   * Derive the number of cores per worker from the worker machine type option. The type is
+   * split on '-' and, when that yields exactly three pieces, the third piece is parsed as the
+   * core count. The answer is 1 whenever the type is unset or empty, has a different number of
+   * pieces, or its third piece is not an integer.
+   */
+  @Override
+  protected int coresPerWorker() {
+    final int fallbackCores = 1;
+    String workerType = options.getWorkerMachineType();
+    String[] pieces =
+        (workerType == null || workerType.isEmpty()) ? new String[0] : workerType.split("-");
+    if (pieces.length != 3) {
+      return fallbackCores;
+    }
+    try {
+      return Integer.parseInt(pieces[2]);
+    } catch (NumberFormatException ex) {
+      return fallbackCores;
+    }
+  }
+
+  /**
+   * Return the larger of the {@code numWorkers} and {@code maxNumWorkers} options.
+   */
+  @Override
+  protected int maxNumWorkers() {
+    return Math.max(options.getNumWorkers(), options.getMaxNumWorkers());
+  }
+
+  /**
+   * Always {@code true} for this runner.
+   */
+  @Override
+  protected boolean canMonitor() {
+    return true;
+  }
+
+  /**
+   * Build the publish-only pipeline using temporarily adjusted options: the job and app names
+   * gain a "p-" prefix, and both worker counts are capped at the number of workers needed to
+   * host {@code configuration.numEventGenerators} generators at one generator per core.
+   * The original job name, app name and worker counts are put back once {@code builder}
+   * returns or throws, so the adjustments do not leak into later uses of {@code options}.
+   */
+  @Override
+  protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) {
+    // Remember every option we are about to overwrite so the finally block can restore it.
+    String jobName = options.getJobName();
+    String appName = options.getAppName();
+    int maxNumWorkers = options.getMaxNumWorkers();
+    int numWorkers = options.getNumWorkers();
+    options.setJobName("p-" + jobName);
+    options.setAppName("p-" + appName);
+    int coresPerWorker = coresPerWorker();
+    // Round up so that a partially used worker still counts as a whole worker.
+    int eventGeneratorWorkers = (configuration.numEventGenerators + coresPerWorker - 1)
+                                / coresPerWorker;
+    options.setMaxNumWorkers(Math.min(maxNumWorkers, eventGeneratorWorkers));
+    options.setNumWorkers(Math.min(numWorkers, eventGeneratorWorkers));
+    publisherMonitor = new Monitor<Event>(queryName, "publisher");
+    try {
+      builder.build(options);
+    } finally {
+      options.setJobName(jobName);
+      options.setAppName(appName);
+      options.setMaxNumWorkers(maxNumWorkers);
+      options.setNumWorkers(numWorkers);
+    }
+  }
+
+  /**
+   * Monitor the progress of the publisher job. Return when it has been generating events for
+   * at least {@code configuration.preloadSeconds}, or as soon as the job reports a stopped,
+   * done, cancelled, failed or updated state. Returns immediately when job monitoring is
+   * disabled, the publisher result is not a {@link DataflowPipelineJob}, or no preload time is
+   * configured. The job is polled once every {@code PERF_DELAY}.
+   *
+   * @throws RuntimeException if the thread is interrupted while waiting; the thread's interrupt
+   *     status is set again before throwing and the {@link InterruptedException} is the cause
+   */
+  @Override
+  protected void waitForPublisherPreload() {
+    Preconditions.checkNotNull(publisherMonitor);
+    Preconditions.checkNotNull(publisherResult);
+    if (!options.getMonitorJobs()) {
+      return;
+    }
+    if (!(publisherResult instanceof DataflowPipelineJob)) {
+      return;
+    }
+    if (configuration.preloadSeconds <= 0) {
+      return;
+    }
+
+    NexmarkUtils.console("waiting for publisher to pre-load");
+
+    DataflowPipelineJob job = (DataflowPipelineJob) publisherResult;
+
+    long numEvents = 0;
+    // Both stay negative until the first event has been observed.
+    long startMsSinceEpoch = -1;
+    long endMsSinceEpoch = -1;
+    while (true) {
+      PipelineResult.State state = job.getState();
+      switch (state) {
+        case UNKNOWN:
+          // Keep waiting.
+          NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
+          break;
+        case STOPPED:
+        case DONE:
+        case CANCELLED:
+        case FAILED:
+        case UPDATED:
+          // The publisher is no longer running, so there is nothing left to wait for.
+          NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
+          return;
+        case RUNNING:
+          numEvents = getLong(job, publisherMonitor.getElementCounter());
+          if (startMsSinceEpoch < 0 && numEvents > 0) {
+            // First events seen: the preload period is measured from this moment.
+            startMsSinceEpoch = System.currentTimeMillis();
+            endMsSinceEpoch = startMsSinceEpoch
+                              + Duration.standardSeconds(configuration.preloadSeconds).getMillis();
+          }
+          if (endMsSinceEpoch < 0) {
+            NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
+          } else {
+            long remainMs = endMsSinceEpoch - System.currentTimeMillis();
+            if (remainMs > 0) {
+              NexmarkUtils.console("%s publisher (%d events, waiting for %ds)", state, numEvents,
+                  remainMs / 1000);
+            } else {
+              NexmarkUtils.console("publisher preloaded %d events", numEvents);
+              return;
+            }
+          }
+          break;
+      }
+
+      try {
+        Thread.sleep(PERF_DELAY.getMillis());
+      } catch (InterruptedException e) {
+        // Re-assert the interrupt so callers further up can still observe it.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("Interrupted: publisher still running.", e);
+      }
+    }
+  }
+
+  /**
+   * Monitor the performance and progress of a running job until it terminates, polling once
+   * every {@code PERF_DELAY}. Along the way the job is cancelled when the configured running
+   * time has elapsed and, for streaming jobs, when fatal errors are reported, when the query
+   * appears to have finished, or when it appears to be stuck. Once the main job has
+   * terminated, any publisher job is cancelled and waited for.
+   *
+   * @param query the query being run; supplies the event/result monitors and fatal counter
+   * @return {@code null} if job monitoring is disabled or the main result is not a
+   *     {@link DataflowPipelineJob}; otherwise a {@link NexmarkPerf} carrying the collected
+   *     errors and snapshots. Its performance figures are only filled in if performance was
+   *     actually sampled, which requires {@code configuration.debug}.
+   * @throws RuntimeException if a job cannot be cancelled, or if the thread is interrupted
+   *     while waiting for the publisher job to finish
+   */
+  @Override
+  @Nullable
+  protected NexmarkPerf monitor(NexmarkQuery query) {
+    if (!options.getMonitorJobs()) {
+      return null;
+    }
+    if (!(mainResult instanceof DataflowPipelineJob)) {
+      return null;
+    }
+
+    if (configuration.debug) {
+      NexmarkUtils.console("Waiting for main pipeline to 'finish'");
+    } else {
+      NexmarkUtils.console("--debug=false, so job will not self-cancel");
+    }
+
+    DataflowPipelineJob job = (DataflowPipelineJob) mainResult;
+    DataflowPipelineJob publisherJob = (DataflowPipelineJob) publisherResult;
+    List<NexmarkPerf.ProgressSnapshot> snapshots = new ArrayList<>();
+    long startMsSinceEpoch = System.currentTimeMillis();
+    // Negative means 'no time limit'.
+    long endMsSinceEpoch = -1;
+    if (options.getRunningTimeMinutes() != null) {
+      endMsSinceEpoch = startMsSinceEpoch
+                        + Duration.standardMinutes(options.getRunningTimeMinutes()).getMillis()
+                        - Duration.standardSeconds(configuration.preloadSeconds).getMillis();
+    }
+    long lastActivityMsSinceEpoch = -1;
+    NexmarkPerf perf = null;
+    boolean waitingForShutdown = false;
+    boolean publisherCancelled = false;
+    List<String> errors = new ArrayList<>();
+
+    while (true) {
+      long now = System.currentTimeMillis();
+      if (endMsSinceEpoch >= 0 && now > endMsSinceEpoch && !waitingForShutdown) {
+        NexmarkUtils.console("Reached end of test, cancelling job");
+        try {
+          job.cancel();
+        } catch (IOException e) {
+          throw new RuntimeException("Unable to cancel main job: ", e);
+        }
+        if (publisherResult != null) {
+          try {
+            publisherJob.cancel();
+          } catch (IOException e) {
+            throw new RuntimeException("Unable to cancel publisher job: ", e);
+          }
+          publisherCancelled = true;
+        }
+        waitingForShutdown = true;
+      }
+
+      PipelineResult.State state = job.getState();
+      NexmarkUtils.console("%s %s%s", state, queryName,
+          waitingForShutdown ? " (waiting for shutdown)" : "");
+
+      // Performance is only sampled in debug mode; otherwise currPerf stays null.
+      NexmarkPerf currPerf;
+      if (configuration.debug) {
+        currPerf = currentPerf(startMsSinceEpoch, now, job, snapshots,
+                               query.eventMonitor, query.resultMonitor);
+      } else {
+        currPerf = null;
+      }
+
+      if (perf == null || perf.anyActivity(currPerf)) {
+        lastActivityMsSinceEpoch = now;
+      }
+
+      if (options.isStreaming() && !waitingForShutdown) {
+        Duration quietFor = new Duration(lastActivityMsSinceEpoch, now);
+        // Read the fatal counter once so the same value is tested and reported.
+        long fatalCount = query.getFatalCount() != null ? getLong(job, query.getFatalCount()) : -1;
+        if (fatalCount > 0) {
+          NexmarkUtils.console("job has fatal errors, cancelling.");
+          errors.add(String.format("Pipeline reported %d fatal errors", fatalCount));
+          waitingForShutdown = true;
+        } else if (configuration.debug && configuration.numEvents > 0
+                   && currPerf.numEvents == configuration.numEvents
+                   && currPerf.numResults >= 0 && quietFor.isLongerThan(DONE_DELAY)) {
+          NexmarkUtils.console("streaming query appears to have finished, cancelling job.");
+          waitingForShutdown = true;
+        } else if (quietFor.isLongerThan(STUCK_TERMINATE_DELAY)) {
+          NexmarkUtils.console("streaming query appears to have gotten stuck, cancelling job.");
+          errors.add("Streaming job was cancelled since appeared stuck");
+          waitingForShutdown = true;
+        } else if (quietFor.isLongerThan(STUCK_WARNING_DELAY)) {
+          NexmarkUtils.console("WARNING: streaming query appears to have been stuck for %d min.",
+              quietFor.getStandardMinutes());
+          errors.add(
+              String.format("Streaming query was stuck for %d min", quietFor.getStandardMinutes()));
+        }
+
+        errors.addAll(checkWatermarks(job, startMsSinceEpoch));
+
+        if (waitingForShutdown) {
+          try {
+            job.cancel();
+          } catch (IOException e) {
+            throw new RuntimeException("Unable to cancel main job: ", e);
+          }
+        }
+      }
+
+      perf = currPerf;
+
+      boolean running = true;
+      switch (state) {
+        case UNKNOWN:
+        case STOPPED:
+        case RUNNING:
+          // Keep going.
+          break;
+        case DONE:
+          // All done.
+          running = false;
+          break;
+        case CANCELLED:
+          running = false;
+          if (!waitingForShutdown) {
+            errors.add("Job was unexpectedly cancelled");
+          }
+          break;
+        case FAILED:
+          // Abnormal termination.
+          running = false;
+          errors.add("Job failed");
+          break;
+        case UPDATED:
+          // Abnormal termination.
+          running = false;
+          errors.add("Job was unexpectedly updated");
+          break;
+      }
+
+      if (!running) {
+        break;
+      }
+
+      if (lastActivityMsSinceEpoch == now) {
+        NexmarkUtils.console("new perf %s", perf);
+      } else {
+        NexmarkUtils.console("no activity");
+      }
+
+      try {
+        Thread.sleep(PERF_DELAY.getMillis());
+      } catch (InterruptedException e) {
+        // Deliberately swallowed: we keep monitoring while the pipeline is still running.
+        // The interrupt status must stay cleared here, otherwise every later sleep would
+        // fail immediately and this loop would spin.
+        Thread.interrupted();
+        NexmarkUtils.console("Interrupted: pipeline is still running");
+      }
+    }
+
+    if (perf == null) {
+      // Performance was never sampled (eg --debug=false). Still hand back the errors and
+      // snapshots rather than failing on a null result.
+      perf = new NexmarkPerf();
+    }
+    perf.errors = errors;
+    perf.snapshots = snapshots;
+
+    if (publisherResult != null) {
+      NexmarkUtils.console("Shutting down publisher pipeline.");
+      try {
+        if (!publisherCancelled) {
+          publisherJob.cancel();
+        }
+        publisherJob.waitToFinish(5, TimeUnit.MINUTES, null);
+      } catch (IOException e) {
+        throw new RuntimeException("Unable to cancel publisher job: ", e);
+      } catch (InterruptedException e) {
+        // Re-assert the interrupt so callers further up can still observe it.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("Interrupted: publish job still running.", e);
+      }
+    }
+
+    return perf;
+  }
+
+  /**
+   * Classification of a job metric, as assigned by {@code getMetricType} from the metric's
+   * name: one of the two watermark kinds, or {@code OTHER} for everything else.
+   */
+  enum MetricType {
+    SYSTEM_WATERMARK,
+    DATA_WATERMARK,
+    OTHER
+  }
+
+  /**
+   * Classify {@code metric} by the suffix of its name: names ending in
+   * "windmill-system-watermark" are system watermarks, names ending in
+   * "windmill-data-watermark" are data watermarks, and anything else is {@code OTHER}.
+   */
+  private MetricType getMetricType(MetricUpdate metric) {
+    String name = metric.getName().getName();
+    if (name.endsWith("windmill-system-watermark")) {
+      return MetricType.SYSTEM_WATERMARK;
+    }
+    if (name.endsWith("windmill-data-watermark")) {
+      return MetricType.DATA_WATERMARK;
+    }
+    return MetricType.OTHER;
+  }
+
+  /**
+   * Check that watermarks are not too far behind.
+   *
+   * <p>Fetches the job's current metrics and, for each system or data watermark metric with a
+   * non-negative value, compares the watermark against the metric's update time. A lag larger
+   * than the corresponding {@code maxSystemLagSeconds} / {@code maxDataLagSeconds} option is
+   * reported as an error. No lag is checked until {@code watermarkValidationDelaySeconds} (if
+   * set) have passed since {@code startMsSinceEpoch}. Watermark metrics whose value is missing
+   * or is not a {@link BigDecimal} are skipped. If no watermark metric is present at all more
+   * than five minutes after the start, that is reported as an error too. A failure to fetch
+   * the metrics is only logged.
+   *
+   * @param job the job whose metrics are inspected
+   * @param startMsSinceEpoch when monitoring started, in milliseconds since the epoch
+   * @return the list of errors detected, possibly empty
+   */
+  private List<String> checkWatermarks(DataflowPipelineJob job, long startMsSinceEpoch) {
+    long now = System.currentTimeMillis();
+    List<String> errors = new ArrayList<>();
+    try {
+      JobMetrics metricResponse = job.getDataflowClient()
+                                     .projects()
+                                     .jobs()
+                                     .getMetrics(job.getProjectId(), job.getJobId())
+                                     .execute();
+      List<MetricUpdate> metrics = metricResponse.getMetrics();
+      if (metrics != null) {
+        boolean foundWatermarks = false;
+        for (MetricUpdate metric : metrics) {
+          MetricType type = getMetricType(metric);
+          if (type == MetricType.OTHER) {
+            continue;
+          }
+          foundWatermarks = true;
+          Object rawScalar = metric.getScalar();
+          if (!(rawScalar instanceof BigDecimal)) {
+            // Missing or unexpectedly typed value: there is nothing to compare against.
+            continue;
+          }
+          BigDecimal scalar = (BigDecimal) rawScalar;
+          if (scalar.signum() < 0) {
+            continue;
+          }
+          // NOTE(review): the division by 1000 presumably converts microseconds to the
+          // milliseconds Instant expects -- confirm against the Dataflow metric definition.
+          Instant value =
+              new Instant(scalar.divideToIntegralValue(new BigDecimal(1000)).longValueExact());
+          Instant updateTime = Instant.parse(metric.getUpdateTime());
+
+          if (options.getWatermarkValidationDelaySeconds() == null
+              || now > startMsSinceEpoch
+                       + Duration.standardSeconds(options.getWatermarkValidationDelaySeconds())
+                                 .getMillis()) {
+            // A null threshold means no limit was configured for this kind of watermark.
+            Duration threshold = null;
+            if (type == MetricType.SYSTEM_WATERMARK && options.getMaxSystemLagSeconds() != null) {
+              threshold = Duration.standardSeconds(options.getMaxSystemLagSeconds());
+            } else if (type == MetricType.DATA_WATERMARK
+                       && options.getMaxDataLagSeconds() != null) {
+              threshold = Duration.standardSeconds(options.getMaxDataLagSeconds());
+            }
+
+            if (threshold != null && value.isBefore(updateTime.minus(threshold))) {
+              String msg = String.format("High lag for %s: %s vs %s (allowed lag of %s)",
+                  metric.getName().getName(), value, updateTime, threshold);
+              errors.add(msg);
+              NexmarkUtils.console(msg);
+            }
+          }
+        }
+        if (!foundWatermarks) {
+          NexmarkUtils.console("No known watermarks in update: " + metrics);
+          if (now > startMsSinceEpoch + Duration.standardMinutes(5).getMillis()) {
+            errors.add("No known watermarks found.  Metrics were " + metrics);
+          }
+        }
+      }
+    } catch (IOException e) {
+      // Best effort: a failed metrics fetch must not abort monitoring.
+      NexmarkUtils.console("Warning: failed to get JobMetrics: " + e);
+    }
+
+    return errors;
+  }
+
+  /**
+   * Sample the counters of {@code eventMonitor} and {@code resultMonitor} on {@code job} and
+   * derive a {@link NexmarkPerf} from them. A counter which cannot be read comes back negative
+   * and the figures which depend on it are not filled in. As a side effect, a progress
+   * snapshot for this sample is appended to {@code snapshots}, which is then searched for a
+   * steady-state event rate.
+   */
+  private NexmarkPerf currentPerf(
+      long startMsSinceEpoch, long now, DataflowPipelineJob job,
+      List<NexmarkPerf.ProgressSnapshot> snapshots, Monitor<?> eventMonitor,
+      Monitor<?> resultMonitor) {
+    NexmarkPerf result = new NexmarkPerf();
+
+    // Raw readings; each is negative when unavailable.
+    long eventCount = getLong(job, eventMonitor.getElementCounter());
+    long eventBytes = getLong(job, eventMonitor.getBytesCounter());
+    long firstEventMs = getTimestamp(now, job, eventMonitor.getStartTime());
+    long lastEventMs = getTimestamp(now, job, eventMonitor.getEndTime());
+    long resultCount = getLong(job, resultMonitor.getElementCounter());
+    long resultBytes = getLong(job, resultMonitor.getBytesCounter());
+    long firstResultMs = getTimestamp(now, job, resultMonitor.getStartTime());
+    long lastResultMs = getTimestamp(now, job, resultMonitor.getEndTime());
+    long firstTimestampMs = getTimestamp(now, job, resultMonitor.getStartTimestamp());
+    long lastTimestampMs = getTimestamp(now, job, resultMonitor.getEndTimestamp());
+
+    // The run is taken to end at the later of the last event and the last result, since
+    // events may still be generated after the final result (eg Query 2 only yields results
+    // for a small prefix of the event stream). During startup there may be no result yet,
+    // in which case the last event alone tells us how long the pipeline has been running.
+    long lastActivityMs;
+    if (lastResultMs < 0) {
+      lastActivityMs = lastEventMs >= 0 ? lastEventMs : -1;
+    } else {
+      lastActivityMs = lastEventMs >= 0 ? Math.max(lastEventMs, lastResultMs) : lastResultMs;
+    }
+
+    if (lastActivityMs >= 0 && firstEventMs >= 0 && lastActivityMs >= firstEventMs) {
+      result.runtimeSec = (lastActivityMs - firstEventMs) / 1000.0;
+    }
+    // All rates below are only meaningful over a positive runtime.
+    boolean hasRuntime = result.runtimeSec > 0.0;
+
+    if (eventCount >= 0) {
+      result.numEvents = eventCount;
+      if (hasRuntime) {
+        // For streaming this may later be replaced by a 'steady-state' value calculated
+        // from the progress snapshots.
+        result.eventsPerSec = eventCount / result.runtimeSec;
+      }
+    }
+    if (hasRuntime && eventBytes >= 0) {
+      result.eventBytesPerSec = eventBytes / result.runtimeSec;
+    }
+
+    if (resultCount >= 0) {
+      result.numResults = resultCount;
+      if (hasRuntime) {
+        result.resultsPerSec = resultCount / result.runtimeSec;
+      }
+    }
+    if (hasRuntime && resultBytes >= 0) {
+      result.resultBytesPerSec = resultBytes / result.runtimeSec;
+    }
+
+    if (firstEventMs >= 0) {
+      result.startupDelaySec = (firstEventMs - startMsSinceEpoch) / 1000.0;
+      if (firstResultMs >= firstEventMs) {
+        result.processingDelaySec = (firstResultMs - firstEventMs) / 1000.0;
+      }
+    }
+
+    if (hasRuntime && firstTimestampMs >= 0 && lastTimestampMs >= 0) {
+      double eventTimeSec = (lastTimestampMs - firstTimestampMs) / 1000.0;
+      result.timeDilation = eventTimeSec / result.runtimeSec;
+    }
+
+    if (lastResultMs >= 0) {
+      // Fill in the shutdown delay assuming the job has now finished.
+      result.shutdownDelaySec = (now - lastResultMs) / 1000.0;
+    }
+
+    result.jobId = job.getJobId();
+    // As soon as available, try to capture cumulative cost at this point too.
+
+    NexmarkPerf.ProgressSnapshot sample = new NexmarkPerf.ProgressSnapshot();
+    sample.secSinceStart = (now - startMsSinceEpoch) / 1000.0;
+    sample.runtimeSec = result.runtimeSec;
+    sample.numEvents = eventCount;
+    sample.numResults = resultCount;
+    snapshots.add(sample);
+
+    captureSteadyState(result, snapshots);
+
+    return result;
+  }
+
+  /**
+   * Find a 'steady state' events/sec from {@code snapshots} and
+   * store it in {@code perf} if found.
+   *
+   * <p>Does nothing unless the pipeline is streaming. Only the snapshots between the first one
+   * with valid event and result counts and the last one showing progress are considered, and
+   * of those only the middle third is used. The rate is the slope of a least squares fit,
+   * through the origin, of event count against runtime. {@code perf} is left untouched if
+   * there are fewer than {@code MIN_SAMPLES} usable snapshots, if the middle third spans less
+   * than {@code MIN_WINDOW}, or if the sampled runtimes are all zero so no slope can be fitted.
+   */
+  private void captureSteadyState(NexmarkPerf perf, List<NexmarkPerf.ProgressSnapshot> snapshots) {
+    if (!options.isStreaming()) {
+      return;
+    }
+
+    // Find the first sample with actual event and result counts.
+    int dataStart = 0;
+    for (; dataStart < snapshots.size(); dataStart++) {
+      if (snapshots.get(dataStart).numEvents >= 0 && snapshots.get(dataStart).numResults >= 0) {
+        break;
+      }
+    }
+
+    // Find the last sample which demonstrated progress.
+    int dataEnd = snapshots.size() - 1;
+    for (; dataEnd > dataStart; dataEnd--) {
+      if (snapshots.get(dataEnd).anyActivity(snapshots.get(dataEnd - 1))) {
+        break;
+      }
+    }
+
+    int numSamples = dataEnd - dataStart + 1;
+    if (numSamples < MIN_SAMPLES) {
+      // Not enough samples.
+      NexmarkUtils.console("%d samples not enough to calculate steady-state event rate",
+          numSamples);
+      return;
+    }
+
+    // We'll look at only the middle third samples.
+    int sampleStart = dataStart + numSamples / 3;
+    int sampleEnd = dataEnd - numSamples / 3;
+
+    double sampleSec =
+        snapshots.get(sampleEnd).secSinceStart - snapshots.get(sampleStart).secSinceStart;
+    if (sampleSec < MIN_WINDOW.getStandardSeconds()) {
+      // Not sampled over enough time.
+      NexmarkUtils.console(
+          "sample of %.1f sec not long enough to calculate steady-state event rate",
+          sampleSec);
+      return;
+    }
+
+    // Find rate with least squares error.
+    double sumxx = 0.0;
+    double sumxy = 0.0;
+    long prevNumEvents = -1;
+    for (int i = sampleStart; i <= sampleEnd; i++) {
+      if (prevNumEvents == snapshots.get(i).numEvents) {
+        // Skip samples with no change in number of events since they contribute no data.
+        continue;
+      }
+      // Use the effective runtime instead of wallclock time so we can
+      // insulate ourselves from delays and stutters in the query manager.
+      double x = snapshots.get(i).runtimeSec;
+      prevNumEvents = snapshots.get(i).numEvents;
+      double y = prevNumEvents;
+      sumxx += x * x;
+      sumxy += x * y;
+    }
+    if (!(sumxx > 0.0)) {
+      // Every sampled runtime was zero: dividing would give NaN, so keep the existing rate.
+      NexmarkUtils.console("no usable runtimes to calculate steady-state event rate");
+      return;
+    }
+    double eventsPerSec = sumxy / sumxx;
+    NexmarkUtils.console("revising events/sec from %.1f to %.1f", perf.eventsPerSec, eventsPerSec);
+    perf.eventsPerSec = eventsPerSec;
+  }
+
+  /**
+   * Read a long counter from {@code job}. The answer is -1 when the aggregator cannot be
+   * retrieved or when it does not report exactly one value.
+   */
+  private long getLong(DataflowPipelineJob job, Aggregator<Long, Long> aggregator) {
+    Collection<Long> readings;
+    try {
+      readings = job.getAggregatorValues(aggregator).getValues();
+    } catch (AggregatorRetrievalException e) {
+      return -1;
+    }
+    if (readings.size() == 1) {
+      return Iterables.getOnlyElement(readings);
+    }
+    return -1;
+  }
+
+  /**
+   * Read a time counter from {@code job}. The answer is -1 when the aggregator cannot be
+   * retrieved, when it does not report exactly one value, or when that value lies more than
+   * 10000 days away from {@code now}.
+   */
+  private long getTimestamp(
+      long now, DataflowPipelineJob job, Aggregator<Long, Long> aggregator) {
+    Collection<Long> readings;
+    try {
+      readings = job.getAggregatorValues(aggregator).getValues();
+    } catch (AggregatorRetrievalException e) {
+      return -1;
+    }
+    if (readings.size() != 1) {
+      return -1;
+    }
+    long timestamp = Iterables.getOnlyElement(readings);
+    long distanceFromNowMs = Math.abs(timestamp - now);
+    boolean plausible = distanceFromNowMs <= Duration.standardDays(10000).getMillis();
+    return plausible ? timestamp : -1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkInProcessDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkInProcessDriver.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkInProcessDriver.java
new file mode 100644
index 0000000..fe279c0
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkInProcessDriver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.direct.InProcessPipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * An implementation of the 'NEXMark queries' using the in-process runner.
+ */
+class NexmarkInProcessDriver extends NexmarkDriver<NexmarkInProcessDriver.NexmarkInProcessOptions> {
+  /**
+   * Flags accepted by this driver. Nothing is declared beyond what {@code Options} and
+   * {@link DataflowPipelineOptions} already provide.
+   */
+  public interface NexmarkInProcessOptions extends Options, DataflowPipelineOptions {
+  }
+
+  /**
+   * Program entry point: parse and validate {@code args} into {@link NexmarkInProcessOptions},
+   * select {@link InProcessPipelineRunner} as the runner, and pass the options together with a
+   * fresh {@link NexmarkInProcessRunner} to {@code runAll}.
+   */
+  public static void main(String[] args) {
+    NexmarkInProcessOptions flags = PipelineOptionsFactory
+        .fromArgs(args)
+        .withValidation()
+        .as(NexmarkInProcessOptions.class);
+    flags.setRunner(InProcessPipelineRunner.class);
+
+    NexmarkInProcessRunner queryRunner = new NexmarkInProcessRunner(flags);
+    NexmarkInProcessDriver driver = new NexmarkInProcessDriver();
+    driver.runAll(flags, queryRunner);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkInProcessRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkInProcessRunner.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkInProcessRunner.java
new file mode 100644
index 0000000..ba141f9
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkInProcessRunner.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import javax.annotation.Nullable;
+
+/**
+ * Run a single Nexmark query using a given configuration on the in-process runner.
+ */
+class NexmarkInProcessRunner extends NexmarkRunner<NexmarkInProcessDriver.NexmarkInProcessOptions> {
+  public NexmarkInProcessRunner(NexmarkInProcessDriver.NexmarkInProcessOptions options) {
+    super(options);
+  }
+
+  @Override
+  protected boolean isStreaming() {
+    return options.isStreaming();
+  }
+
+  @Override
+  protected int coresPerWorker() {
+    return 4;
+  }
+
+  @Override
+  protected int maxNumWorkers() {
+    return 1;
+  }
+
+  @Override
+  protected boolean canMonitor() {
+    return false;
+  }
+
+  @Override
+  protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) {
+    throw new UnsupportedOperationException(
+        "Cannot use --pubSubMode=COMBINED with InProcessPipelineRunner");
+  }
+
+  /**
+   * Not supported by this runner: always throws {@link UnsupportedOperationException},
+   * since --pubSubMode=COMBINED cannot be used with the in-process runner.
+   */
+  @Override
+  protected void waitForPublisherPreload() {
+    throw new UnsupportedOperationException(
+        "Cannot use --pubSubMode=COMBINED with InProcessPipelineRunner");
+  }
+
+  /**
+   * Not supported by this runner: always throws {@link UnsupportedOperationException},
+   * since --monitorJobs=true cannot be used with the in-process runner.
+   */
+  @Override
+  @Nullable
+  protected NexmarkPerf monitor(NexmarkQuery query) {
+    throw new UnsupportedOperationException(
+        "Cannot use --monitorJobs=true with InProcessPipelineRunner");
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java
new file mode 100644
index 0000000..6eb7267
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+/**
+ * Summary of performance for a particular run of a configuration.
+ */
+class NexmarkPerf {
+  /**
+   * A sample of the number of events and number of results (if known) generated at
+   * a particular time.
+   */
+  public static class ProgressSnapshot {
+    /** Seconds since job was started (in wallclock time). */
+    @JsonProperty
+    double secSinceStart;
+
+    /** Job runtime in seconds (time from first event to last generated event or output result). */
+    @JsonProperty
+    double runtimeSec;
+
+    /** Cumulative number of events generated. -1 if not known. */
+    @JsonProperty
+    long numEvents;
+
+    /** Cumulative number of results emitted. -1 if not known. */
+    @JsonProperty
+    long numResults;
+
+    /**
+     * Return true if there looks to be activity between {@code this} and {@code that}
+     * snapshots.
+     */
+    public boolean anyActivity(ProgressSnapshot that) {
+      if (runtimeSec != that.runtimeSec) {
+        // An event or result end timestamp looks to have changed.
+        return true;
+      }
+      if (numEvents != that.numEvents) {
+        // Some more events were generated.
+        return true;
+      }
+      if (numResults != that.numResults) {
+        // Some more results were emitted.
+        return true;
+      }
+      return false;
+    }
+  }
+
+  /**
+   * Progess snapshots. Null if not yet calculated.
+   */
+  @JsonProperty
+  @Nullable
+  public List<ProgressSnapshot> snapshots = null;
+
+  /**
+   * Effective runtime, in seconds. Measured from timestamp of first generated event to latest of
+   * timestamp of last generated event and last emitted result. -1 if not known.
+   */
+  @JsonProperty
+  public double runtimeSec = -1.0;
+
+  /**
+   * Number of events generated. -1 if not known.
+   */
+  @JsonProperty
+  public long numEvents = -1;
+
+  /**
+   * Number of events generated per second of runtime. For batch this is number of events
+   * over the above runtime. For streaming this is the 'steady-state' event generation rate sampled
+   * over the lifetime of the job. -1 if not known.
+   */
+  @JsonProperty
+  public double eventsPerSec = -1.0;
+
+  /**
+   * Number of event bytes generated per second of runtime. -1 if not known.
+   */
+  @JsonProperty
+  public double eventBytesPerSec = -1.0;
+
+  /**
+   * Number of results emitted. -1 if not known.
+   */
+  @JsonProperty
+  public long numResults = -1;
+
+  /**
+   * Number of results generated per second of runtime. -1 if not known.
+   */
+  @JsonProperty
+  public double resultsPerSec = -1.0;
+
+  /**
+   * Number of result bytes generated per second of runtime. -1 if not known.
+   */
+  @JsonProperty
+  public double resultBytesPerSec = -1.0;
+
+  /**
+   * Delay between start of job and first event in second. -1 if not known.
+   */
+  @JsonProperty
+  public double startupDelaySec = -1.0;
+
+  /**
+   * Delay between first event and first result in seconds. -1 if not known.
+   */
+  @JsonProperty
+  public double processingDelaySec = -1.0;
+
+  /**
+   * Delay between last result and job completion in seconds. -1 if not known.
+   */
+  @JsonProperty
+  public double shutdownDelaySec = -1.0;
+
+  /**
+   * Time-dilation factor.  Calculate as event time advancement rate relative to real time.
+   * Greater than one implies we processed events faster than they would have been generated
+   * in real time. Less than one implies we could not keep up with events in real time.
+   * -1 if not known.
+   */
+  @JsonProperty
+  double timeDilation = -1.0;
+
+  /**
+   * List of errors encountered during job execution.
+   */
+  @JsonProperty
+  @Nullable
+  public List<String> errors = null;
+
+  /**
+   * The job id this perf was drawn from. Null if not known.
+   */
+  @JsonProperty
+  @Nullable
+  public String jobId = null;
+
+  /**
+   * Return a JSON representation of performance.
+   */
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Parse a {@link NexmarkPerf} object from JSON {@code string}.
+   *
+   * @throws IOException
+   */
+  public static NexmarkPerf fromString(String string) {
+    try {
+      return NexmarkUtils.MAPPER.readValue(string, NexmarkPerf.class);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to parse nexmark perf: ", e);
+    }
+  }
+
+  /**
+   * Return true if there looks to be activity between {@code this} and {@code that}
+   * perf values.
+   */
+  public boolean anyActivity(NexmarkPerf that) {
+    if (runtimeSec != that.runtimeSec) {
+      // An event or result end timestamp looks to have changed.
+      return true;
+    }
+    if (numEvents != that.numEvents) {
+      // Some more events were generated.
+      return true;
+    }
+    if (numResults != that.numResults) {
+      // Some more results were emitted.
+      return true;
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
new file mode 100644
index 0000000..4626609
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+
+/**
+ * Base class for the eight 'NEXMark' queries. Supplies some fragments common to
+ * multiple queries.
+ *
+ * <p>{@link #apply} wraps the query-specific {@link #applyPrim}: when
+ * {@code configuration.debug} is set the events and results are passed through the event and
+ * result monitors (all three monitors are null otherwise), optional CPU and disk delays are
+ * applied to the events, and the results are finally passed through
+ * {@code NexmarkUtils.stamp}.
+ */
+public abstract class NexmarkQuery
+    extends PTransform<PCollection<Event>, PCollection<TimestampedValue<KnownSize>>> {
+  protected static final TupleTag<Auction> AUCTION_TAG = new TupleTag<>("auctions");
+  protected static final TupleTag<Bid> BID_TAG = new TupleTag<>("bids");
+  protected static final TupleTag<Person> PERSON_TAG = new TupleTag<>("person");
+
+  /** Predicate to detect a new person event. */
+  protected static final SerializableFunction<Event, Boolean> IS_NEW_PERSON =
+      new SerializableFunction<Event, Boolean>() {
+        @Override
+        public Boolean apply(Event event) {
+          return event.newPerson != null;
+        }
+      };
+
+  /** DoFn to convert a new person event to a person. */
+  protected static final DoFn<Event, Person> AS_PERSON = new DoFn<Event, Person>() {
+    @Override
+    public void processElement(ProcessContext c) {
+      c.output(c.element().newPerson);
+    }
+  };
+
+  /** Predicate to detect a new auction event. */
+  protected static final SerializableFunction<Event, Boolean> IS_NEW_AUCTION =
+      new SerializableFunction<Event, Boolean>() {
+        @Override
+        public Boolean apply(Event event) {
+          return event.newAuction != null;
+        }
+      };
+
+  /** DoFn to convert a new auction event to an auction. */
+  protected static final DoFn<Event, Auction> AS_AUCTION = new DoFn<Event, Auction>() {
+    @Override
+    public void processElement(ProcessContext c) {
+      c.output(c.element().newAuction);
+    }
+  };
+
+  /** Predicate to detect a new bid event. */
+  protected static final SerializableFunction<Event, Boolean> IS_BID =
+      new SerializableFunction<Event, Boolean>() {
+        @Override
+        public Boolean apply(Event event) {
+          return event.bid != null;
+        }
+      };
+
+  /** DoFn to convert a bid event to a bid. */
+  protected static final DoFn<Event, Bid> AS_BID = new DoFn<Event, Bid>() {
+    @Override
+    public void processElement(ProcessContext c) {
+      c.output(c.element().bid);
+    }
+  };
+
+  /** Transform to key each person by their id. */
+  protected static final ParDo.Bound<Person, KV<Long, Person>> PERSON_BY_ID =
+      ParDo.named("PersonById")
+           .of(new DoFn<Person, KV<Long, Person>>() {
+             @Override
+             public void processElement(ProcessContext c) {
+               c.output(KV.of(c.element().id, c.element()));
+             }
+           });
+
+  /** Transform to key each auction by its id. */
+  protected static final ParDo.Bound<Auction, KV<Long, Auction>> AUCTION_BY_ID =
+      ParDo.named("AuctionById")
+           .of(new DoFn<Auction, KV<Long, Auction>>() {
+             @Override
+             public void processElement(ProcessContext c) {
+               c.output(KV.of(c.element().id, c.element()));
+             }
+           });
+
+  /** Transform to key each auction by its seller id. */
+  protected static final ParDo.Bound<Auction, KV<Long, Auction>> AUCTION_BY_SELLER =
+      ParDo.named("AuctionBySeller")
+           .of(new DoFn<Auction, KV<Long, Auction>>() {
+             @Override
+             public void processElement(ProcessContext c) {
+               c.output(KV.of(c.element().seller, c.element()));
+             }
+           });
+
+  /** Transform to key each bid by its auction id. */
+  protected static final ParDo.Bound<Bid, KV<Long, Bid>> BID_BY_AUCTION =
+      ParDo.named("BidByAuction")
+           .of(new DoFn<Bid, KV<Long, Bid>>() {
+             @Override
+             public void processElement(ProcessContext c) {
+               c.output(KV.of(c.element().auction, c.element()));
+             }
+           });
+
+  /** Transform to project the auction id from each bid. */
+  protected static final ParDo.Bound<Bid, Long> BID_TO_AUCTION =
+      ParDo.named("BidToAuction")
+           .of(new DoFn<Bid, Long>() {
+             @Override
+             public void processElement(ProcessContext c) {
+               c.output(c.element().auction);
+             }
+           });
+
+  /** Transform to project the price from each bid. */
+  protected static final ParDo.Bound<Bid, Long> BID_TO_PRICE =
+      ParDo.named("BidToPrice")
+           .of(new DoFn<Bid, Long>() {
+             @Override
+             public void processElement(ProcessContext c) {
+               c.output(c.element().price);
+             }
+           });
+
+  /**
+   * Transform to emit each event with the timestamp embedded within it. The timestamp is
+   * taken from the bid, new person or new auction (checked in that order); an event with
+   * none of these set is not output at all.
+   */
+  public static final ParDo.Bound<Event, Event> EVENT_TIMESTAMP_FROM_DATA =
+      ParDo.named("OutputWithTimestamp")
+           .of(new DoFn<Event, Event>() {
+             @Override
+             public void processElement(ProcessContext c) {
+               Event e = c.element();
+               if (e.bid != null) {
+                 c.outputWithTimestamp(e, new Instant(e.bid.dateTime));
+               } else if (e.newPerson != null) {
+                 c.outputWithTimestamp(e, new Instant(e.newPerson.dateTime));
+               } else if (e.newAuction != null) {
+                 c.outputWithTimestamp(e, new Instant(e.newAuction.dateTime));
+               }
+             }
+           });
+
+  /**
+   * Transform to filter for just the new auction events and convert them to auctions.
+   */
+  protected static final PTransform<PCollection<Event>, PCollection<Auction>> JUST_NEW_AUCTIONS =
+      new PTransform<PCollection<Event>, PCollection<Auction>>("justNewAuctions") {
+        @Override
+        public PCollection<Auction> apply(PCollection<Event> input) {
+          return input.apply(Filter.byPredicate(IS_NEW_AUCTION).named("IsAuction"))
+                      .apply(ParDo.named("AsAuction").of(AS_AUCTION));
+        }
+      };
+
+  /**
+   * Transform to filter for just the new person events and convert them to persons.
+   */
+  protected static final PTransform<PCollection<Event>, PCollection<Person>> JUST_NEW_PERSONS =
+      new PTransform<PCollection<Event>, PCollection<Person>>("justNewPersons") {
+        @Override
+        public PCollection<Person> apply(PCollection<Event> input) {
+          return input.apply(Filter.byPredicate(IS_NEW_PERSON).named("IsPerson"))
+                      .apply(ParDo.named("AsPerson").of(AS_PERSON));
+        }
+      };
+
+  /**
+   * Transform to filter for just the bid events and convert them to bids.
+   */
+  protected static final PTransform<PCollection<Event>, PCollection<Bid>> JUST_BIDS =
+      new PTransform<PCollection<Event>, PCollection<Bid>>("justBids") {
+        @Override
+        public PCollection<Bid> apply(PCollection<Event> input) {
+          return input.apply(Filter.byPredicate(IS_BID).named("IsBid"))
+                      .apply(ParDo.named("AsBid").of(AS_BID));
+        }
+      };
+
+  protected final NexmarkConfiguration configuration;
+  public final Monitor<Event> eventMonitor;
+  public final Monitor<KnownSize> resultMonitor;
+  public final Monitor<Event> endOfStreamMonitor;
+
+  protected NexmarkQuery(NexmarkConfiguration configuration, String name) {
+    super(name);
+    this.configuration = configuration;
+    if (configuration.debug) {
+      eventMonitor = new Monitor<>(name + ".Events", "event");
+      resultMonitor = new Monitor<>(name + ".Results", "result");
+      endOfStreamMonitor = new Monitor<>(name + ".EndOfStream", "end");
+    } else {
+      eventMonitor = null;
+      resultMonitor = null;
+      endOfStreamMonitor = null;
+    }
+  }
+
+  /**
+   * Return the aggregator which counts fatal errors in this query. Return null if no such
+   * aggregator. This base implementation always returns null.
+   */
+  @Nullable
+  public Aggregator<Long, Long> getFatalCount() {
+    return null;
+  }
+
+  /**
+   * Implement the actual query. All we know about the result is it has a known encoded size.
+   */
+  protected abstract PCollection<KnownSize> applyPrim(PCollection<Event> events);
+
+  @Override
+  public PCollection<TimestampedValue<KnownSize>> apply(PCollection<Event> events) {
+
+    if (configuration.debug) {
+      events =
+          events
+              // Monitor events as they go by (the monitor is non-null only in debug mode).
+              .apply(eventMonitor.getTransform())
+              // Count each type of event.
+              .apply(NexmarkUtils.snoop(name));
+    }
+
+    if (configuration.cpuDelayMs > 0) {
+      // Slow down by pegging one core at 100%.
+      events = events.apply(NexmarkUtils.<Event>cpuDelay(name, configuration.cpuDelayMs));
+    }
+
+    if (configuration.diskBusyBytes > 0) {
+      // Slow down by forcing bytes to durable store.
+      events = events.apply(NexmarkUtils.<Event>diskBusy(name, configuration.diskBusyBytes));
+    }
+
+    // Run the query.
+    PCollection<KnownSize> queryResults = applyPrim(events);
+
+    if (configuration.debug) {
+      // Monitor results as they go by.
+      queryResults = queryResults.apply(resultMonitor.getTransform());
+    }
+
+    // Timestamp the query results.
+    return queryResults.apply(NexmarkUtils.<KnownSize>stamp(name));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java
new file mode 100644
index 0000000..b42042f
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Base class for models of the eight NEXMark queries. Provides an assertion
+ * function which can be applied against the actual query results to check their consistency
+ * with the model.
+ */
+public abstract class NexmarkQueryModel implements Serializable {
+  /**
+   * Return the start of the most recent window of {@code size} and {@code period} which ends
+   * strictly before {@code timestamp}.
+   */
+  public static Instant windowStart(Duration size, Duration period, Instant timestamp) {
+    long ts = timestamp.getMillis();
+    long p = period.getMillis();
+    long lim = ts - ts % p;
+    long s = size.getMillis();
+    return new Instant(lim - s);
+  }
+
+  protected final NexmarkConfiguration configuration;
+
+  public NexmarkQueryModel(NexmarkConfiguration configuration) {
+    this.configuration = configuration;
+  }
+
+  /**
+   * Convert {@code itr} to strings capturing values, timestamps and order.
+   */
+  protected static <T> List<String> toValueTimestampOrder(Iterator<TimestampedValue<T>> itr) {
+    List<String> strings = new ArrayList<>();
+    while (itr.hasNext()) {
+      strings.add(itr.next().toString());
+    }
+    return strings;
+  }
+
+  /**
+   * Convert {@code itr} to strings capturing values and order.
+   */
+  protected static <T> List<String> toValueOrder(Iterator<TimestampedValue<T>> itr) {
+    List<String> strings = new ArrayList<>();
+    while (itr.hasNext()) {
+      strings.add(itr.next().getValue().toString());
+    }
+    return strings;
+  }
+
+  /**
+   * Convert {@code itr} to strings capturing values only.
+   */
+  protected static <T> Set<String> toValue(Iterator<TimestampedValue<T>> itr) {
+    Set<String> strings = new HashSet<>();
+    while (itr.hasNext()) {
+      strings.add(itr.next().getValue().toString());
+    }
+    return strings;
+  }
+
+  /** Return simulator for query. */
+  protected abstract AbstractSimulator<?, ?> simulator();
+
+  /** Return sub-sequence of results which are significant for model. */
+  protected Iterable<TimestampedValue<KnownSize>> relevantResults(
+      Iterable<TimestampedValue<KnownSize>> results) {
+    return results;
+  }
+
+  /**
+   * Convert iterator of elements to collection of strings to use when testing coherence
+   * of model against actual query results.
+   */
+  protected abstract <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr);
+
+  /**
+   * Return assertion to use on results of pipeline for this query.
+   */
+  public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> assertionFor() {
+    final Collection<String> expectedStrings = toCollection(simulator().results());
+
+    return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() {
+      @Override
+      public Void apply(Iterable<TimestampedValue<KnownSize>> actual) {
+        Collection<String> actualStrings = toCollection(relevantResults(actual).iterator());
+        Assert.assertEquals(expectedStrings, actualStrings);
+        return null;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
new file mode 100644
index 0000000..b7151f8
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -0,0 +1,746 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.io.BigQueryIO;
+import org.apache.beam.sdk.io.PubsubIO;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+
+/**
+ * Run a single Nexmark query using a given configuration.
+ */
+public abstract class NexmarkRunner<OptionT extends Options> {
+  /**
+   * Options shared by all runs.
+   */
+  protected final OptionT options;
+
+  /**
+   * Which configuration we are running.
+   */
+  @Nullable
+  protected NexmarkConfiguration configuration;
+
+  /**
+   * Accumulate the pub/sub subscriptions etc which should be cleaned up on end of run.
+   */
+  @Nullable
+  protected PubsubHelper pubsub;
+
+  /**
+   * If in --pubsubMode=COMBINED, the event monitor for the publisher pipeline. Otherwise null.
+   */
+  @Nullable
+  protected Monitor<Event> publisherMonitor;
+
+  /**
+   * If in --pubsubMode=COMBINED, the pipeline result for the publisher pipeline. Otherwise null.
+   */
+  @Nullable
+  protected PipelineResult publisherResult;
+
+  /**
+   * Result for the main pipeline.
+   */
+  @Nullable
+  protected PipelineResult mainResult;
+
+  /**
+   * Query name we are running.
+   */
+  @Nullable
+  protected String queryName;
+
+  public NexmarkRunner(OptionT options) {
+    this.options = options;
+  }
+
+  /**
+   * Return the Pubsub helper, creating it from the options on first use.
+   */
+  private PubsubHelper getPubsub() {
+    if (pubsub != null) {
+      return pubsub;
+    }
+    pubsub = PubsubHelper.create(options);
+    return pubsub;
+  }
+
+  // ================================================================================
+  // Overridden by each runner.
+  // ================================================================================
+
+  /**
+   * Is this query running in streaming mode?
+   */
+  protected abstract boolean isStreaming();
+
+  /**
+   * Return number of cores per worker.
+   */
+  protected abstract int coresPerWorker();
+
+  /**
+   * Return maximum number of workers.
+   */
+  protected abstract int maxNumWorkers();
+
+  /**
+   * Return true if runner can monitor running jobs.
+   */
+  protected abstract boolean canMonitor();
+
+  /**
+   * Build and run a pipeline using specified options.
+   */
+  protected interface PipelineBuilder<OptionT extends Options> {
+    void build(OptionT publishOnlyOptions);
+  }
+
+  /**
+   * Invoke the builder with options suitable for running a publish-only child pipeline.
+   */
+  protected abstract void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder);
+
+  /**
+   * If monitoring, wait until the publisher pipeline has run long enough to establish
+   * a backlog on the Pubsub topic. Otherwise, return immediately.
+   */
+  protected abstract void waitForPublisherPreload();
+
+  /**
+   * If monitoring, print stats on the main pipeline and return the final perf
+   * when it has run long enough. Otherwise, return {@literal null} immediately.
+   */
+  @Nullable
+  protected abstract NexmarkPerf monitor(NexmarkQuery query);
+
+  // ================================================================================
+  // Basic sources and sinks
+  // ================================================================================
+
+  /**
+   * Return a topic name derived from --pubsubTopic according to the resource name mode:
+   * the base name verbatim, or the base name extended with the query name and, when
+   * salting, {@code now}.
+   */
+  private String shortTopic(long now) {
+    String topic = options.getPubsubTopic();
+    if (Strings.isNullOrEmpty(topic)) {
+      throw new RuntimeException("Missing --pubsubTopic");
+    }
+    switch (options.getResourceNameMode()) {
+      case VERBATIM:
+        return topic;
+      case QUERY:
+        return String.format("%s_%s_source", topic, queryName);
+      case QUERY_AND_SALT:
+        return String.format("%s_%s_%d_source", topic, queryName, now);
+      default:
+        throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
+    }
+  }
+
+  /**
+   * Return a subscription name derived from --pubsubSubscription according to the resource
+   * name mode: the base name verbatim, or the base name extended with the query name and,
+   * when salting, {@code now}.
+   */
+  private String shortSubscription(long now) {
+    String subscription = options.getPubsubSubscription();
+    if (Strings.isNullOrEmpty(subscription)) {
+      throw new RuntimeException("Missing --pubsubSubscription");
+    }
+    switch (options.getResourceNameMode()) {
+      case VERBATIM:
+        return subscription;
+      case QUERY:
+        return String.format("%s_%s_source", subscription, queryName);
+      case QUERY_AND_SALT:
+        return String.format("%s_%s_%d_source", subscription, queryName, now);
+      default:
+        throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
+    }
+  }
+
+  /**
+   * Return the file name to which plain text results are written.
+   *
+   * <p>In {@code VERBATIM} mode this is {@code --outputPath} itself; otherwise it is a
+   * {@code nexmark_*.txt} file under {@code --outputPath}.
+   *
+   * @param now salt included in the name in {@code QUERY_AND_SALT} mode
+   * @throws RuntimeException if {@code --outputPath} is missing or the mode is unrecognized
+   */
+  private String textFilename(long now) {
+    String outputBase = options.getOutputPath();
+    if (Strings.isNullOrEmpty(outputBase)) {
+      throw new RuntimeException("Missing --outputPath");
+    }
+    switch (options.getResourceNameMode()) {
+      case QUERY_AND_SALT:
+        return String.format("%s/nexmark_%s_%d.txt", outputBase, queryName, now);
+      case QUERY:
+        return String.format("%s/nexmark_%s.txt", outputBase, queryName);
+      case VERBATIM:
+        return outputBase;
+      default:
+        throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
+    }
+  }
+
+  /**
+   * Return the BigQuery table spec, of the form {@code project:nexmark.table}, for this run.
+   *
+   * <p>The table name starts from {@code --bigQueryTable} and always carries {@code version};
+   * the query name and {@code now} are added according to the resource name mode.
+   *
+   * @param now salt appended to the table name in {@code QUERY_AND_SALT} mode
+   * @param version label distinguishing the tables written by a single run
+   * @throws RuntimeException if {@code --bigQueryTable} is missing or the mode is unrecognized
+   */
+  private String tableSpec(long now, String version) {
+    String tableBase = options.getBigQueryTable();
+    if (Strings.isNullOrEmpty(tableBase)) {
+      throw new RuntimeException("Missing --bigQueryTable");
+    }
+    switch (options.getResourceNameMode()) {
+      case QUERY_AND_SALT:
+        return String.format(
+            "%s:nexmark.%s_%s_%s_%d",
+            options.getProject(), tableBase, queryName, version, now);
+      case QUERY:
+        return String.format(
+            "%s:nexmark.%s_%s_%s",
+            options.getProject(), tableBase, queryName, version);
+      case VERBATIM:
+        return String.format(
+            "%s:nexmark.%s_%s",
+            options.getProject(), tableBase, version);
+      default:
+        throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
+    }
+  }
+
+  /**
+   * Return the directory into which logs are written.
+   *
+   * <p>In {@code VERBATIM} mode this is {@code --outputPath} itself; otherwise it is a
+   * {@code logs_*} directory under {@code --outputPath}.
+   *
+   * @param now salt included in the name in {@code QUERY_AND_SALT} mode
+   * @throws RuntimeException if {@code --outputPath} is missing or the mode is unrecognized
+   */
+  private String logsDir(long now) {
+    String outputBase = options.getOutputPath();
+    if (Strings.isNullOrEmpty(outputBase)) {
+      throw new RuntimeException("Missing --outputPath");
+    }
+    switch (options.getResourceNameMode()) {
+      case QUERY_AND_SALT:
+        return String.format("%s/logs_%s_%d", outputBase, queryName, now);
+      case QUERY:
+        return String.format("%s/logs_%s", outputBase, queryName);
+      case VERBATIM:
+        return outputBase;
+      default:
+        throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
+    }
+  }
+
+  /**
+   * Return a source of synthetic events, using the streaming or the batch generator
+   * depending on {@code isStreaming()}.
+   */
+  private PCollection<Event> sourceEventsFromSynthetic(Pipeline p) {
+    if (!isStreaming()) {
+      NexmarkUtils.console("Generating %d events in batch mode", configuration.numEvents);
+      return p.apply(NexmarkUtils.batchEventsSource(queryName, configuration));
+    }
+    NexmarkUtils.console("Generating %d events in streaming mode", configuration.numEvents);
+    return p.apply(NexmarkUtils.streamEventsSource(queryName, configuration));
+  }
+
+  /**
+   * Return a source of events read from a Pubsub subscription.
+   *
+   * <p>The subscription is created when {@code --manageResources} is set; otherwise an
+   * existing one is reused. Unless {@code usePubsubPublishTime} is configured, event
+   * timestamps are taken from the {@code NexmarkUtils.PUBSUB_TIMESTAMP} label.
+   */
+  private PCollection<Event> sourceEventsFromPubsub(Pipeline p, long now) {
+    String topicName = shortTopic(now);
+    String subscriptionName = shortSubscription(now);
+
+    // Without --manageResources the user should already have created the subscription.
+    String subscription = options.getManageResources()
+        ? getPubsub().createSubscription(topicName, subscriptionName).getPath()
+        : getPubsub().reuseSubscription(topicName, subscriptionName).getPath();
+    NexmarkUtils.console("Reading events from Pubsub %s", subscription);
+
+    PubsubIO.Read.Bound<Event> reader =
+        PubsubIO.Read.named(queryName + ".ReadPubsubEvents")
+                     .subscription(subscription)
+                     .idLabel(NexmarkUtils.PUBSUB_ID)
+                     .withCoder(Event.CODER);
+    if (configuration.usePubsubPublishTime) {
+      return p.apply(reader);
+    }
+    return p.apply(reader.timestampLabel(NexmarkUtils.PUBSUB_TIMESTAMP));
+  }
+
+  /**
+   * Return a source of events read from the Avro files matching {@code --inputPath}
+   * followed by {@code *.avro}, with event timestamps taken from the data.
+   *
+   * @throws RuntimeException if {@code --inputPath} is missing
+   */
+  private PCollection<Event> sourceEventsFromAvro(Pipeline p) {
+    String inputPrefix = options.getInputPath();
+    if (Strings.isNullOrEmpty(inputPrefix)) {
+      throw new RuntimeException("Missing --inputPath");
+    }
+    NexmarkUtils.console("Reading events from Avro files at %s", inputPrefix);
+    String filePattern = inputPrefix + "*.avro";
+    return p
+        .apply(AvroIO.Read.named(queryName + ".ReadAvroEvents")
+                          .from(filePattern)
+                          .withSchema(Event.class))
+        .apply(NexmarkQuery.EVENT_TIMESTAMP_FROM_DATA);
+  }
+
+  /**
+   * Send {@code events} to the Pubsub topic for this run.
+   *
+   * <p>A fresh topic is created only when {@code --manageResources} is set and this process
+   * is not in {@code SUBSCRIBE_ONLY} mode; otherwise an existing topic is reused.
+   */
+  private void sinkEventsToPubsub(PCollection<Event> events, long now) {
+    String topicName = shortTopic(now);
+
+    boolean createFreshTopic =
+        options.getManageResources()
+        && configuration.pubSubMode != NexmarkUtils.PubSubMode.SUBSCRIBE_ONLY;
+    String topic;
+    if (createFreshTopic) {
+      // Create a fresh topic to loopback via. It will be destroyed when the
+      // (necessarily blocking) job is done.
+      topic = getPubsub().createTopic(topicName).getPath();
+    } else {
+      // The topic should already have been created by the user or
+      // a companion 'PUBLISH_ONLY' process.
+      topic = getPubsub().reuseTopic(topicName).getPath();
+    }
+    NexmarkUtils.console("Writing events to Pubsub %s", topic);
+
+    PubsubIO.Write.Bound<Event> writer =
+        PubsubIO.Write.named(queryName + ".WritePubsubEvents")
+                      .topic(topic)
+                      .idLabel(NexmarkUtils.PUBSUB_ID)
+                      .withCoder(Event.CODER);
+    if (!configuration.usePubsubPublishTime) {
+      writer = writer.timestampLabel(NexmarkUtils.PUBSUB_TIMESTAMP);
+    }
+    events.apply(writer);
+  }
+
+  /**
+   * Send {@code formattedResults} to the Pubsub topic for this run, creating the topic when
+   * {@code --manageResources} is set and reusing an existing one otherwise.
+   */
+  private void sinkResultsToPubsub(PCollection<String> formattedResults, long now) {
+    String topicName = shortTopic(now);
+    String topic = options.getManageResources()
+        ? getPubsub().createTopic(topicName).getPath()
+        : getPubsub().reuseTopic(topicName).getPath();
+    NexmarkUtils.console("Writing results to Pubsub %s", topic);
+
+    PubsubIO.Write.Bound<String> writer =
+        PubsubIO.Write.named(queryName + ".WritePubsubResults")
+                      .topic(topic)
+                      .idLabel(NexmarkUtils.PUBSUB_ID);
+    if (!configuration.usePubsubPublishTime) {
+      writer = writer.timestampLabel(NexmarkUtils.PUBSUB_TIMESTAMP);
+    }
+    formattedResults.apply(writer);
+  }
+
+  /**
+   * Write all raw events in {@code source} as Avro files under {@code --outputPath}.
+   * The job is configured to write:
+   * <ul>
+   * <li>{@code $outputPath/event*.avro} All Event entities.
+   * <li>{@code $outputPath/auction*.avro} Auction entities.
+   * <li>{@code $outputPath/bid*.avro} Bid entities.
+   * <li>{@code $outputPath/person*.avro} Person entities.
+   * </ul>
+   *
+   * @param source A PCollection of events.
+   * @throws RuntimeException if {@code --outputPath} is missing
+   */
+  private void sinkEventsToAvro(PCollection<Event> source) {
+    String outputDir = options.getOutputPath();
+    if (Strings.isNullOrEmpty(outputDir)) {
+      throw new RuntimeException("Missing --outputPath");
+    }
+    NexmarkUtils.console("Writing events to Avro files at %s", outputDir);
+
+    // Every event, regardless of kind.
+    source.apply(
+        AvroIO.Write.named(queryName + ".WriteAvroEvents")
+                    .to(outputDir + "/event")
+                    .withSuffix(".avro")
+                    .withSchema(Event.class));
+
+    // One additional set of files per kind of entity.
+    source
+        .apply(NexmarkQuery.JUST_BIDS)
+        .apply(
+            AvroIO.Write.named(queryName + ".WriteAvroBids")
+                        .to(outputDir + "/bid")
+                        .withSuffix(".avro")
+                        .withSchema(Bid.class));
+    source
+        .apply(NexmarkQuery.JUST_NEW_AUCTIONS)
+        .apply(
+            AvroIO.Write.named(queryName + ".WriteAvroAuctions")
+                        .to(outputDir + "/auction")
+                        .withSuffix(".avro")
+                        .withSchema(Auction.class));
+    source
+        .apply(NexmarkQuery.JUST_NEW_PERSONS)
+        .apply(
+            AvroIO.Write.named(queryName + ".WriteAvroPeople")
+                        .to(outputDir + "/person")
+                        .withSuffix(".avro")
+                        .withSchema(Person.class));
+  }
+
+  /**
+   * Write {@code formattedResults} to the text file(s) named by {@code textFilename(now)}.
+   */
+  private void sinkResultsToText(PCollection<String> formattedResults, long now) {
+    String destination = textFilename(now);
+    NexmarkUtils.console("Writing results to text files at %s", destination);
+    TextIO.Write.Bound<String> writer =
+        TextIO.Write.named(queryName + ".WriteTextResults").to(destination);
+    formattedResults.apply(writer);
+  }
+
+  private static class StringToTableRow extends DoFn<String, TableRow> {
+    @Override
+    public void processElement(ProcessContext c) {
+      int n = ThreadLocalRandom.current().nextInt(10);
+      List<TableRow> records = new ArrayList<>(n);
+      for (int i = 0; i < n; i++) {
+        records.add(new TableRow().set("index", i).set("value", Integer.toString(i)));
+      }
+      c.output(new TableRow().set("result", c.element()).set("records", records));
+    }
+  }
+
+  /**
+   * Append {@code formattedResults} to the BigQuery table named by
+   * {@code tableSpec(now, version)}, creating the table if needed.
+   *
+   * <p>Each row has a {@code result} string and a repeated {@code records} field of
+   * ({@code index}, {@code value}) pairs, as produced by {@link StringToTableRow}.
+   */
+  private void sinkResultsToBigQuery(
+      PCollection<String> formattedResults, long now,
+      String version) {
+    String destinationTable = tableSpec(now, version);
+
+    TableFieldSchema resultField = new TableFieldSchema().setName("result").setType("STRING");
+    TableFieldSchema indexField = new TableFieldSchema().setName("index").setType("INTEGER");
+    TableFieldSchema valueField = new TableFieldSchema().setName("value").setType("STRING");
+    TableFieldSchema recordsField =
+        new TableFieldSchema().setName("records").setMode("REPEATED").setType("RECORD")
+                              .setFields(ImmutableList.of(indexField, valueField));
+    TableSchema schema =
+        new TableSchema().setFields(ImmutableList.of(resultField, recordsField));
+
+    NexmarkUtils.console("Writing results to BigQuery table %s", destinationTable);
+    BigQueryIO.Write.Bound writer =
+        BigQueryIO.Write.named(queryName + ".WriteBigQueryResults")
+                        .to(destinationTable)
+                        .withSchema(schema)
+                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
+    formattedResults
+        .apply(ParDo.named(queryName + ".StringToTableRow")
+                    .of(new StringToTableRow()))
+        .apply(writer);
+  }
+
+  // ================================================================================
+  // Construct overall pipeline
+  // ================================================================================
+
+  /**
+   * Return source of events for this run, or null if we are simply publishing events
+   * to Pubsub.
+   *
+   * <p>For the {@code PUBSUB} source type this also sets up the publishing side: in
+   * {@code PUBLISH_ONLY} mode the synthetic events are sunk to Pubsub from {@code p} itself,
+   * while in {@code COMBINED} mode a separate publisher pipeline is built and started.
+   *
+   * @param p the main pipeline
+   * @param now salt used when deriving Pubsub resource names
+   * @return the event source, or {@literal null} for {@code PUBSUB} in {@code PUBLISH_ONLY} mode
+   */
+  @Nullable
+  private PCollection<Event> createSource(Pipeline p, final long now) {
+    PCollection<Event> source = null;
+    switch (configuration.sourceType) {
+      case DIRECT:
+        source = sourceEventsFromSynthetic(p);
+        break;
+      case AVRO:
+        source = sourceEventsFromAvro(p);
+        break;
+      case PUBSUB:
+        // Setup the sink for the publisher.
+        switch (configuration.pubSubMode) {
+          case SUBSCRIBE_ONLY:
+            // Nothing to publish.
+            break;
+          case PUBLISH_ONLY:
+            // Send synthesized events to Pubsub in this job.
+            sinkEventsToPubsub(sourceEventsFromSynthetic(p).apply(NexmarkUtils.snoop(queryName)),
+                               now);
+            break;
+          case COMBINED:
+            // Send synthesized events to Pubsub in separate publisher job.
+            // We won't start the main pipeline until the publisher has sent the pre-load events.
+            // We'll shutdown the publisher job when we notice the main job has finished.
+            invokeBuilderForPublishOnlyPipeline(new PipelineBuilder() {
+              @Override
+              public void build(Options publishOnlyOptions) {
+                // Build the publisher from the options handed to this callback rather than
+                // the outer 'options', so publisher-specific settings are honoured.
+                Pipeline sp = Pipeline.create(publishOnlyOptions);
+                NexmarkUtils.setupPipeline(configuration.coderStrategy, sp);
+                publisherMonitor = new Monitor<Event>(queryName, "publisher");
+                sinkEventsToPubsub(
+                    sourceEventsFromSynthetic(sp).apply(publisherMonitor.getTransform()),
+                    now);
+                publisherResult = sp.run();
+              }
+            });
+            break;
+        }
+
+        // Setup the source for the consumer.
+        switch (configuration.pubSubMode) {
+          case PUBLISH_ONLY:
+            // Nothing to consume. Leave source null.
+            break;
+          case SUBSCRIBE_ONLY:
+          case COMBINED:
+            // Read events from pubsub.
+            source = sourceEventsFromPubsub(p, now);
+            break;
+        }
+        break;
+    }
+    return source;
+  }
+
+  private static final TupleTag<String> MAIN = new TupleTag<String>(){};
+  private static final TupleTag<String> SIDE = new TupleTag<String>(){};
+
+  private static class PartitionDoFn extends DoFn<String, String> {
+    @Override
+    public void processElement(ProcessContext c) {
+      if (c.element().hashCode() % 2 == 0) {
+        c.output(c.element());
+      } else {
+        c.sideOutput(SIDE, c.element());
+      }
+    }
+  }
+
+  /**
+   * Consume {@code results} according to the configured sink type.
+   *
+   * <p>{@code COUNT_ONLY} discards the results unformatted. Every other sink type first
+   * formats the results as strings (optionally logging them) and then dispatches on the
+   * sink type.
+   */
+  private void sink(PCollection<TimestampedValue<KnownSize>> results, long now) {
+    if (configuration.sinkType == NexmarkUtils.SinkType.COUNT_ONLY) {
+      // Avoid the cost of formatting the results.
+      results.apply(NexmarkUtils.devNull(queryName));
+      return;
+    }
+
+    PCollection<String> formatted = results.apply(NexmarkUtils.format(queryName));
+    if (options.getLogResults()) {
+      formatted = formatted.apply(NexmarkUtils.<String>log(queryName + ".Results"));
+    }
+
+    switch (configuration.sinkType) {
+      case COUNT_ONLY:
+        // Short-circuited above.
+        throw new RuntimeException();
+      case DEVNULL:
+        // Discard all results
+        formatted.apply(NexmarkUtils.devNull(queryName));
+        break;
+      case PUBSUB:
+        sinkResultsToPubsub(formatted, now);
+        break;
+      case TEXT:
+        sinkResultsToText(formatted, now);
+        break;
+      case AVRO:
+        NexmarkUtils.console(
+            "WARNING: with --sinkType=AVRO, actual query results will be discarded.");
+        break;
+      case BIGQUERY:
+        // Multiple BigQuery backends to mimic what most customers do.
+        PCollectionTuple partitioned = formatted.apply(
+            ParDo.named(queryName + ".Partition")
+                 .withOutputTags(MAIN, TupleTagList.of(SIDE))
+                 .of(new PartitionDoFn()));
+        sinkResultsToBigQuery(partitioned.get(MAIN), now, "main");
+        sinkResultsToBigQuery(partitioned.get(SIDE), now, "side");
+        sinkResultsToBigQuery(formatted, now, "copy");
+        break;
+    }
+  }
+
+  // ================================================================================
+  // Entry point
+  // ================================================================================
+
+  /**
+   * Calculate the distribution of the expected rate of results per minute (in event time, not
+   * wallclock time) and print it to the console.
+   *
+   * <p>With fewer than five samples only the sample count is printed. Otherwise the minimum,
+   * first quartile, median, third quartile and maximum of the per-window result counts are
+   * printed.
+   *
+   * @param model the query model whose simulator supplies the per-window result counts
+   */
+  private void modelResultRates(NexmarkQueryModel model) {
+    List<Long> counts = Lists.newArrayList(model.simulator().resultsPerWindow());
+    Collections.sort(counts);
+    int n = counts.size();
+    if (n < 5) {
+      NexmarkUtils.console("Query%d: only %d samples", model.configuration.query, n);
+    } else {
+      // counts is sorted, so the element at n / 2 is the median (not the mean).
+      NexmarkUtils.console("Query%d: N:%d; min:%d; 1st%%:%d; median:%d; 3rd%%:%d; max:%d",
+                           model.configuration.query, n, counts.get(0), counts.get(n / 4),
+                           counts.get(n / 2),
+                           counts.get(n - 1 - n / 4), counts.get(n - 1));
+    }
+  }
+
+  /**
+   * Run {@code runConfiguration} and return its performance if possible.
+   *
+   * <p>Returns {@literal null} without running a pipeline when the configuration is disabled
+   * (negative {@code numEvents}) or when {@code --justModelResultRate} is set. Otherwise the
+   * main pipeline is built and run, and the result of {@link #monitor} is returned.
+   * Per-run state is reset in all cases before returning.
+   *
+   * @param runConfiguration the configuration to run
+   * @return the measured performance, or {@literal null} if none is available
+   * @throws RuntimeException if the option combination is unsupported, or a model is required
+   *     but the query has none
+   */
+  @Nullable
+  public NexmarkPerf run(NexmarkConfiguration runConfiguration) {
+    if (options.getMonitorJobs() && !canMonitor()) {
+      throw new RuntimeException("Cannot use --monitorJobs with this runner since it does not "
+                                 + "support monitoring.");
+    }
+    if (options.getManageResources() && !options.getMonitorJobs()) {
+      throw new RuntimeException("If using --manageResources then must also use --monitorJobs.");
+    }
+
+    //
+    // Setup per-run state.
+    //
+    Preconditions.checkState(configuration == null);
+    Preconditions.checkState(pubsub == null);
+    Preconditions.checkState(queryName == null);
+    configuration = runConfiguration;
+
+    // GCS URI patterns to delete on exit.
+    List<String> pathsToDelete = new ArrayList<>();
+
+    try {
+      NexmarkUtils.console("Running %s", configuration.toShortString());
+
+      if (configuration.numEvents < 0) {
+        NexmarkUtils.console("skipping since configuration is disabled");
+        return null;
+      }
+
+      List<NexmarkQuery> queries = Arrays.asList(new Query0(configuration),
+                                                 new Query1(configuration),
+                                                 new Query2(configuration),
+                                                 new Query3(configuration),
+                                                 new Query4(configuration),
+                                                 new Query5(configuration),
+                                                 new Query6(configuration),
+                                                 new Query7(configuration),
+                                                 new Query8(configuration),
+                                                 new Query9(configuration),
+                                                 new Query10(configuration),
+                                                 new Query11(configuration),
+                                                 new Query12(configuration));
+      NexmarkQuery query = queries.get(configuration.query);
+      queryName = query.getName();
+
+      // Models are indexed by query number; queries 10 to 12 have no model.
+      List<NexmarkQueryModel> models = Arrays.asList(
+          new Query0Model(configuration),
+          new Query1Model(configuration),
+          new Query2Model(configuration),
+          new Query3Model(configuration),
+          new Query4Model(configuration),
+          new Query5Model(configuration),
+          new Query6Model(configuration),
+          new Query7Model(configuration),
+          new Query8Model(configuration),
+          new Query9Model(configuration),
+          null,
+          null,
+          null);
+      NexmarkQueryModel model = models.get(configuration.query);
+
+      if (options.getJustModelResultRate()) {
+        if (model == null) {
+          throw new RuntimeException(String.format("No model for %s", queryName));
+        }
+        modelResultRates(model);
+        return null;
+      }
+
+      long now = System.currentTimeMillis();
+      Pipeline p = Pipeline.create(options);
+      NexmarkUtils.setupPipeline(configuration.coderStrategy, p);
+
+      // Generate events.
+      PCollection<Event> source = createSource(p, now);
+
+      // Source will be null if source type is PUBSUB and mode is PUBLISH_ONLY.
+      // In that case there's nothing more to add to pipeline.
+      if (source != null) {
+        // Optionally log every event. This must stay inside the null check: a
+        // PUBLISH_ONLY run has no source to attach the logger to.
+        if (options.getLogEvents()) {
+          source = source.apply(NexmarkUtils.<Event>log(queryName + ".Events"));
+        }
+
+        // Optionally sink events in Avro format.
+        // (Query results are ignored).
+        if (configuration.sinkType == NexmarkUtils.SinkType.AVRO) {
+          sinkEventsToAvro(source);
+        }
+
+        // Special hacks for Query 10 (big logger).
+        if (configuration.query == 10) {
+          String path = null;
+          if (options.getOutputPath() != null && !options.getOutputPath().isEmpty()) {
+            path = logsDir(now);
+          }
+          ((Query10) query).setOutputPath(path);
+          ((Query10) query).setMaxNumWorkers(maxNumWorkers());
+          if (path != null && options.getManageResources()) {
+            pathsToDelete.add(path + "/**");
+          }
+        }
+
+        // Apply query.
+        PCollection<TimestampedValue<KnownSize>> results = source.apply(query);
+
+        if (options.getAssertCorrectness()) {
+          if (model == null) {
+            throw new RuntimeException(String.format("No model for %s", queryName));
+          }
+          // We know all our streams have a finite number of elements.
+          results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
+          // If we have a finite number of events then assert our pipeline's
+          // results match those of a model using the same sequence of events.
+          PAssert.that(results).satisfies(model.assertionFor());
+        }
+
+        // Output results.
+        sink(results, now);
+      }
+
+      // A publisher result is only set when createSource started a separate publisher job.
+      if (publisherResult != null) {
+        waitForPublisherPreload();
+      }
+      mainResult = p.run();
+      return monitor(query);
+    } finally {
+      //
+      // Cleanup per-run state.
+      //
+      if (pubsub != null) {
+        // Delete any subscriptions and topics we created.
+        pubsub.close();
+        pubsub = null;
+      }
+      configuration = null;
+      queryName = null;
+      // TODO: Cleanup pathsToDelete
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
new file mode 100644
index 0000000..cccaeb1
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A set of {@link NexmarkConfiguration}s.
+ */
+public enum NexmarkSuite {
+  /**
+   * The default.
+   */
+  DEFAULT(defaultConf()),
+
+  /**
+   * Sweep through all 13 queries (0 through 12) using the default configuration.
+   * 100k/10k events (depending on query).
+   */
+  SMOKE(smoke()),
+
+  /**
+   * As for SMOKE, but with 100m/10m events.
+   */
+  STRESS(stress()),
+
+  /**
+   * As for SMOKE, but with 1b/100m events.
+   */
+  FULL_THROTTLE(fullThrottle());
+
+  /**
+   * Return a single, freshly constructed configuration.
+   */
+  private static List<NexmarkConfiguration> defaultConf() {
+    List<NexmarkConfiguration> configurations = new ArrayList<>();
+    configurations.add(new NexmarkConfiguration());
+    return configurations;
+  }
+
+  /**
+   * Return one configuration per query (0 through 12), each a copy of the default with
+   * 100k events, or 10k events for queries 4, 6 and 9.
+   */
+  private static List<NexmarkConfiguration> smoke() {
+    List<NexmarkConfiguration> configurations = new ArrayList<>();
+    for (int query = 0; query <= 12; query++) {
+      NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT.clone();
+      configuration.query = query;
+      configuration.numEvents = 100_000;
+      if (query == 4 || query == 6 || query == 9) {
+        // Scale back so overall runtimes are reasonably close across all queries.
+        configuration.numEvents /= 10;
+      }
+      configurations.add(configuration);
+    }
+    return configurations;
+  }
+
+  /**
+   * Return the {@link #smoke()} configurations with every non-negative event count
+   * multiplied by {@code factor}. Negative counts are left untouched.
+   */
+  private static List<NexmarkConfiguration> scaledSmoke(int factor) {
+    List<NexmarkConfiguration> configurations = smoke();
+    for (NexmarkConfiguration configuration : configurations) {
+      if (configuration.numEvents >= 0) {
+        configuration.numEvents *= factor;
+      }
+    }
+    return configurations;
+  }
+
+  /**
+   * Return the smoke configurations scaled up 1,000 times (100m/10m events).
+   */
+  private static List<NexmarkConfiguration> stress() {
+    return scaledSmoke(1000);
+  }
+
+  /**
+   * Return the smoke configurations scaled up 10,000 times (1b/100m events).
+   */
+  private static List<NexmarkConfiguration> fullThrottle() {
+    // This used to repeat the STRESS factor of 1000, which made the two suites identical.
+    return scaledSmoke(10_000);
+  }
+
+  private final List<NexmarkConfiguration> configurations;
+
+  NexmarkSuite(List<NexmarkConfiguration> configurations) {
+    this.configurations = configurations;
+  }
+
+  /**
+   * Return the configurations corresponding to this suite, each cloned and then overridden
+   * from {@code options}. Results keep suite order; configurations that compare equal after
+   * the override collapse into one entry.
+   *
+   * <p>NOTE(review): --isStreaming is said to be respected only for the {@link #DEFAULT}
+   * suite. That logic is not visible here; presumably it lives in
+   * {@code overrideFromOptions}. Confirm.
+   */
+  public Iterable<NexmarkConfiguration> getConfigurations(Options options) {
+    Set<NexmarkConfiguration> results = new LinkedHashSet<>();
+    for (NexmarkConfiguration configuration : configurations) {
+      NexmarkConfiguration result = configuration.clone();
+      result.overrideFromOptions(options);
+      results.add(result);
+    }
+    return results;
+  }
+}