You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:44 UTC

[36/55] [abbrv] beam git commit: Rename NexmarkDriver to Main and NexmarkRunner to NexmarkLauncher

Rename NexmarkDriver to Main and NexmarkRunner to NexmarkLauncher


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/683680b1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/683680b1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/683680b1

Branch: refs/heads/master
Commit: 683680b1655e79d696a1d0f4588753a7d8ff2b82
Parents: 77eabba
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Tue May 9 10:17:06 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:28 2017 +0200

----------------------------------------------------------------------
 .../apache/beam/integration/nexmark/Main.java   |  304 +++++
 .../beam/integration/nexmark/NexmarkDriver.java |  304 -----
 .../integration/nexmark/NexmarkLauncher.java    | 1172 ++++++++++++++++++
 .../beam/integration/nexmark/NexmarkRunner.java | 1172 ------------------
 4 files changed, 1476 insertions(+), 1476 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/683680b1/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java
new file mode 100644
index 0000000..da4d446
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Person;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of the 'NEXMark queries' for Google Dataflow.
+ * These are multiple queries over a three table schema representing an online auction system:
+ * <ul>
+ * <li>{@link Person} represents a person submitting an item for auction and/or making a bid
+ * on an auction.
+ * <li>{@link Auction} represents an item under auction.
+ * <li>{@link Bid} represents a bid for an item under auction.
+ * </ul>
+ * The queries exercise many aspects of streaming dataflow.
+ *
+ * <p>We synthesize the creation of people, auctions and bids in real-time. The data is not
+ * particularly sensible.
+ *
+ * <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/">
+ * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a>
+ */
+public class Main<OptionT extends NexmarkOptions> {
+
+  /**
+   * Entry point.
+   */
+  void runAll(OptionT options, NexmarkLauncher nexmarkLauncher) {
+    Instant start = Instant.now();
+    Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename());
+    Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>();
+    Iterable<NexmarkConfiguration> configurations = options.getSuite().getConfigurations(options);
+
+    boolean successful = true;
+    try {
+      // Run all the configurations.
+      for (NexmarkConfiguration configuration : configurations) {
+        NexmarkPerf perf = nexmarkLauncher.run(configuration);
+        if (perf != null) {
+          if (perf.errors == null || perf.errors.size() > 0) {
+            successful = false;
+          }
+          appendPerf(options.getPerfFilename(), configuration, perf);
+          actual.put(configuration, perf);
+          // Summarize what we've run so far.
+          saveSummary(null, configurations, actual, baseline, start);
+        }
+      }
+    } finally {
+      if (options.getMonitorJobs()) {
+        // Report overall performance.
+        saveSummary(options.getSummaryFilename(), configurations, actual, baseline, start);
+        saveJavascript(options.getJavascriptFilename(), configurations, actual, baseline, start);
+      }
+    }
+
+    if (!successful) {
+      throw new RuntimeException("Execution was not successful");
+    }
+  }
+
+  /**
+   * Append the pair of {@code configuration} and {@code perf} to perf file.
+   */
+  private void appendPerf(
+      @Nullable String perfFilename, NexmarkConfiguration configuration,
+      NexmarkPerf perf) {
+    if (perfFilename == null) {
+      return;
+    }
+    List<String> lines = new ArrayList<>();
+    lines.add("");
+    lines.add(String.format("# %s", Instant.now()));
+    lines.add(String.format("# %s", configuration.toShortString()));
+    lines.add(configuration.toString());
+    lines.add(perf.toString());
+    try {
+      Files.write(Paths.get(perfFilename), lines, StandardCharsets.UTF_8, StandardOpenOption.CREATE,
+          StandardOpenOption.APPEND);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to write perf file: ", e);
+    }
+    NexmarkUtils.console("appended results to perf file %s.", perfFilename);
+  }
+
+  /**
+   * Load the baseline perf.
+   */
+  @Nullable
+  private static Map<NexmarkConfiguration, NexmarkPerf> loadBaseline(
+      @Nullable String baselineFilename) {
+    if (baselineFilename == null) {
+      return null;
+    }
+    Map<NexmarkConfiguration, NexmarkPerf> baseline = new LinkedHashMap<>();
+    List<String> lines;
+    try {
+      lines = Files.readAllLines(Paths.get(baselineFilename), StandardCharsets.UTF_8);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to read baseline perf file: ", e);
+    }
+    for (int i = 0; i < lines.size(); i++) {
+      if (lines.get(i).startsWith("#") || lines.get(i).trim().isEmpty()) {
+        continue;
+      }
+      NexmarkConfiguration configuration = NexmarkConfiguration.fromString(lines.get(i++));
+      NexmarkPerf perf = NexmarkPerf.fromString(lines.get(i));
+      baseline.put(configuration, perf);
+    }
+    NexmarkUtils.console("loaded %d entries from baseline file %s.", baseline.size(),
+        baselineFilename);
+    return baseline;
+  }
+
+  private static final String LINE =
+      "==========================================================================================";
+
+  /**
+   * Print summary  of {@code actual} vs (if non-null) {@code baseline}.
+   */
+  private static void saveSummary(
+      @Nullable String summaryFilename,
+      Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
+      @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
+    List<String> lines = new ArrayList<>();
+
+    lines.add("");
+    lines.add(LINE);
+
+    lines.add(
+        String.format("Run started %s and ran for %s", start, new Duration(start, Instant.now())));
+    lines.add("");
+
+    lines.add("Default configuration:");
+    lines.add(NexmarkConfiguration.DEFAULT.toString());
+    lines.add("");
+
+    lines.add("Configurations:");
+    lines.add("  Conf  Description");
+    int conf = 0;
+    for (NexmarkConfiguration configuration : configurations) {
+      lines.add(String.format("  %04d  %s", conf++, configuration.toShortString()));
+      NexmarkPerf actualPerf = actual.get(configuration);
+      if (actualPerf != null && actualPerf.jobId != null) {
+        lines.add(String.format("  %4s  [Ran as job %s]", "", actualPerf.jobId));
+      }
+    }
+
+    lines.add("");
+    lines.add("Performance:");
+    lines.add(String.format("  %4s  %12s  %12s  %12s  %12s  %12s  %12s", "Conf", "Runtime(sec)",
+        "(Baseline)", "Events(/sec)", "(Baseline)", "Results", "(Baseline)"));
+    conf = 0;
+    for (NexmarkConfiguration configuration : configurations) {
+      String line = String.format("  %04d  ", conf++);
+      NexmarkPerf actualPerf = actual.get(configuration);
+      if (actualPerf == null) {
+        line += "*** not run ***";
+      } else {
+        NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration);
+        double runtimeSec = actualPerf.runtimeSec;
+        line += String.format("%12.1f  ", runtimeSec);
+        if (baselinePerf == null) {
+          line += String.format("%12s  ", "");
+        } else {
+          double baselineRuntimeSec = baselinePerf.runtimeSec;
+          double diff = ((runtimeSec - baselineRuntimeSec) / baselineRuntimeSec) * 100.0;
+          line += String.format("%+11.2f%%  ", diff);
+        }
+
+        double eventsPerSec = actualPerf.eventsPerSec;
+        line += String.format("%12.1f  ", eventsPerSec);
+        if (baselinePerf == null) {
+          line += String.format("%12s  ", "");
+        } else {
+          double baselineEventsPerSec = baselinePerf.eventsPerSec;
+          double diff = ((eventsPerSec - baselineEventsPerSec) / baselineEventsPerSec) * 100.0;
+          line += String.format("%+11.2f%%  ", diff);
+        }
+
+        long numResults = actualPerf.numResults;
+        line += String.format("%12d  ", numResults);
+        if (baselinePerf == null) {
+          line += String.format("%12s", "");
+        } else {
+          long baselineNumResults = baselinePerf.numResults;
+          long diff = numResults - baselineNumResults;
+          line += String.format("%+12d", diff);
+        }
+      }
+      lines.add(line);
+
+      if (actualPerf != null) {
+        List<String> errors = actualPerf.errors;
+        if (errors == null) {
+          errors = new ArrayList<>();
+          errors.add("NexmarkGoogleRunner returned null errors list");
+        }
+        for (String error : errors) {
+          lines.add(String.format("  %4s  *** %s ***", "", error));
+        }
+      }
+    }
+
+    lines.add(LINE);
+    lines.add("");
+
+    for (String line : lines) {
+      System.out.println(line);
+    }
+
+    if (summaryFilename != null) {
+      try {
+        Files.write(Paths.get(summaryFilename), lines, StandardCharsets.UTF_8,
+            StandardOpenOption.CREATE, StandardOpenOption.APPEND);
+      } catch (IOException e) {
+        throw new RuntimeException("Unable to save summary file: ", e);
+      }
+      NexmarkUtils.console("appended summary to summary file %s.", summaryFilename);
+    }
+  }
+
+  /**
+   * Write all perf data and any baselines to a javascript file which can be used by
+   * graphing page etc.
+   */
+  private static void saveJavascript(
+      @Nullable String javascriptFilename,
+      Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
+      @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
+    if (javascriptFilename == null) {
+      return;
+    }
+
+    List<String> lines = new ArrayList<>();
+    lines.add(String.format(
+        "// Run started %s and ran for %s", start, new Duration(start, Instant.now())));
+    lines.add("var all = [");
+
+    for (NexmarkConfiguration configuration : configurations) {
+      lines.add("  {");
+      lines.add(String.format("    config: %s", configuration));
+      NexmarkPerf actualPerf = actual.get(configuration);
+      if (actualPerf != null) {
+        lines.add(String.format("    ,perf: %s", actualPerf));
+      }
+      NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration);
+      if (baselinePerf != null) {
+        lines.add(String.format("    ,baseline: %s", baselinePerf));
+      }
+      lines.add("  },");
+    }
+
+    lines.add("];");
+
+    try {
+      Files.write(Paths.get(javascriptFilename), lines, StandardCharsets.UTF_8,
+          StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to save javascript file: ", e);
+    }
+    NexmarkUtils.console("saved javascript to file %s.", javascriptFilename);
+  }
+
+  public static void main(String[] args) {
+    NexmarkOptions options = PipelineOptionsFactory.fromArgs(args)
+      .withValidation()
+      .as(NexmarkOptions.class);
+    NexmarkLauncher<NexmarkOptions> nexmarkLauncher = new NexmarkLauncher<>(options);
+    new Main<>().runAll(options, nexmarkLauncher);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/683680b1/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
deleted file mode 100644
index a982a8d..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Person;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * An implementation of the 'NEXMark queries' for Google Dataflow.
- * These are multiple queries over a three table schema representing an online auction system:
- * <ul>
- * <li>{@link Person} represents a person submitting an item for auction and/or making a bid
- * on an auction.
- * <li>{@link Auction} represents an item under auction.
- * <li>{@link Bid} represents a bid for an item under auction.
- * </ul>
- * The queries exercise many aspects of streaming dataflow.
- *
- * <p>We synthesize the creation of people, auctions and bids in real-time. The data is not
- * particularly sensible.
- *
- * <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/">
- * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a>
- */
-public class NexmarkDriver<OptionT extends NexmarkOptions> {
-
-  /**
-   * Entry point.
-   */
-  void runAll(OptionT options, NexmarkRunner runner) {
-    Instant start = Instant.now();
-    Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename());
-    Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>();
-    Iterable<NexmarkConfiguration> configurations = options.getSuite().getConfigurations(options);
-
-    boolean successful = true;
-    try {
-      // Run all the configurations.
-      for (NexmarkConfiguration configuration : configurations) {
-        NexmarkPerf perf = runner.run(configuration);
-        if (perf != null) {
-          if (perf.errors == null || perf.errors.size() > 0) {
-            successful = false;
-          }
-          appendPerf(options.getPerfFilename(), configuration, perf);
-          actual.put(configuration, perf);
-          // Summarize what we've run so far.
-          saveSummary(null, configurations, actual, baseline, start);
-        }
-      }
-    } finally {
-      if (options.getMonitorJobs()) {
-        // Report overall performance.
-        saveSummary(options.getSummaryFilename(), configurations, actual, baseline, start);
-        saveJavascript(options.getJavascriptFilename(), configurations, actual, baseline, start);
-      }
-    }
-
-    if (!successful) {
-      throw new RuntimeException("Execution was not successful");
-    }
-  }
-
-  /**
-   * Append the pair of {@code configuration} and {@code perf} to perf file.
-   */
-  private void appendPerf(
-      @Nullable String perfFilename, NexmarkConfiguration configuration,
-      NexmarkPerf perf) {
-    if (perfFilename == null) {
-      return;
-    }
-    List<String> lines = new ArrayList<>();
-    lines.add("");
-    lines.add(String.format("# %s", Instant.now()));
-    lines.add(String.format("# %s", configuration.toShortString()));
-    lines.add(configuration.toString());
-    lines.add(perf.toString());
-    try {
-      Files.write(Paths.get(perfFilename), lines, StandardCharsets.UTF_8, StandardOpenOption.CREATE,
-          StandardOpenOption.APPEND);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to write perf file: ", e);
-    }
-    NexmarkUtils.console("appended results to perf file %s.", perfFilename);
-  }
-
-  /**
-   * Load the baseline perf.
-   */
-  @Nullable
-  private static Map<NexmarkConfiguration, NexmarkPerf> loadBaseline(
-      @Nullable String baselineFilename) {
-    if (baselineFilename == null) {
-      return null;
-    }
-    Map<NexmarkConfiguration, NexmarkPerf> baseline = new LinkedHashMap<>();
-    List<String> lines;
-    try {
-      lines = Files.readAllLines(Paths.get(baselineFilename), StandardCharsets.UTF_8);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to read baseline perf file: ", e);
-    }
-    for (int i = 0; i < lines.size(); i++) {
-      if (lines.get(i).startsWith("#") || lines.get(i).trim().isEmpty()) {
-        continue;
-      }
-      NexmarkConfiguration configuration = NexmarkConfiguration.fromString(lines.get(i++));
-      NexmarkPerf perf = NexmarkPerf.fromString(lines.get(i));
-      baseline.put(configuration, perf);
-    }
-    NexmarkUtils.console("loaded %d entries from baseline file %s.", baseline.size(),
-        baselineFilename);
-    return baseline;
-  }
-
-  private static final String LINE =
-      "==========================================================================================";
-
-  /**
-   * Print summary  of {@code actual} vs (if non-null) {@code baseline}.
-   */
-  private static void saveSummary(
-      @Nullable String summaryFilename,
-      Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
-      @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
-    List<String> lines = new ArrayList<>();
-
-    lines.add("");
-    lines.add(LINE);
-
-    lines.add(
-        String.format("Run started %s and ran for %s", start, new Duration(start, Instant.now())));
-    lines.add("");
-
-    lines.add("Default configuration:");
-    lines.add(NexmarkConfiguration.DEFAULT.toString());
-    lines.add("");
-
-    lines.add("Configurations:");
-    lines.add("  Conf  Description");
-    int conf = 0;
-    for (NexmarkConfiguration configuration : configurations) {
-      lines.add(String.format("  %04d  %s", conf++, configuration.toShortString()));
-      NexmarkPerf actualPerf = actual.get(configuration);
-      if (actualPerf != null && actualPerf.jobId != null) {
-        lines.add(String.format("  %4s  [Ran as job %s]", "", actualPerf.jobId));
-      }
-    }
-
-    lines.add("");
-    lines.add("Performance:");
-    lines.add(String.format("  %4s  %12s  %12s  %12s  %12s  %12s  %12s", "Conf", "Runtime(sec)",
-        "(Baseline)", "Events(/sec)", "(Baseline)", "Results", "(Baseline)"));
-    conf = 0;
-    for (NexmarkConfiguration configuration : configurations) {
-      String line = String.format("  %04d  ", conf++);
-      NexmarkPerf actualPerf = actual.get(configuration);
-      if (actualPerf == null) {
-        line += "*** not run ***";
-      } else {
-        NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration);
-        double runtimeSec = actualPerf.runtimeSec;
-        line += String.format("%12.1f  ", runtimeSec);
-        if (baselinePerf == null) {
-          line += String.format("%12s  ", "");
-        } else {
-          double baselineRuntimeSec = baselinePerf.runtimeSec;
-          double diff = ((runtimeSec - baselineRuntimeSec) / baselineRuntimeSec) * 100.0;
-          line += String.format("%+11.2f%%  ", diff);
-        }
-
-        double eventsPerSec = actualPerf.eventsPerSec;
-        line += String.format("%12.1f  ", eventsPerSec);
-        if (baselinePerf == null) {
-          line += String.format("%12s  ", "");
-        } else {
-          double baselineEventsPerSec = baselinePerf.eventsPerSec;
-          double diff = ((eventsPerSec - baselineEventsPerSec) / baselineEventsPerSec) * 100.0;
-          line += String.format("%+11.2f%%  ", diff);
-        }
-
-        long numResults = actualPerf.numResults;
-        line += String.format("%12d  ", numResults);
-        if (baselinePerf == null) {
-          line += String.format("%12s", "");
-        } else {
-          long baselineNumResults = baselinePerf.numResults;
-          long diff = numResults - baselineNumResults;
-          line += String.format("%+12d", diff);
-        }
-      }
-      lines.add(line);
-
-      if (actualPerf != null) {
-        List<String> errors = actualPerf.errors;
-        if (errors == null) {
-          errors = new ArrayList<>();
-          errors.add("NexmarkGoogleRunner returned null errors list");
-        }
-        for (String error : errors) {
-          lines.add(String.format("  %4s  *** %s ***", "", error));
-        }
-      }
-    }
-
-    lines.add(LINE);
-    lines.add("");
-
-    for (String line : lines) {
-      System.out.println(line);
-    }
-
-    if (summaryFilename != null) {
-      try {
-        Files.write(Paths.get(summaryFilename), lines, StandardCharsets.UTF_8,
-            StandardOpenOption.CREATE, StandardOpenOption.APPEND);
-      } catch (IOException e) {
-        throw new RuntimeException("Unable to save summary file: ", e);
-      }
-      NexmarkUtils.console("appended summary to summary file %s.", summaryFilename);
-    }
-  }
-
-  /**
-   * Write all perf data and any baselines to a javascript file which can be used by
-   * graphing page etc.
-   */
-  private static void saveJavascript(
-      @Nullable String javascriptFilename,
-      Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
-      @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
-    if (javascriptFilename == null) {
-      return;
-    }
-
-    List<String> lines = new ArrayList<>();
-    lines.add(String.format(
-        "// Run started %s and ran for %s", start, new Duration(start, Instant.now())));
-    lines.add("var all = [");
-
-    for (NexmarkConfiguration configuration : configurations) {
-      lines.add("  {");
-      lines.add(String.format("    config: %s", configuration));
-      NexmarkPerf actualPerf = actual.get(configuration);
-      if (actualPerf != null) {
-        lines.add(String.format("    ,perf: %s", actualPerf));
-      }
-      NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration);
-      if (baselinePerf != null) {
-        lines.add(String.format("    ,baseline: %s", baselinePerf));
-      }
-      lines.add("  },");
-    }
-
-    lines.add("];");
-
-    try {
-      Files.write(Paths.get(javascriptFilename), lines, StandardCharsets.UTF_8,
-          StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to save javascript file: ", e);
-    }
-    NexmarkUtils.console("saved javascript to file %s.", javascriptFilename);
-  }
-
-  public static void main(String[] args) {
-    NexmarkOptions options = PipelineOptionsFactory.fromArgs(args)
-      .withValidation()
-      .as(NexmarkOptions.class);
-    NexmarkRunner<NexmarkOptions> runner = new NexmarkRunner<>(options);
-    new NexmarkDriver<>().runAll(options, runner);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/683680b1/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
new file mode 100644
index 0000000..ea4ff58
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
@@ -0,0 +1,1172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.integration.nexmark.model.Person;
+import org.apache.beam.integration.nexmark.queries.NexmarkQuery;
+import org.apache.beam.integration.nexmark.queries.NexmarkQueryModel;
+import org.apache.beam.integration.nexmark.queries.Query0;
+import org.apache.beam.integration.nexmark.queries.Query0Model;
+import org.apache.beam.integration.nexmark.queries.Query1;
+import org.apache.beam.integration.nexmark.queries.Query10;
+import org.apache.beam.integration.nexmark.queries.Query11;
+import org.apache.beam.integration.nexmark.queries.Query12;
+import org.apache.beam.integration.nexmark.queries.Query1Model;
+import org.apache.beam.integration.nexmark.queries.Query2;
+import org.apache.beam.integration.nexmark.queries.Query2Model;
+import org.apache.beam.integration.nexmark.queries.Query3;
+import org.apache.beam.integration.nexmark.queries.Query3Model;
+import org.apache.beam.integration.nexmark.queries.Query4;
+import org.apache.beam.integration.nexmark.queries.Query4Model;
+import org.apache.beam.integration.nexmark.queries.Query5;
+import org.apache.beam.integration.nexmark.queries.Query5Model;
+import org.apache.beam.integration.nexmark.queries.Query6;
+import org.apache.beam.integration.nexmark.queries.Query6Model;
+import org.apache.beam.integration.nexmark.queries.Query7;
+import org.apache.beam.integration.nexmark.queries.Query7Model;
+import org.apache.beam.integration.nexmark.queries.Query8;
+import org.apache.beam.integration.nexmark.queries.Query8Model;
+import org.apache.beam.integration.nexmark.queries.Query9;
+import org.apache.beam.integration.nexmark.queries.Query9Model;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.joda.time.Duration;
+
+/**
+ * Run a single Nexmark query using a given configuration.
+ */
+public class NexmarkLauncher<OptionT extends NexmarkOptions> {
+  /**
+   * Minimum number of samples needed for 'steady-state' rate calculation.
+   */
+  private static final int MIN_SAMPLES = 9;
+  /**
+   * Minimum length of time over which to consider samples for 'steady-state' rate calculation.
+   */
+  private static final Duration MIN_WINDOW = Duration.standardMinutes(2);
+  /**
+   * Delay between perf samples.
+   */
+  private static final Duration PERF_DELAY = Duration.standardSeconds(15);
+  /**
+   * How long to let streaming pipeline run after all events have been generated and we've
+   * seen no activity.
+   */
+  private static final Duration DONE_DELAY = Duration.standardMinutes(1);
+  /**
+   * How long to allow no activity without warning.
+   */
+  private static final Duration STUCK_WARNING_DELAY = Duration.standardMinutes(10);
+  /**
+   * How long to let streaming pipeline run after we've
+   * seen no activity, even if all events have not been generated.
+   */
+  private static final Duration STUCK_TERMINATE_DELAY = Duration.standardDays(3);
+  /**
+   * NexmarkOptions shared by all runs.
+   */
+  private final OptionT options;
+
+  /**
+   * Which configuration we are running.
+   */
+  @Nullable
+  private NexmarkConfiguration configuration;
+
+  /**
+   * If in --pubsubMode=COMBINED, the event monitor for the publisher pipeline. Otherwise null.
+   */
+  @Nullable
+  private Monitor<Event> publisherMonitor;
+
+  /**
+   * If in --pubsubMode=COMBINED, the pipeline result for the publisher pipeline. Otherwise null.
+   */
+  @Nullable
+  private PipelineResult publisherResult;
+
+  /**
+   * Result for the main pipeline.
+   */
+  @Nullable
+  private PipelineResult mainResult;
+
+  /**
+   * Query name we are running.
+   */
+  @Nullable
+  private String queryName;
+
+  /**
+   * Create a launcher that keeps {@code options} for use by all subsequent runs.
+   *
+   * @param options the options shared by all runs of this launcher
+   */
+  public NexmarkLauncher(OptionT options) {
+    this.options = options;
+  }
+
+
+  /**
+   * Return whether the options request streaming mode.
+   *
+   * @return the value of {@code options.isStreaming()}
+   */
+  private boolean isStreaming() {
+    return options.isStreaming();
+  }
+
+  /**
+   * Return number of cores per worker.
+   *
+   * <p>This implementation returns a fixed value; the method is {@code protected}, so a
+   * subclass may supply a different one.
+   *
+   * @return the number of cores per worker (always 4 here)
+   */
+  protected int coresPerWorker() {
+    return 4;
+  }
+
+  /**
+   * Return maximum number of workers.
+   *
+   * @return the maximum number of workers (a fixed value of 5 here)
+   */
+  private int maxNumWorkers() {
+    return 5;
+  }
+
+  /**
+   * Return the current value of a long counter, or {@code defaultValue} if {@code result}
+   * reports no counter matching {@code namespace} and {@code name}.
+   *
+   * <p>Only the attempted value is read because some runners don't support committed metrics.
+   *
+   * @param result pipeline result whose metrics are queried
+   * @param namespace metric namespace to match
+   * @param name metric name to match
+   * @param defaultValue value returned when no matching counter is available
+   * @return the attempted value of the first matching counter, or {@code defaultValue}
+   */
+  private long getCounterMetric(PipelineResult result, String namespace, String name,
+    long defaultValue) {
+    //TODO Ismael calc this only once
+    MetricQueryResults metrics = result.metrics().queryMetrics(
+        MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build());
+    // Only the first matching counter is of interest. An empty result means the metric has
+    // not been reported, which is signalled to the caller through the default value.
+    for (MetricResult<Long> counter : metrics.counters()) {
+      return counter.attempted();
+    }
+    return defaultValue;
+  }
+
+  /**
+   * Return the minimum or maximum of a distribution metric, or {@code defaultValue} if
+   * {@code result} reports no distribution matching {@code namespace} and {@code name}.
+   *
+   * <p>Only the attempted value is read because some runners don't support committed metrics.
+   *
+   * @param result pipeline result whose metrics are queried
+   * @param namespace metric namespace to match
+   * @param name metric name to match
+   * @param distType which extreme of the distribution to return
+   * @param defaultValue value returned when no matching distribution is available
+   * @return the requested extreme of the first matching distribution, or {@code defaultValue}
+   */
+  private long getDistributionMetric(PipelineResult result, String namespace, String name,
+      DistributionType distType, long defaultValue) {
+    MetricQueryResults metrics = result.metrics().queryMetrics(
+        MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build());
+    // Only the first matching distribution is of interest. An empty result means the metric
+    // has not been reported, which is signalled to the caller through the default value.
+    for (MetricResult<DistributionResult> distribution : metrics.distributions()) {
+      switch (distType) {
+        case MIN:
+          return distribution.attempted().min();
+        case MAX:
+          return distribution.attempted().max();
+        default:
+          // Unknown distribution type: treat the metric as unavailable.
+          return defaultValue;
+      }
+    }
+    return defaultValue;
+  }
+
+  /** Which extreme of a distribution metric to read: its minimum or its maximum. */
+  private enum DistributionType {MIN, MAX}
+
+  /**
+   * Sanity-check a timestamp read from a metric.
+   *
+   * <p>A timestamp that lies more than 10000 days away from {@code now} (in either direction)
+   * is treated as not available.
+   *
+   * @param now current time, in milliseconds
+   * @param value timestamp metric value, in milliseconds
+   * @return {@code value} if it is within 10000 days of {@code now}, otherwise -1
+   */
+  private long getTimestampMetric(long now, long value) {
+    long maxDistanceMs = Duration.standardDays(10000).getMillis();
+    boolean plausible = Math.abs(value - now) <= maxDistanceMs;
+    return plausible ? value : -1;
+  }
+
+  /**
+   * Find a 'steady state' events/sec from {@code snapshots} and
+   * store it in {@code perf} if found.
+   *
+   * <p>Does nothing unless the options request streaming mode. The rate is fitted over the
+   * middle third of the samples that lie between the first sample with non-negative event and
+   * result counts and the last sample that showed activity. {@code perf.eventsPerSec} is left
+   * untouched when there are fewer than {@code MIN_SAMPLES} samples, when the middle third
+   * spans less than {@code MIN_WINDOW}, or when no rate can be fitted.
+   *
+   * @param perf performance record whose {@code eventsPerSec} may be revised
+   * @param snapshots progress snapshots collected so far, oldest first
+   */
+  private void captureSteadyState(NexmarkPerf perf, List<NexmarkPerf.ProgressSnapshot> snapshots) {
+    if (!options.isStreaming()) {
+      return;
+    }
+
+    // Find the first sample with actual event and result counts.
+    int dataStart = 0;
+    for (; dataStart < snapshots.size(); dataStart++) {
+      if (snapshots.get(dataStart).numEvents >= 0 && snapshots.get(dataStart).numResults >= 0) {
+        break;
+      }
+    }
+
+    // Find the last sample which demonstrated progress.
+    int dataEnd = snapshots.size() - 1;
+    for (; dataEnd > dataStart; dataEnd--) {
+      if (snapshots.get(dataEnd).anyActivity(snapshots.get(dataEnd - 1))) {
+        break;
+      }
+    }
+
+    int numSamples = dataEnd - dataStart + 1;
+    if (numSamples < MIN_SAMPLES) {
+      // Not enough samples.
+      NexmarkUtils.console("%d samples not enough to calculate steady-state event rate",
+          numSamples);
+      return;
+    }
+
+    // We'll look at only the middle third samples.
+    int sampleStart = dataStart + numSamples / 3;
+    int sampleEnd = dataEnd - numSamples / 3;
+
+    double sampleSec =
+        snapshots.get(sampleEnd).secSinceStart - snapshots.get(sampleStart).secSinceStart;
+    if (sampleSec < MIN_WINDOW.getStandardSeconds()) {
+      // Not sampled over enough time.
+      NexmarkUtils.console(
+          "sample of %.1f sec not long enough to calculate steady-state event rate",
+          sampleSec);
+      return;
+    }
+
+    // Find rate with least squares error: fit numEvents = rate * runtimeSec through the
+    // origin, which gives rate = sum(x * y) / sum(x * x).
+    double sumxx = 0.0;
+    double sumxy = 0.0;
+    long prevNumEvents = -1;
+    for (int i = sampleStart; i <= sampleEnd; i++) {
+      if (prevNumEvents == snapshots.get(i).numEvents) {
+        // Skip samples with no change in number of events since they contribute no data.
+        continue;
+      }
+      // Use the effective runtime instead of wallclock time so we can
+      // insulate ourselves from delays and stutters in the query manager.
+      double x = snapshots.get(i).runtimeSec;
+      prevNumEvents = snapshots.get(i).numEvents;
+      double y = prevNumEvents;
+      sumxx += x * x;
+      sumxy += x * y;
+    }
+    if (!(sumxx > 0.0)) {
+      // No sample contributed a usable runtime, so the division below would produce NaN.
+      // Keep the existing estimate rather than overwriting it with a meaningless value.
+      NexmarkUtils.console("no usable runtime samples to calculate steady-state event rate");
+      return;
+    }
+    double eventsPerSec = sumxy / sumxx;
+    NexmarkUtils.console("revising events/sec from %.1f to %.1f", perf.eventsPerSec, eventsPerSec);
+    perf.eventsPerSec = eventsPerSec;
+  }
+
+  /**
+   * Return the current performance given {@code eventMonitor} and {@code resultMonitor}.
+   */
+  private NexmarkPerf currentPerf(
+      long startMsSinceEpoch, long now, PipelineResult result,
+      List<NexmarkPerf.ProgressSnapshot> snapshots, Monitor<?> eventMonitor,
+      Monitor<?> resultMonitor) {
+    NexmarkPerf perf = new NexmarkPerf();
+
+    long numEvents =
+      getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".elements", -1);
+    long numEventBytes =
+      getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".bytes", -1);
+    long eventStart =
+      getTimestampMetric(now,
+        getDistributionMetric(result, eventMonitor.name, eventMonitor.prefix + ".startTime",
+          DistributionType.MIN, -1));
+    long eventEnd =
+      getTimestampMetric(now,
+        getDistributionMetric(result, eventMonitor.name, eventMonitor.prefix + ".endTime",
+          DistributionType.MAX, -1));
+
+    long numResults =
+      getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".elements", -1);
+    long numResultBytes =
+      getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".bytes", -1);
+    long resultStart =
+      getTimestampMetric(now,
+        getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".startTime",
+          DistributionType.MIN, -1));
+    long resultEnd =
+      getTimestampMetric(now,
+        getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".endTime",
+          DistributionType.MAX, -1));
+    long timestampStart =
+      getTimestampMetric(now,
+        getDistributionMetric(result,
+          resultMonitor.name, resultMonitor.prefix + ".startTimestamp",
+          DistributionType.MIN, -1));
+    long timestampEnd =
+      getTimestampMetric(now,
+        getDistributionMetric(result,
+          resultMonitor.name, resultMonitor.prefix + ".endTimestamp",
+          DistributionType.MAX, -1));
+
+    long effectiveEnd = -1;
+    if (eventEnd >= 0 && resultEnd >= 0) {
+      // It is possible for events to be generated after the last result was emitted.
+      // (Eg Query 2, which only yields results for a small prefix of the event stream.)
+      // So use the max of last event and last result times.
+      effectiveEnd = Math.max(eventEnd, resultEnd);
+    } else if (resultEnd >= 0) {
+      effectiveEnd = resultEnd;
+    } else if (eventEnd >= 0) {
+      // During startup we may have no result yet, but we would still like to track how
+      // long the pipeline has been running.
+      effectiveEnd = eventEnd;
+    }
+
+    if (effectiveEnd >= 0 && eventStart >= 0 && effectiveEnd >= eventStart) {
+      perf.runtimeSec = (effectiveEnd - eventStart) / 1000.0;
+    }
+
+    if (numEvents >= 0) {
+      perf.numEvents = numEvents;
+    }
+
+    if (numEvents >= 0 && perf.runtimeSec > 0.0) {
+      // For streaming we may later replace this with a 'steady-state' value calculated
+      // from the progress snapshots.
+      perf.eventsPerSec = numEvents / perf.runtimeSec;
+    }
+
+    if (numEventBytes >= 0 && perf.runtimeSec > 0.0) {
+      perf.eventBytesPerSec = numEventBytes / perf.runtimeSec;
+    }
+
+    if (numResults >= 0) {
+      perf.numResults = numResults;
+    }
+
+    if (numResults >= 0 && perf.runtimeSec > 0.0) {
+      perf.resultsPerSec = numResults / perf.runtimeSec;
+    }
+
+    if (numResultBytes >= 0 && perf.runtimeSec > 0.0) {
+      perf.resultBytesPerSec = numResultBytes / perf.runtimeSec;
+    }
+
+    if (eventStart >= 0) {
+      perf.startupDelaySec = (eventStart - startMsSinceEpoch) / 1000.0;
+    }
+
+    if (resultStart >= 0 && eventStart >= 0 && resultStart >= eventStart) {
+      perf.processingDelaySec = (resultStart - eventStart) / 1000.0;
+    }
+
+    if (timestampStart >= 0 && timestampEnd >= 0 && perf.runtimeSec > 0.0) {
+      double eventRuntimeSec = (timestampEnd - timestampStart) / 1000.0;
+      perf.timeDilation = eventRuntimeSec / perf.runtimeSec;
+    }
+
+    if (resultEnd >= 0) {
+      // Fill in the shutdown delay assuming the job has now finished.
+      perf.shutdownDelaySec = (now - resultEnd) / 1000.0;
+    }
+
+    // As soon as available, try to capture cumulative cost at this point too.
+
+    NexmarkPerf.ProgressSnapshot snapshot = new NexmarkPerf.ProgressSnapshot();
+    snapshot.secSinceStart = (now - startMsSinceEpoch) / 1000.0;
+    snapshot.runtimeSec = perf.runtimeSec;
+    snapshot.numEvents = numEvents;
+    snapshot.numResults = numResults;
+    snapshots.add(snapshot);
+
+    captureSteadyState(perf, snapshots);
+
+    return perf;
+  }
+
+  /**
+   * Build and run a pipeline using specified options.
+   *
+   * <p>Callback interface: implementations receive the options to build the pipeline with.
+   *
+   * @param <OptionT> the concrete options type handed to the builder
+   */
+  interface PipelineBuilder<OptionT extends NexmarkOptions> {
+    /**
+     * Build the pipeline.
+     *
+     * @param publishOnlyOptions options to build the pipeline with
+     */
+    void build(OptionT publishOnlyOptions);
+  }
+
+  /**
+   * Invoke the builder with options suitable for running a publish-only child pipeline.
+   *
+   * <p>This implementation passes the launcher's own {@code options} to the builder unchanged.
+   *
+   * @param builder callback that receives the options for the publish-only pipeline
+   */
+  private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder<NexmarkOptions> builder) {
+    builder.build(options);
+    // NOTE(review): the disabled code below suggests this method once rejected
+    // --pubSubMode=COMBINED on the DirectRunner; confirm whether it can be removed.
+//    throw new UnsupportedOperationException(
+//        "Cannot use --pubSubMode=COMBINED with DirectRunner");
+  }
+
+  /**
+   * Intended to wait, when monitoring, until the publisher pipeline has run long enough to
+   * establish a backlog on the Pubsub topic.
+   *
+   * <p>Not implemented by this launcher: every call fails.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  private void waitForPublisherPreload() {
+    throw new UnsupportedOperationException(
+        "Waiting for the publisher pipeline to preload is not supported");
+  }
+
+  /**
+   * Monitor the performance and progress of a running job until it reaches a terminal state.
+   *
+   * <p>The main job is polled every {@code PERF_DELAY}. It is cancelled when the configured
+   * running time has elapsed and, in streaming mode, when it reports fatal errors, appears to
+   * have finished, or has shown no activity for {@code STUCK_TERMINATE_DELAY}. Performance is
+   * only sampled when {@code configuration.debug} is set. Once the main job has stopped, the
+   * publisher job (if any) is cancelled too.
+   *
+   * @param query the query being run; supplies the monitors and the metric namespace
+   * @return {@code null} if job monitoring is disabled; otherwise the last performance sample
+   *     with the collected errors and snapshots attached. If no sample was taken, the returned
+   *     record only carries the errors and snapshots.
+   */
+  @Nullable
+  private NexmarkPerf monitor(NexmarkQuery query) {
+    if (!options.getMonitorJobs()) {
+      return null;
+    }
+
+    if (configuration.debug) {
+      NexmarkUtils.console("Waiting for main pipeline to 'finish'");
+    } else {
+      NexmarkUtils.console("--debug=false, so job will not self-cancel");
+    }
+
+    PipelineResult job = mainResult;
+    PipelineResult publisherJob = publisherResult;
+    List<NexmarkPerf.ProgressSnapshot> snapshots = new ArrayList<>();
+    long startMsSinceEpoch = System.currentTimeMillis();
+    // -1 means there is no deadline.
+    long endMsSinceEpoch = -1;
+    if (options.getRunningTimeMinutes() != null) {
+      endMsSinceEpoch = startMsSinceEpoch
+                        + Duration.standardMinutes(options.getRunningTimeMinutes()).getMillis()
+                        - Duration.standardSeconds(configuration.preloadSeconds).getMillis();
+    }
+    long lastActivityMsSinceEpoch = -1;
+    NexmarkPerf perf = null;
+    boolean waitingForShutdown = false;
+    boolean publisherCancelled = false;
+    List<String> errors = new ArrayList<>();
+
+    while (true) {
+      long now = System.currentTimeMillis();
+      if (endMsSinceEpoch >= 0 && now > endMsSinceEpoch && !waitingForShutdown) {
+        NexmarkUtils.console("Reached end of test, cancelling job");
+        try {
+          job.cancel();
+        } catch (IOException e) {
+          throw new RuntimeException("Unable to cancel main job: ", e);
+        }
+        if (publisherJob != null) {
+          try {
+            publisherJob.cancel();
+          } catch (IOException e) {
+            throw new RuntimeException("Unable to cancel publisher job: ", e);
+          }
+          publisherCancelled = true;
+        }
+        waitingForShutdown = true;
+      }
+
+      PipelineResult.State state = job.getState();
+      NexmarkUtils.console("%s %s%s", state, queryName,
+          waitingForShutdown ? " (waiting for shutdown)" : "");
+
+      // Performance is only sampled in debug mode; otherwise currPerf stays null.
+      NexmarkPerf currPerf;
+      if (configuration.debug) {
+        currPerf = currentPerf(startMsSinceEpoch, now, job, snapshots,
+                               query.eventMonitor, query.resultMonitor);
+      } else {
+        currPerf = null;
+      }
+
+      if (perf == null || perf.anyActivity(currPerf)) {
+        lastActivityMsSinceEpoch = now;
+      }
+
+      if (options.isStreaming() && !waitingForShutdown) {
+        Duration quietFor = new Duration(lastActivityMsSinceEpoch, now);
+        long fatalCount = getCounterMetric(job, query.getName(), "fatal", 0);
+        if (fatalCount > 0) {
+          NexmarkUtils.console("job has fatal errors, cancelling.");
+          errors.add(String.format("Pipeline reported %s fatal errors", fatalCount));
+          waitingForShutdown = true;
+        } else if (configuration.debug && configuration.numEvents > 0
+                   && currPerf.numEvents == configuration.numEvents
+                   && currPerf.numResults >= 0 && quietFor.isLongerThan(DONE_DELAY)) {
+          NexmarkUtils.console("streaming query appears to have finished, cancelling job.");
+          waitingForShutdown = true;
+        } else if (quietFor.isLongerThan(STUCK_TERMINATE_DELAY)) {
+          NexmarkUtils.console("streaming query appears to have gotten stuck, cancelling job.");
+          errors.add("Streaming job was cancelled since appeared stuck");
+          waitingForShutdown = true;
+        } else if (quietFor.isLongerThan(STUCK_WARNING_DELAY)) {
+          NexmarkUtils.console("WARNING: streaming query appears to have been stuck for %d min.",
+              quietFor.getStandardMinutes());
+          errors.add(
+              String.format("Streaming query was stuck for %d min", quietFor.getStandardMinutes()));
+        }
+
+        if (waitingForShutdown) {
+          try {
+            job.cancel();
+          } catch (IOException e) {
+            throw new RuntimeException("Unable to cancel main job: ", e);
+          }
+        }
+      }
+
+      perf = currPerf;
+
+      boolean running = true;
+      switch (state) {
+        case UNKNOWN:
+        case STOPPED:
+        case RUNNING:
+          // Keep going.
+          break;
+        case DONE:
+          // All done.
+          running = false;
+          break;
+        case CANCELLED:
+          running = false;
+          if (!waitingForShutdown) {
+            errors.add("Job was unexpectedly cancelled");
+          }
+          break;
+        case FAILED:
+          // Abnormal termination.
+          running = false;
+          errors.add("Job failed");
+          break;
+        case UPDATED:
+          // Abnormal termination.
+          running = false;
+          errors.add("Job was unexpectedly updated");
+          break;
+      }
+
+      if (!running) {
+        break;
+      }
+
+      if (lastActivityMsSinceEpoch == now) {
+        NexmarkUtils.console("new perf %s", perf);
+      } else {
+        NexmarkUtils.console("no activity");
+      }
+
+      try {
+        Thread.sleep(PERF_DELAY.getMillis());
+      } catch (InterruptedException e) {
+        // NOTE(review): the interrupt is swallowed so that monitoring continues; the
+        // interrupt status is not restored for callers.
+        Thread.interrupted();
+        NexmarkUtils.console("Interrupted: pipeline is still running");
+      }
+    }
+
+    if (perf == null) {
+      // No sample was taken (performance is only sampled when configuration.debug is set).
+      // Use an empty record so the collected errors still reach the caller instead of
+      // failing with a NullPointerException here.
+      perf = new NexmarkPerf();
+    }
+    perf.errors = errors;
+    perf.snapshots = snapshots;
+
+    if (publisherJob != null) {
+      NexmarkUtils.console("Shutting down publisher pipeline.");
+      try {
+        if (!publisherCancelled) {
+          publisherJob.cancel();
+        }
+        publisherJob.waitUntilFinish(Duration.standardMinutes(5));
+      } catch (IOException e) {
+        throw new RuntimeException("Unable to cancel publisher job: ", e);
+      }
+    }
+
+    return perf;
+  }
+
+  // ================================================================================
+  // Basic sources and sinks
+  // ================================================================================
+
+  /**
+   * Return a topic name.
+   */
+  private String shortTopic(long now) {
+    String baseTopic = options.getPubsubTopic();
+    if (Strings.isNullOrEmpty(baseTopic)) {
+      throw new RuntimeException("Missing --pubsubTopic");
+    }
+    switch (options.getResourceNameMode()) {
+      case VERBATIM:
+        return baseTopic;
+      case QUERY:
+        return String.format("%s_%s_source", baseTopic, queryName);
+      case QUERY_AND_SALT:
+        return String.format("%s_%s_%d_source", baseTopic, queryName, now);
+    }
+    throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
+  }
+
+  /**
+   * Return a subscription name.
+   */
+  private String shortSubscription(long now) {
+    String baseSubscription = options.getPubsubSubscription();
+    if (Strings.isNullOrEmpty(baseSubscription)) {
+      throw new RuntimeException("Missing --pubsubSubscription");
+    }
+    switch (options.getResourceNameMode()) {
+      case VERBATIM:
+        return baseSubscription;
+      case QUERY:
+        return String.format("%s_%s_source", baseSubscription, queryName);
+      case QUERY_AND_SALT:
+        return String.format("%s_%s_%d_source", baseSubscription, queryName, now);
+    }
+    throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
+  }
+
+  /**
+   * Return a file name for plain text.
+   */
+  private String textFilename(long now) {
+    String baseFilename = options.getOutputPath();
+    if (Strings.isNullOrEmpty(baseFilename)) {
+      throw new RuntimeException("Missing --outputPath");
+    }
+    switch (options.getResourceNameMode()) {
+      case VERBATIM:
+        return baseFilename;
+      case QUERY:
+        return String.format("%s/nexmark_%s.txt", baseFilename, queryName);
+      case QUERY_AND_SALT:
+        return String.format("%s/nexmark_%s_%d.txt", baseFilename, queryName, now);
+    }
+    throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
+  }
+
+  /**
+   * Return a BigQuery table spec.
+   */
+  private String tableSpec(long now, String version) {
+    String baseTableName = options.getBigQueryTable();
+    if (Strings.isNullOrEmpty(baseTableName)) {
+      throw new RuntimeException("Missing --bigQueryTable");
+    }
+    switch (options.getResourceNameMode()) {
+      case VERBATIM:
+        return String.format("%s:nexmark.%s_%s",
+                             options.getProject(), baseTableName, version);
+      case QUERY:
+        return String.format("%s:nexmark.%s_%s_%s",
+                             options.getProject(), baseTableName, queryName, version);
+      case QUERY_AND_SALT:
+        return String.format("%s:nexmark.%s_%s_%s_%d",
+                             options.getProject(), baseTableName, queryName, version, now);
+    }
+    throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
+  }
+
+  /**
+   * Return a directory for logs.
+   */
+  private String logsDir(long now) {
+    String baseFilename = options.getOutputPath();
+    if (Strings.isNullOrEmpty(baseFilename)) {
+      throw new RuntimeException("Missing --outputPath");
+    }
+    switch (options.getResourceNameMode()) {
+      case VERBATIM:
+        return baseFilename;
+      case QUERY:
+        return String.format("%s/logs_%s", baseFilename, queryName);
+      case QUERY_AND_SALT:
+        return String.format("%s/logs_%s_%d", baseFilename, queryName, now);
+    }
+    throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
+  }
+
+  /**
+   * Return a source of synthetic events: a batch source unless streaming mode is requested,
+   * in which case a stream source is used.
+   *
+   * @param p pipeline to which the source is applied
+   * @return the generated events
+   */
+  private PCollection<Event> sourceEventsFromSynthetic(Pipeline p) {
+    if (!isStreaming()) {
+      NexmarkUtils.console("Generating %d events in batch mode", configuration.numEvents);
+      return p.apply(queryName + ".ReadBounded", NexmarkUtils.batchEventsSource(configuration));
+    }
+    NexmarkUtils.console("Generating %d events in streaming mode", configuration.numEvents);
+    return p.apply(queryName + ".ReadUnbounded", NexmarkUtils.streamEventsSource(configuration));
+  }
+
+  /**
+   * Return source of events from Pubsub.
+   */
+  private PCollection<Event> sourceEventsFromPubsub(Pipeline p, long now) {
+    String shortSubscription = shortSubscription(now);
+    NexmarkUtils.console("Reading events from Pubsub %s", shortSubscription);
+
+    PubsubIO.Read<PubsubMessage> io =
+        PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(shortSubscription)
+            .withIdAttribute(NexmarkUtils.PUBSUB_ID);
+    if (!configuration.usePubsubPublishTime) {
+      io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
+    }
+
+    return p
+      .apply(queryName + ".ReadPubsubEvents", io)
+      .apply(queryName + ".PubsubMessageToEvent", ParDo.of(new DoFn<PubsubMessage, Event>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          byte[] payload = c.element().getPayload();
+          try {
+            Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload);
+            c.output(event);
+          } catch (CoderException e) {
+            // TODO Log decoding Event error
+          }
+        }
+      }));
+  }
+
+  /**
+   * Return Avro source of events from {@code options.getInputFilePrefix}.
+   */
+  private PCollection<Event> sourceEventsFromAvro(Pipeline p) {
+    String filename = options.getInputPath();
+    if (Strings.isNullOrEmpty(filename)) {
+      throw new RuntimeException("Missing --inputPath");
+    }
+    NexmarkUtils.console("Reading events from Avro files at %s", filename);
+    return p
+        .apply(queryName + ".ReadAvroEvents", AvroIO.read(Event.class)
+                          .from(filename + "*.avro"))
+        .apply("OutputWithTimestamp", NexmarkQuery.EVENT_TIMESTAMP_FROM_DATA);
+  }
+
+  /**
+   * Send {@code events} to Pubsub.
+   */
+  private void sinkEventsToPubsub(PCollection<Event> events, long now) {
+    String shortTopic = shortTopic(now);
+    NexmarkUtils.console("Writing events to Pubsub %s", shortTopic);
+
+    PubsubIO.Write<PubsubMessage> io =
+        PubsubIO.writePubsubMessages().to(shortTopic)
+            .withIdAttribute(NexmarkUtils.PUBSUB_ID);
+    if (!configuration.usePubsubPublishTime) {
+      io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
+    }
+
+    events.apply(queryName + ".EventToPubsubMessage",
+            ParDo.of(new DoFn<Event, PubsubMessage>() {
+              @ProcessElement
+              public void processElement(ProcessContext c) {
+                try {
+                  byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element());
+                  c.output(new PubsubMessage(payload, new HashMap<String, String>()));
+                } catch (CoderException e1) {
+                  // TODO Log encoding Event error
+                }
+              }
+            })
+        )
+        .apply(queryName + ".WritePubsubEvents", io);
+  }
+
+  /**
+   * Send {@code formattedResults} to Pubsub.
+   */
+  private void sinkResultsToPubsub(PCollection<String> formattedResults, long now) {
+    String shortTopic = shortTopic(now);
+    NexmarkUtils.console("Writing results to Pubsub %s", shortTopic);
+    PubsubIO.Write<String> io =
+        PubsubIO.writeStrings().to(shortTopic)
+            .withIdAttribute(NexmarkUtils.PUBSUB_ID);
+    if (!configuration.usePubsubPublishTime) {
+      io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
+    }
+    formattedResults.apply(queryName + ".WritePubsubResults", io);
+  }
+
+  /**
+   * Write every raw Event in {@code source} as Avro files under {@code options.getOutputPath}.
+   * The job is configured to produce the following files:
+   * <ul>
+   * <li>{@code $outputPath/event*.avro} All Event entities.
+   * <li>{@code $outputPath/auction*.avro} Auction entities.
+   * <li>{@code $outputPath/bid*.avro} Bid entities.
+   * <li>{@code $outputPath/person*.avro} Person entities.
+   * </ul>
+   *
+   * @param source A PCollection of events.
+   * @throws RuntimeException if {@code --outputPath} is null or empty.
+   */
+  private void sinkEventsToAvro(PCollection<Event> source) {
+    String outputPath = options.getOutputPath();
+    if (Strings.isNullOrEmpty(outputPath)) {
+      throw new RuntimeException("Missing --outputPath");
+    }
+    NexmarkUtils.console("Writing events to Avro files at %s", outputPath);
+
+    // All events, whatever their kind.
+    source.apply(queryName + ".WriteAvroEvents",
+        AvroIO.write(Event.class).to(outputPath + "/event").withSuffix(".avro"));
+
+    // Bids only.
+    PCollection<Bid> bids = source.apply(NexmarkQuery.JUST_BIDS);
+    bids.apply(queryName + ".WriteAvroBids",
+        AvroIO.write(Bid.class).to(outputPath + "/bid").withSuffix(".avro"));
+
+    // New auctions only.
+    PCollection<Auction> auctions = source.apply(NexmarkQuery.JUST_NEW_AUCTIONS);
+    auctions.apply(queryName + ".WriteAvroAuctions",
+        AvroIO.write(Auction.class).to(outputPath + "/auction").withSuffix(".avro"));
+
+    // New persons only.
+    PCollection<Person> people = source.apply(NexmarkQuery.JUST_NEW_PERSONS);
+    people.apply(queryName + ".WriteAvroPeople",
+        AvroIO.write(Person.class).to(outputPath + "/person").withSuffix(".avro"));
+  }
+
+  /**
+   * Write the already formatted query results to text files.
+   *
+   * @param formattedResults the formatted results to write.
+   * @param now time value handed to {@code textFilename} to derive the output location.
+   */
+  private void sinkResultsToText(PCollection<String> formattedResults, long now) {
+    String destination = textFilename(now);
+    NexmarkUtils.console("Writing results to text files at %s", destination);
+    String stepName = queryName + ".WriteTextResults";
+    formattedResults.apply(stepName, TextIO.write().to(destination));
+  }
+
+  private static class StringToTableRow extends DoFn<String, TableRow> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      int n = ThreadLocalRandom.current().nextInt(10);
+      List<TableRow> records = new ArrayList<>(n);
+      for (int i = 0; i < n; i++) {
+        records.add(new TableRow().set("index", i).set("value", Integer.toString(i)));
+      }
+      c.output(new TableRow().set("result", c.element()).set("records", records));
+    }
+  }
+
+  /**
+   * Write the already formatted query results to a BigQuery table.
+   *
+   * <p>The table is {@code tableSpec(now, version)}. Its schema has a STRING {@code result}
+   * column and a REPEATED RECORD {@code records} column whose entries have an INTEGER
+   * {@code index} and a STRING {@code value}. The table is created if needed and rows are
+   * appended.
+   *
+   * @param formattedResults the formatted results to write.
+   * @param now time value handed to {@code tableSpec}.
+   * @param version label handed to {@code tableSpec} to distinguish tables.
+   */
+  private void sinkResultsToBigQuery(
+      PCollection<String> formattedResults, long now,
+      String version) {
+    String tableSpec = tableSpec(now, version);
+
+    // Nested record layout: (index INTEGER, value STRING).
+    TableFieldSchema indexField = new TableFieldSchema().setName("index").setType("INTEGER");
+    TableFieldSchema valueField = new TableFieldSchema().setName("value").setType("STRING");
+
+    // Top-level layout: (result STRING, records REPEATED RECORD).
+    TableFieldSchema resultField = new TableFieldSchema().setName("result").setType("STRING");
+    TableFieldSchema recordsField =
+        new TableFieldSchema().setName("records").setMode("REPEATED").setType("RECORD")
+            .setFields(ImmutableList.of(indexField, valueField));
+    TableSchema tableSchema =
+        new TableSchema().setFields(ImmutableList.of(resultField, recordsField));
+
+    NexmarkUtils.console("Writing results to BigQuery table %s", tableSpec);
+    BigQueryIO.Write io =
+        BigQueryIO.write().to(tableSpec)
+            .withSchema(tableSchema)
+            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
+    formattedResults
+        .apply(queryName + ".StringToTableRow", ParDo.of(new StringToTableRow()))
+        .apply(queryName + ".WriteBigQueryResults", io);
+  }
+
+  // ================================================================================
+  // Construct overall pipeline
+  // ================================================================================
+
+  /**
+   * Build the event source for this run according to {@code configuration.sourceType}.
+   *
+   * <p>For the PUBSUB source type this also sets up the publishing side, depending on
+   * {@code configuration.pubSubMode}.
+   *
+   * @param p the main pipeline to attach the source to.
+   * @param now time value forwarded to the Pubsub source and sink helpers.
+   * @return the source of events, or null if we are simply publishing events to Pubsub.
+   */
+  private PCollection<Event> createSource(Pipeline p, final long now) {
+    PCollection<Event> events = null;
+    switch (configuration.sourceType) {
+      case DIRECT:
+        events = sourceEventsFromSynthetic(p);
+        break;
+      case AVRO:
+        events = sourceEventsFromAvro(p);
+        break;
+      case PUBSUB:
+        // Per mode: first set up the publishing side (if any), then the consuming side (if any).
+        switch (configuration.pubSubMode) {
+          case SUBSCRIBE_ONLY:
+            // Nothing to publish; just read events from pubsub.
+            events = sourceEventsFromPubsub(p, now);
+            break;
+          case PUBLISH_ONLY: {
+            // Send synthesized events to Pubsub in this job.
+            // Nothing to consume, so the returned source stays null.
+            PCollection<Event> synthetic = sourceEventsFromSynthetic(p);
+            sinkEventsToPubsub(
+                synthetic.apply(queryName + ".Snoop", NexmarkUtils.snoop(queryName)), now);
+            break;
+          }
+          case COMBINED: {
+            // Send synthesized events to Pubsub in separate publisher job.
+            // We won't start the main pipeline until the publisher has sent the pre-load events.
+            // We'll shutdown the publisher job when we notice the main job has finished.
+            PipelineBuilder<NexmarkOptions> publisherBuilder =
+                new PipelineBuilder<NexmarkOptions>() {
+                  @Override
+                  public void build(NexmarkOptions publishOnlyOptions) {
+                    // Note: publishOnlyOptions is not consulted here; the publisher pipeline
+                    // is created from this launcher's own options.
+                    Pipeline sp = Pipeline.create(options);
+                    NexmarkUtils.setupPipeline(configuration.coderStrategy, sp);
+                    publisherMonitor = new Monitor<>(queryName, "publisher");
+                    sinkEventsToPubsub(
+                        sourceEventsFromSynthetic(sp)
+                            .apply(queryName + ".Monitor", publisherMonitor.getTransform()),
+                        now);
+                    publisherResult = sp.run();
+                  }
+                };
+            invokeBuilderForPublishOnlyPipeline(publisherBuilder);
+            // Read events from pubsub.
+            events = sourceEventsFromPubsub(p, now);
+            break;
+          }
+        }
+        break;
+    }
+    return events;
+  }
+
+  // Output tags for PartitionDoFn: sink() registers MAIN as the main output tag and SIDE as the
+  // single additional output tag when partitioning results for BigQuery.
+  private static final TupleTag<String> MAIN = new TupleTag<String>(){};
+  private static final TupleTag<String> SIDE = new TupleTag<String>(){};
+
+  private static class PartitionDoFn extends DoFn<String, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      if (c.element().hashCode() % 2 == 0) {
+        c.output(c.element());
+      } else {
+        c.output(SIDE, c.element());
+      }
+    }
+  }
+
+  /**
+   * Consume {@code results} according to {@code configuration.sinkType}.
+   *
+   * <p>For COUNT_ONLY the results are discarded without being formatted. For every other sink
+   * type the results are formatted to strings first (and logged if {@code --logResults} is set)
+   * before being handed to the matching sink.
+   *
+   * @param results the timestamped query results.
+   * @param now time value forwarded to the concrete sinks.
+   */
+  private void sink(PCollection<TimestampedValue<KnownSize>> results, long now) {
+    if (NexmarkUtils.SinkType.COUNT_ONLY == configuration.sinkType) {
+      // Avoid the cost of formatting the results.
+      results.apply(queryName + ".DevNull", NexmarkUtils.devNull(queryName));
+      return;
+    }
+
+    PCollection<String> formatted =
+        results.apply(queryName + ".Format", NexmarkUtils.format(queryName));
+    if (options.getLogResults()) {
+      String logName = queryName + ".Results";
+      formatted = formatted.apply(logName + ".Log", NexmarkUtils.<String>log(logName));
+    }
+
+    switch (configuration.sinkType) {
+      case DEVNULL:
+        // Discard all results
+        formatted.apply(queryName + ".DevNull", NexmarkUtils.devNull(queryName));
+        break;
+      case PUBSUB:
+        sinkResultsToPubsub(formatted, now);
+        break;
+      case TEXT:
+        sinkResultsToText(formatted, now);
+        break;
+      case AVRO:
+        // Only a warning here: this method writes nothing for the AVRO sink type.
+        NexmarkUtils.console(
+            "WARNING: with --sinkType=AVRO, actual query results will be discarded.");
+        break;
+      case BIGQUERY: {
+        // Multiple BigQuery backends to mimic what most customers do.
+        PCollectionTuple partitioned = formatted.apply(queryName + ".Partition",
+            ParDo.of(new PartitionDoFn()).withOutputTags(MAIN, TupleTagList.of(SIDE)));
+        sinkResultsToBigQuery(partitioned.get(MAIN), now, "main");
+        sinkResultsToBigQuery(partitioned.get(SIDE), now, "side");
+        sinkResultsToBigQuery(formatted, now, "copy");
+        break;
+      }
+      case COUNT_ONLY:
+        // Short-circuited above.
+        throw new RuntimeException();
+    }
+  }
+
+  // ================================================================================
+  // Entry point
+  // ================================================================================
+
+  /**
+   * Calculate the distribution of the expected rate of results per minute (in event time, not
+   * wallclock time) and print it to the console.
+   *
+   * <p>The per-window result counts from the model's simulator are sorted and, when there are
+   * at least five samples, summarized as minimum, first quartile, median, third quartile and
+   * maximum. With fewer samples only the sample count is reported.
+   *
+   * @param model the query model whose simulator supplies the per-window result counts.
+   */
+  private void modelResultRates(NexmarkQueryModel model) {
+    List<Long> counts = Lists.newArrayList(model.simulator().resultsPerWindow());
+    Collections.sort(counts);
+    int n = counts.size();
+    if (n < 5) {
+      NexmarkUtils.console("Query%d: only %d samples", model.configuration.query, n);
+      return;
+    }
+    // The list is sorted, so the element at n / 2 is the median of the samples (it was
+    // previously mislabelled as the mean).
+    NexmarkUtils.console("Query%d: N:%d; min:%d; 1st%%:%d; median:%d; 3rd%%:%d; max:%d",
+                         model.configuration.query, n, counts.get(0), counts.get(n / 4),
+                         counts.get(n / 2),
+                         counts.get(n - 1 - n / 4), counts.get(n - 1));
+  }
+
+  /**
+   * Run {@code runConfiguration} and return its performance if possible.
+   *
+   * <p>The per-run state ({@code configuration} and {@code queryName}) is set for the duration
+   * of the call and always cleared again on exit; a run must not already be in progress.
+   *
+   * @param runConfiguration the configuration to run.
+   * @return the result of monitoring the run, or null if the configuration is disabled
+   *     ({@code numEvents < 0}) or only the model result rate was requested.
+   * @throws RuntimeException if {@code --manageResources} is used without
+   *     {@code --monitorJobs}, or if a model is required but none exists for the query.
+   */
+  @Nullable
+  public NexmarkPerf run(NexmarkConfiguration runConfiguration) {
+    if (options.getManageResources() && !options.getMonitorJobs()) {
+      throw new RuntimeException("If using --manageResources then must also use --monitorJobs.");
+    }
+
+    //
+    // Setup per-run state.
+    //
+    checkState(configuration == null);
+    checkState(queryName == null);
+    configuration = runConfiguration;
+
+    try {
+      NexmarkUtils.console("Running %s", configuration.toShortString());
+
+      if (configuration.numEvents < 0) {
+        NexmarkUtils.console("skipping since configuration is disabled");
+        return null;
+      }
+
+      // Queries are looked up by position: index i holds query number i.
+      List<NexmarkQuery> queries = Arrays.asList(new Query0(configuration),
+                                                 new Query1(configuration),
+                                                 new Query2(configuration),
+                                                 new Query3(configuration),
+                                                 new Query4(configuration),
+                                                 new Query5(configuration),
+                                                 new Query6(configuration),
+                                                 new Query7(configuration),
+                                                 new Query8(configuration),
+                                                 new Query9(configuration),
+                                                 new Query10(configuration),
+                                                 new Query11(configuration),
+                                                 new Query12(configuration));
+      NexmarkQuery query = queries.get(configuration.query);
+      queryName = query.getName();
+
+      // Models share the same indexing; queries 10 to 12 have no model.
+      List<NexmarkQueryModel> models = Arrays.asList(
+          new Query0Model(configuration),
+          new Query1Model(configuration),
+          new Query2Model(configuration),
+          new Query3Model(configuration),
+          new Query4Model(configuration),
+          new Query5Model(configuration),
+          new Query6Model(configuration),
+          new Query7Model(configuration),
+          new Query8Model(configuration),
+          new Query9Model(configuration),
+          null,
+          null,
+          null);
+      NexmarkQueryModel model = models.get(configuration.query);
+
+      if (options.getJustModelResultRate()) {
+        if (model == null) {
+          throw new RuntimeException(String.format("No model for %s", queryName));
+        }
+        modelResultRates(model);
+        return null;
+      }
+
+      long now = System.currentTimeMillis();
+      Pipeline p = Pipeline.create(options);
+      NexmarkUtils.setupPipeline(configuration.coderStrategy, p);
+
+      // Generate events.
+      PCollection<Event> source = createSource(p, now);
+
+      // Source will be null if source type is PUBSUB and mode is PUBLISH_ONLY.
+      // In that case there's nothing more to add to pipeline.
+      if (source != null) {
+        // Optionally log events. This must stay inside the null check: applying the logging
+        // transform to a null source would throw a NullPointerException.
+        if (options.getLogEvents()) {
+          source = source.apply(queryName + ".Events.Log",
+                  NexmarkUtils.<Event>log(queryName + ".Events"));
+        }
+
+        // Optionally sink events in Avro format.
+        // (Query results are ignored).
+        if (configuration.sinkType == NexmarkUtils.SinkType.AVRO) {
+          sinkEventsToAvro(source);
+        }
+
+        // Special hacks for Query 10 (big logger).
+        if (configuration.query == 10) {
+          String path = null;
+          if (options.getOutputPath() != null && !options.getOutputPath().isEmpty()) {
+            path = logsDir(now);
+          }
+          ((Query10) query).setOutputPath(path);
+          ((Query10) query).setMaxNumWorkers(maxNumWorkers());
+        }
+
+        // Apply query.
+        PCollection<TimestampedValue<KnownSize>> results = source.apply(query);
+
+        if (options.getAssertCorrectness()) {
+          if (model == null) {
+            throw new RuntimeException(String.format("No model for %s", queryName));
+          }
+          // We know all our streams have a finite number of elements.
+          results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
+          // If we have a finite number of events then assert our pipeline's
+          // results match those of a model using the same sequence of events.
+          PAssert.that(results).satisfies(model.assertionFor());
+        }
+
+        // Output results.
+        sink(results, now);
+      }
+
+      // A publisher result is only present when a separate publisher pipeline was started.
+      if (publisherResult != null) {
+        waitForPublisherPreload();
+      }
+      mainResult = p.run();
+      mainResult.waitUntilFinish(Duration.standardSeconds(configuration.streamTimeout));
+      return monitor(query);
+    } finally {
+      // Always clear the per-run state so the launcher can be used for another run.
+      configuration = null;
+      queryName = null;
+    }
+  }
+}