You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:44 UTC
[36/55] [abbrv] beam git commit: Rename NexmarkDriver to Main and
NexmarkRunner to NexmarkLauncher
Rename NexmarkDriver to Main and NexmarkRunner to NexmarkLauncher
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/683680b1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/683680b1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/683680b1
Branch: refs/heads/master
Commit: 683680b1655e79d696a1d0f4588753a7d8ff2b82
Parents: 77eabba
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Tue May 9 10:17:06 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:28 2017 +0200
----------------------------------------------------------------------
.../apache/beam/integration/nexmark/Main.java | 304 +++++
.../beam/integration/nexmark/NexmarkDriver.java | 304 -----
.../integration/nexmark/NexmarkLauncher.java | 1172 ++++++++++++++++++
.../beam/integration/nexmark/NexmarkRunner.java | 1172 ------------------
4 files changed, 1476 insertions(+), 1476 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/683680b1/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java
new file mode 100644
index 0000000..da4d446
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Person;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of the 'NEXMark queries' for Google Dataflow.
+ * These are multiple queries over a three table schema representing an online auction system:
+ * <ul>
+ * <li>{@link Person} represents a person submitting an item for auction and/or making a bid
+ * on an auction.
+ * <li>{@link Auction} represents an item under auction.
+ * <li>{@link Bid} represents a bid for an item under auction.
+ * </ul>
+ * The queries exercise many aspects of streaming dataflow.
+ *
+ * <p>We synthesize the creation of people, auctions and bids in real-time. The data is not
+ * particularly sensible.
+ *
+ * <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/">
+ * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a>
+ */
+public class Main<OptionT extends NexmarkOptions> {
+
+ /**
+ * Entry point.
+ */
+ void runAll(OptionT options, NexmarkLauncher nexmarkLauncher) {
+ Instant start = Instant.now();
+ Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename());
+ Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>();
+ Iterable<NexmarkConfiguration> configurations = options.getSuite().getConfigurations(options);
+
+ boolean successful = true;
+ try {
+ // Run all the configurations.
+ for (NexmarkConfiguration configuration : configurations) {
+ NexmarkPerf perf = nexmarkLauncher.run(configuration);
+ if (perf != null) {
+ if (perf.errors == null || perf.errors.size() > 0) {
+ successful = false;
+ }
+ appendPerf(options.getPerfFilename(), configuration, perf);
+ actual.put(configuration, perf);
+ // Summarize what we've run so far.
+ saveSummary(null, configurations, actual, baseline, start);
+ }
+ }
+ } finally {
+ if (options.getMonitorJobs()) {
+ // Report overall performance.
+ saveSummary(options.getSummaryFilename(), configurations, actual, baseline, start);
+ saveJavascript(options.getJavascriptFilename(), configurations, actual, baseline, start);
+ }
+ }
+
+ if (!successful) {
+ throw new RuntimeException("Execution was not successful");
+ }
+ }
+
+ /**
+ * Append the pair of {@code configuration} and {@code perf} to perf file.
+ */
+ private void appendPerf(
+ @Nullable String perfFilename, NexmarkConfiguration configuration,
+ NexmarkPerf perf) {
+ if (perfFilename == null) {
+ return;
+ }
+ List<String> lines = new ArrayList<>();
+ lines.add("");
+ lines.add(String.format("# %s", Instant.now()));
+ lines.add(String.format("# %s", configuration.toShortString()));
+ lines.add(configuration.toString());
+ lines.add(perf.toString());
+ try {
+ Files.write(Paths.get(perfFilename), lines, StandardCharsets.UTF_8, StandardOpenOption.CREATE,
+ StandardOpenOption.APPEND);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to write perf file: ", e);
+ }
+ NexmarkUtils.console("appended results to perf file %s.", perfFilename);
+ }
+
+ /**
+ * Load the baseline perf.
+ */
+ @Nullable
+ private static Map<NexmarkConfiguration, NexmarkPerf> loadBaseline(
+ @Nullable String baselineFilename) {
+ if (baselineFilename == null) {
+ return null;
+ }
+ Map<NexmarkConfiguration, NexmarkPerf> baseline = new LinkedHashMap<>();
+ List<String> lines;
+ try {
+ lines = Files.readAllLines(Paths.get(baselineFilename), StandardCharsets.UTF_8);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to read baseline perf file: ", e);
+ }
+ for (int i = 0; i < lines.size(); i++) {
+ if (lines.get(i).startsWith("#") || lines.get(i).trim().isEmpty()) {
+ continue;
+ }
+ NexmarkConfiguration configuration = NexmarkConfiguration.fromString(lines.get(i++));
+ NexmarkPerf perf = NexmarkPerf.fromString(lines.get(i));
+ baseline.put(configuration, perf);
+ }
+ NexmarkUtils.console("loaded %d entries from baseline file %s.", baseline.size(),
+ baselineFilename);
+ return baseline;
+ }
+
+ private static final String LINE =
+ "==========================================================================================";
+
+ /**
+ * Print summary of {@code actual} vs (if non-null) {@code baseline}.
+ */
+ private static void saveSummary(
+ @Nullable String summaryFilename,
+ Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
+ @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
+ List<String> lines = new ArrayList<>();
+
+ lines.add("");
+ lines.add(LINE);
+
+ lines.add(
+ String.format("Run started %s and ran for %s", start, new Duration(start, Instant.now())));
+ lines.add("");
+
+ lines.add("Default configuration:");
+ lines.add(NexmarkConfiguration.DEFAULT.toString());
+ lines.add("");
+
+ lines.add("Configurations:");
+ lines.add(" Conf Description");
+ int conf = 0;
+ for (NexmarkConfiguration configuration : configurations) {
+ lines.add(String.format(" %04d %s", conf++, configuration.toShortString()));
+ NexmarkPerf actualPerf = actual.get(configuration);
+ if (actualPerf != null && actualPerf.jobId != null) {
+ lines.add(String.format(" %4s [Ran as job %s]", "", actualPerf.jobId));
+ }
+ }
+
+ lines.add("");
+ lines.add("Performance:");
+ lines.add(String.format(" %4s %12s %12s %12s %12s %12s %12s", "Conf", "Runtime(sec)",
+ "(Baseline)", "Events(/sec)", "(Baseline)", "Results", "(Baseline)"));
+ conf = 0;
+ for (NexmarkConfiguration configuration : configurations) {
+ String line = String.format(" %04d ", conf++);
+ NexmarkPerf actualPerf = actual.get(configuration);
+ if (actualPerf == null) {
+ line += "*** not run ***";
+ } else {
+ NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration);
+ double runtimeSec = actualPerf.runtimeSec;
+ line += String.format("%12.1f ", runtimeSec);
+ if (baselinePerf == null) {
+ line += String.format("%12s ", "");
+ } else {
+ double baselineRuntimeSec = baselinePerf.runtimeSec;
+ double diff = ((runtimeSec - baselineRuntimeSec) / baselineRuntimeSec) * 100.0;
+ line += String.format("%+11.2f%% ", diff);
+ }
+
+ double eventsPerSec = actualPerf.eventsPerSec;
+ line += String.format("%12.1f ", eventsPerSec);
+ if (baselinePerf == null) {
+ line += String.format("%12s ", "");
+ } else {
+ double baselineEventsPerSec = baselinePerf.eventsPerSec;
+ double diff = ((eventsPerSec - baselineEventsPerSec) / baselineEventsPerSec) * 100.0;
+ line += String.format("%+11.2f%% ", diff);
+ }
+
+ long numResults = actualPerf.numResults;
+ line += String.format("%12d ", numResults);
+ if (baselinePerf == null) {
+ line += String.format("%12s", "");
+ } else {
+ long baselineNumResults = baselinePerf.numResults;
+ long diff = numResults - baselineNumResults;
+ line += String.format("%+12d", diff);
+ }
+ }
+ lines.add(line);
+
+ if (actualPerf != null) {
+ List<String> errors = actualPerf.errors;
+ if (errors == null) {
+ errors = new ArrayList<>();
+ errors.add("NexmarkGoogleRunner returned null errors list");
+ }
+ for (String error : errors) {
+ lines.add(String.format(" %4s *** %s ***", "", error));
+ }
+ }
+ }
+
+ lines.add(LINE);
+ lines.add("");
+
+ for (String line : lines) {
+ System.out.println(line);
+ }
+
+ if (summaryFilename != null) {
+ try {
+ Files.write(Paths.get(summaryFilename), lines, StandardCharsets.UTF_8,
+ StandardOpenOption.CREATE, StandardOpenOption.APPEND);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to save summary file: ", e);
+ }
+ NexmarkUtils.console("appended summary to summary file %s.", summaryFilename);
+ }
+ }
+
+ /**
+ * Write all perf data and any baselines to a javascript file which can be used by
+ * graphing page etc.
+ */
+ private static void saveJavascript(
+ @Nullable String javascriptFilename,
+ Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
+ @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
+ if (javascriptFilename == null) {
+ return;
+ }
+
+ List<String> lines = new ArrayList<>();
+ lines.add(String.format(
+ "// Run started %s and ran for %s", start, new Duration(start, Instant.now())));
+ lines.add("var all = [");
+
+ for (NexmarkConfiguration configuration : configurations) {
+ lines.add(" {");
+ lines.add(String.format(" config: %s", configuration));
+ NexmarkPerf actualPerf = actual.get(configuration);
+ if (actualPerf != null) {
+ lines.add(String.format(" ,perf: %s", actualPerf));
+ }
+ NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration);
+ if (baselinePerf != null) {
+ lines.add(String.format(" ,baseline: %s", baselinePerf));
+ }
+ lines.add(" },");
+ }
+
+ lines.add("];");
+
+ try {
+ Files.write(Paths.get(javascriptFilename), lines, StandardCharsets.UTF_8,
+ StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to save javascript file: ", e);
+ }
+ NexmarkUtils.console("saved javascript to file %s.", javascriptFilename);
+ }
+
+ public static void main(String[] args) {
+ NexmarkOptions options = PipelineOptionsFactory.fromArgs(args)
+ .withValidation()
+ .as(NexmarkOptions.class);
+ NexmarkLauncher<NexmarkOptions> nexmarkLauncher = new NexmarkLauncher<>(options);
+ new Main<>().runAll(options, nexmarkLauncher);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/683680b1/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
deleted file mode 100644
index a982a8d..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Person;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * An implementation of the 'NEXMark queries' for Apache Beam.
- * These are multiple queries over a three table schema representing an online auction system:
- * <ul>
- * <li>{@link Person} represents a person submitting an item for auction and/or making a bid
- * on an auction.
- * <li>{@link Auction} represents an item under auction.
- * <li>{@link Bid} represents a bid for an item under auction.
- * </ul>
- * The queries exercise many aspects of streaming dataflow.
- *
- * <p>We synthesize the creation of people, auctions and bids in real-time. The data is not
- * particularly sensible.
- *
- * <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/">
- * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a>
- *
- * <p>NOTE: this class is removed by this commit; its code moves to {@code Main}, with
- * NexmarkRunner renamed to NexmarkLauncher.
- */
-public class NexmarkDriver<OptionT extends NexmarkOptions> {
-
- /**
- * Runs every configuration of the suite selected by {@code options} with {@code runner}.
- * Each result is appended to the perf file and a running summary is printed. A
- * RuntimeException is thrown at the end if any run reported errors (or a null error list).
- */
- void runAll(OptionT options, NexmarkRunner runner) {
- Instant start = Instant.now();
- Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename());
- Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>();
- Iterable<NexmarkConfiguration> configurations = options.getSuite().getConfigurations(options);
-
- boolean successful = true;
- try {
- // Run all the configurations.
- for (NexmarkConfiguration configuration : configurations) {
- NexmarkPerf perf = runner.run(configuration);
- if (perf != null) {
- if (perf.errors == null || perf.errors.size() > 0) {
- successful = false;
- }
- appendPerf(options.getPerfFilename(), configuration, perf);
- actual.put(configuration, perf);
- // Summarize what we've run so far.
- saveSummary(null, configurations, actual, baseline, start);
- }
- }
- } finally {
- if (options.getMonitorJobs()) {
- // Report overall performance.
- saveSummary(options.getSummaryFilename(), configurations, actual, baseline, start);
- saveJavascript(options.getJavascriptFilename(), configurations, actual, baseline, start);
- }
- }
-
- if (!successful) {
- throw new RuntimeException("Execution was not successful");
- }
- }
-
- /**
- * Append the pair of {@code configuration} and {@code perf} to perf file.
- * Does nothing if {@code perfFilename} is null.
- */
- private void appendPerf(
- @Nullable String perfFilename, NexmarkConfiguration configuration,
- NexmarkPerf perf) {
- if (perfFilename == null) {
- return;
- }
- List<String> lines = new ArrayList<>();
- lines.add("");
- lines.add(String.format("# %s", Instant.now()));
- lines.add(String.format("# %s", configuration.toShortString()));
- lines.add(configuration.toString());
- lines.add(perf.toString());
- try {
- Files.write(Paths.get(perfFilename), lines, StandardCharsets.UTF_8, StandardOpenOption.CREATE,
- StandardOpenOption.APPEND);
- } catch (IOException e) {
- throw new RuntimeException("Unable to write perf file: ", e);
- }
- NexmarkUtils.console("appended results to perf file %s.", perfFilename);
- }
-
- /**
- * Load the baseline perf. Returns null when no file is given. Lines starting with '#' and
- * blank lines are skipped; every other line is a configuration immediately followed by its
- * perf line.
- */
- @Nullable
- private static Map<NexmarkConfiguration, NexmarkPerf> loadBaseline(
- @Nullable String baselineFilename) {
- if (baselineFilename == null) {
- return null;
- }
- Map<NexmarkConfiguration, NexmarkPerf> baseline = new LinkedHashMap<>();
- List<String> lines;
- try {
- lines = Files.readAllLines(Paths.get(baselineFilename), StandardCharsets.UTF_8);
- } catch (IOException e) {
- throw new RuntimeException("Unable to read baseline perf file: ", e);
- }
- for (int i = 0; i < lines.size(); i++) {
- if (lines.get(i).startsWith("#") || lines.get(i).trim().isEmpty()) {
- continue;
- }
- // Configuration line; i++ moves to its perf line. NOTE(review): a configuration on the
- // last line makes the next read throw IndexOutOfBoundsException.
- NexmarkConfiguration configuration = NexmarkConfiguration.fromString(lines.get(i++));
- NexmarkPerf perf = NexmarkPerf.fromString(lines.get(i));
- baseline.put(configuration, perf);
- }
- NexmarkUtils.console("loaded %d entries from baseline file %s.", baseline.size(),
- baselineFilename);
- return baseline;
- }
-
- private static final String LINE =
- "==========================================================================================";
-
- /**
- * Print summary of {@code actual} vs (if non-null) {@code baseline}, and append it to
- * {@code summaryFilename} when that is non-null.
- */
- private static void saveSummary(
- @Nullable String summaryFilename,
- Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
- @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
- List<String> lines = new ArrayList<>();
-
- lines.add("");
- lines.add(LINE);
-
- lines.add(
- String.format("Run started %s and ran for %s", start, new Duration(start, Instant.now())));
- lines.add("");
-
- lines.add("Default configuration:");
- lines.add(NexmarkConfiguration.DEFAULT.toString());
- lines.add("");
-
- lines.add("Configurations:");
- lines.add(" Conf Description");
- int conf = 0;
- for (NexmarkConfiguration configuration : configurations) {
- lines.add(String.format(" %04d %s", conf++, configuration.toShortString()));
- NexmarkPerf actualPerf = actual.get(configuration);
- if (actualPerf != null && actualPerf.jobId != null) {
- lines.add(String.format(" %4s [Ran as job %s]", "", actualPerf.jobId));
- }
- }
-
- lines.add("");
- lines.add("Performance:");
- lines.add(String.format(" %4s %12s %12s %12s %12s %12s %12s", "Conf", "Runtime(sec)",
- "(Baseline)", "Events(/sec)", "(Baseline)", "Results", "(Baseline)"));
- conf = 0;
- for (NexmarkConfiguration configuration : configurations) {
- String line = String.format(" %04d ", conf++);
- NexmarkPerf actualPerf = actual.get(configuration);
- if (actualPerf == null) {
- line += "*** not run ***";
- } else {
- NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration);
- double runtimeSec = actualPerf.runtimeSec;
- line += String.format("%12.1f ", runtimeSec);
- if (baselinePerf == null) {
- line += String.format("%12s ", "");
- } else {
- double baselineRuntimeSec = baselinePerf.runtimeSec;
- double diff = ((runtimeSec - baselineRuntimeSec) / baselineRuntimeSec) * 100.0;
- line += String.format("%+11.2f%% ", diff);
- }
-
- double eventsPerSec = actualPerf.eventsPerSec;
- line += String.format("%12.1f ", eventsPerSec);
- if (baselinePerf == null) {
- line += String.format("%12s ", "");
- } else {
- double baselineEventsPerSec = baselinePerf.eventsPerSec;
- double diff = ((eventsPerSec - baselineEventsPerSec) / baselineEventsPerSec) * 100.0;
- line += String.format("%+11.2f%% ", diff);
- }
-
- long numResults = actualPerf.numResults;
- line += String.format("%12d ", numResults);
- if (baselinePerf == null) {
- line += String.format("%12s", "");
- } else {
- long baselineNumResults = baselinePerf.numResults;
- long diff = numResults - baselineNumResults;
- line += String.format("%+12d", diff);
- }
- }
- lines.add(line);
-
- if (actualPerf != null) {
- List<String> errors = actualPerf.errors;
- if (errors == null) {
- errors = new ArrayList<>();
- // NOTE(review): the message names NexmarkGoogleRunner, but the perf comes from
- // NexmarkRunner.run in runAll.
- errors.add("NexmarkGoogleRunner returned null errors list");
- }
- for (String error : errors) {
- lines.add(String.format(" %4s *** %s ***", "", error));
- }
- }
- }
-
- lines.add(LINE);
- lines.add("");
-
- for (String line : lines) {
- System.out.println(line);
- }
-
- if (summaryFilename != null) {
- try {
- Files.write(Paths.get(summaryFilename), lines, StandardCharsets.UTF_8,
- StandardOpenOption.CREATE, StandardOpenOption.APPEND);
- } catch (IOException e) {
- throw new RuntimeException("Unable to save summary file: ", e);
- }
- NexmarkUtils.console("appended summary to summary file %s.", summaryFilename);
- }
- }
-
- /**
- * Write all perf data and any baselines to a javascript file which can be used by
- * graphing page etc. Does nothing if {@code javascriptFilename} is null; an existing file
- * is truncated.
- */
- private static void saveJavascript(
- @Nullable String javascriptFilename,
- Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
- @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
- if (javascriptFilename == null) {
- return;
- }
-
- List<String> lines = new ArrayList<>();
- lines.add(String.format(
- "// Run started %s and ran for %s", start, new Duration(start, Instant.now())));
- lines.add("var all = [");
-
- for (NexmarkConfiguration configuration : configurations) {
- lines.add(" {");
- lines.add(String.format(" config: %s", configuration));
- NexmarkPerf actualPerf = actual.get(configuration);
- if (actualPerf != null) {
- lines.add(String.format(" ,perf: %s", actualPerf));
- }
- NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration);
- if (baselinePerf != null) {
- lines.add(String.format(" ,baseline: %s", baselinePerf));
- }
- lines.add(" },");
- }
-
- lines.add("];");
-
- try {
- Files.write(Paths.get(javascriptFilename), lines, StandardCharsets.UTF_8,
- StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
- } catch (IOException e) {
- throw new RuntimeException("Unable to save javascript file: ", e);
- }
- NexmarkUtils.console("saved javascript to file %s.", javascriptFilename);
- }
-
- // Command-line entry point: parses NexmarkOptions from args and runs the selected suite.
- public static void main(String[] args) {
- NexmarkOptions options = PipelineOptionsFactory.fromArgs(args)
- .withValidation()
- .as(NexmarkOptions.class);
- NexmarkRunner<NexmarkOptions> runner = new NexmarkRunner<>(options);
- new NexmarkDriver<>().runAll(options, runner);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/683680b1/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
new file mode 100644
index 0000000..ea4ff58
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
@@ -0,0 +1,1172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.integration.nexmark.model.Person;
+import org.apache.beam.integration.nexmark.queries.NexmarkQuery;
+import org.apache.beam.integration.nexmark.queries.NexmarkQueryModel;
+import org.apache.beam.integration.nexmark.queries.Query0;
+import org.apache.beam.integration.nexmark.queries.Query0Model;
+import org.apache.beam.integration.nexmark.queries.Query1;
+import org.apache.beam.integration.nexmark.queries.Query10;
+import org.apache.beam.integration.nexmark.queries.Query11;
+import org.apache.beam.integration.nexmark.queries.Query12;
+import org.apache.beam.integration.nexmark.queries.Query1Model;
+import org.apache.beam.integration.nexmark.queries.Query2;
+import org.apache.beam.integration.nexmark.queries.Query2Model;
+import org.apache.beam.integration.nexmark.queries.Query3;
+import org.apache.beam.integration.nexmark.queries.Query3Model;
+import org.apache.beam.integration.nexmark.queries.Query4;
+import org.apache.beam.integration.nexmark.queries.Query4Model;
+import org.apache.beam.integration.nexmark.queries.Query5;
+import org.apache.beam.integration.nexmark.queries.Query5Model;
+import org.apache.beam.integration.nexmark.queries.Query6;
+import org.apache.beam.integration.nexmark.queries.Query6Model;
+import org.apache.beam.integration.nexmark.queries.Query7;
+import org.apache.beam.integration.nexmark.queries.Query7Model;
+import org.apache.beam.integration.nexmark.queries.Query8;
+import org.apache.beam.integration.nexmark.queries.Query8Model;
+import org.apache.beam.integration.nexmark.queries.Query9;
+import org.apache.beam.integration.nexmark.queries.Query9Model;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.joda.time.Duration;
+
+/**
+ * Run a single Nexmark query using a given configuration.
+ */
+public class NexmarkLauncher<OptionT extends NexmarkOptions> {
+ /**
+ * Minimum number of samples needed for 'steady-state' rate calculation.
+ */
+ private static final int MIN_SAMPLES = 9;
+ /**
+ * Minimum length of time over which to consider samples for 'steady-state' rate calculation.
+ */
+ private static final Duration MIN_WINDOW = Duration.standardMinutes(2);
+ /**
+ * Delay between perf samples.
+ */
+ private static final Duration PERF_DELAY = Duration.standardSeconds(15);
+ /**
+ * How long to let a streaming pipeline run after all events have been generated and we've
+ * seen no activity.
+ */
+ private static final Duration DONE_DELAY = Duration.standardMinutes(1);
+ /**
+ * How long to allow no activity before warning.
+ */
+ private static final Duration STUCK_WARNING_DELAY = Duration.standardMinutes(10);
+ /**
+ * How long to let a streaming pipeline run after we've seen no activity, even if not all
+ * events have been generated.
+ */
+ private static final Duration STUCK_TERMINATE_DELAY = Duration.standardDays(3);
+ /**
+ * NexmarkOptions shared by all runs.
+ */
+ private final OptionT options;
+
+ /**
+ * Which configuration we are running, or null if none has been set.
+ */
+ @Nullable
+ private NexmarkConfiguration configuration;
+
+ /**
+ * If in --pubsubMode=COMBINED, the event monitor for the publisher pipeline. Otherwise null.
+ */
+ @Nullable
+ private Monitor<Event> publisherMonitor;
+
+ /**
+ * If in --pubsubMode=COMBINED, the pipeline result for the publisher pipeline. Otherwise null.
+ */
+ @Nullable
+ private PipelineResult publisherResult;
+
+ /**
+ * Result for the main pipeline, or null if it has not been set.
+ */
+ @Nullable
+ private PipelineResult mainResult;
+
+ /**
+ * Name of the query we are running, or null if it has not been set.
+ */
+ @Nullable
+ private String queryName;
+
+  /**
+   * Creates a launcher whose runs all share the given options.
+   *
+   * @param options NexmarkOptions used by every run of this launcher
+   */
+  public NexmarkLauncher(OptionT options) {
+    this.options = options;
+  }
+
+
+ /**
+ * Is this query running in streaming mode?
+ */
+ private boolean isStreaming() {
+ return options.isStreaming();
+ }
+
+ /**
+ * Return number of cores per worker.
+ */
+ protected int coresPerWorker() {
+ return 4;
+ }
+
+ /**
+ * Return maximum number of workers.
+ */
+ private int maxNumWorkers() {
+ return 5;
+ }
+
+  /**
+   * Returns the attempted value of the counter {@code namespace:name} reported in
+   * {@code result}, or {@code defaultValue} if no such counter is reported.
+   *
+   * <p>Only attempted metrics are used because some runners don't support committed metrics.
+   * If several counters match the filter, the first one returned is used.
+   */
+  private long getCounterMetric(PipelineResult result, String namespace, String name,
+      long defaultValue) {
+    // TODO: the metrics query could be computed only once per poll instead of per counter.
+    MetricQueryResults metrics = result.metrics().queryMetrics(
+        MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build());
+    // Check for an empty result explicitly rather than catching NoSuchElementException.
+    for (MetricResult<Long> counter : metrics.counters()) {
+      return counter.attempted();
+    }
+    // The runner has not (yet) reported this counter.
+    return defaultValue;
+  }
+
+  /**
+   * Returns the attempted minimum or maximum (selected by {@code distType}) of the distribution
+   * {@code namespace:name} reported in {@code result}. Returns {@code defaultValue} if no such
+   * distribution is reported.
+   *
+   * <p>Only attempted metrics are used because some runners don't support committed metrics.
+   * If several distributions match the filter, the first one returned is used.
+   */
+  private long getDistributionMetric(PipelineResult result, String namespace, String name,
+      DistributionType distType, long defaultValue) {
+    MetricQueryResults metrics = result.metrics().queryMetrics(
+        MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build());
+    // Check for an empty result explicitly rather than catching NoSuchElementException.
+    for (MetricResult<DistributionResult> distribution : metrics.distributions()) {
+      DistributionResult attempted = distribution.attempted();
+      switch (distType) {
+        case MIN:
+          return attempted.min();
+        case MAX:
+          return attempted.max();
+        default:
+          // Unreachable while DistributionType has only MIN and MAX.
+          return defaultValue;
+      }
+    }
+    // The runner has not (yet) reported this distribution.
+    return defaultValue;
+  }
+
+ private enum DistributionType {MIN, MAX}
+
+  /**
+   * Sanity-check a timestamp (ms since the epoch) read from a metric.
+   *
+   * <p>Returns {@code value} if it lies within 10000 days of {@code now}, and -1 otherwise.
+   * A missing metric read with a -1 default therefore also yields -1.
+   *
+   * <p>NOTE(review): presumably this is meant to discard placeholder min/max values that a
+   * distribution may report before any real timestamp was recorded; confirm.
+   */
+  private long getTimestampMetric(long now, long value) {
+    long maxSkewMs = Duration.standardDays(10000).getMillis();
+    // Compare against [now - maxSkewMs, now + maxSkewMs] instead of Math.abs(value - now):
+    // the subtraction can overflow for extreme values, and Math.abs(Long.MIN_VALUE) is
+    // still negative, which would let such a value through.
+    if (value < now - maxSkewMs || value > now + maxSkewMs) {
+      return -1;
+    }
+    return value;
+  }
+
+  /**
+   * Find a 'steady state' events/sec from {@code snapshots} and
+   * store it in {@code perf} if found.
+   *
+   * <p>Only applies in streaming mode. Leading snapshots without event/result counts and
+   * trailing snapshots showing no activity are ignored. At least {@code MIN_SAMPLES} samples
+   * must remain, and only their middle third is used, which must span at least
+   * {@code MIN_WINDOW}. The rate is the least-squares slope of a line through the origin
+   * fitted to (runtimeSec, numEvents).
+   */
+  private void captureSteadyState(
+      NexmarkPerf perf, List<NexmarkPerf.ProgressSnapshot> snapshots) {
+    if (!options.isStreaming()) {
+      return;
+    }
+
+    // Find the first sample with actual event and result counts.
+    int dataStart = 0;
+    for (; dataStart < snapshots.size(); dataStart++) {
+      if (snapshots.get(dataStart).numEvents >= 0 && snapshots.get(dataStart).numResults >= 0) {
+        break;
+      }
+    }
+
+    // Find the last sample which demonstrated progress.
+    int dataEnd = snapshots.size() - 1;
+    for (; dataEnd > dataStart; dataEnd--) {
+      if (snapshots.get(dataEnd).anyActivity(snapshots.get(dataEnd - 1))) {
+        break;
+      }
+    }
+
+    int numSamples = dataEnd - dataStart + 1;
+    if (numSamples < MIN_SAMPLES) {
+      // Not enough samples.
+      NexmarkUtils.console("%d samples not enough to calculate steady-state event rate",
+          numSamples);
+      return;
+    }
+
+    // We'll look at only the middle third samples.
+    int sampleStart = dataStart + numSamples / 3;
+    int sampleEnd = dataEnd - numSamples / 3;
+
+    double sampleSec =
+        snapshots.get(sampleEnd).secSinceStart - snapshots.get(sampleStart).secSinceStart;
+    if (sampleSec < MIN_WINDOW.getStandardSeconds()) {
+      // Not sampled over enough time.
+      NexmarkUtils.console(
+          "sample of %.1f sec not long enough to calculate steady-state event rate",
+          sampleSec);
+      return;
+    }
+
+    // Find rate with least squares error.
+    double sumxx = 0.0;
+    double sumxy = 0.0;
+    long prevNumEvents = -1;
+    for (int i = sampleStart; i <= sampleEnd; i++) {
+      NexmarkPerf.ProgressSnapshot snapshot = snapshots.get(i);
+      if (prevNumEvents == snapshot.numEvents) {
+        // Skip samples with no change in number of events since they contribute no data.
+        continue;
+      }
+      prevNumEvents = snapshot.numEvents;
+      // Use the effective runtime instead of wallclock time so we can
+      // insulate ourselves from delays and stutters in the query manager.
+      double x = snapshot.runtimeSec;
+      if (x < 0) {
+        // currentPerf only ever assigns a non-negative measured runtime, so a negative value
+        // means the runtime was unknown for this snapshot; it carries no rate information.
+        continue;
+      }
+      double y = prevNumEvents;
+      sumxx += x * x;
+      sumxy += x * y;
+    }
+    if (sumxx <= 0.0) {
+      // No sample with a positive runtime: the fit is undefined (division by zero).
+      NexmarkUtils.console("no samples with known runtime to calculate steady-state event rate");
+      return;
+    }
+    double eventsPerSec = sumxy / sumxx;
+    NexmarkUtils.console("revising events/sec from %.1f to %.1f", perf.eventsPerSec, eventsPerSec);
+    perf.eventsPerSec = eventsPerSec;
+  }
+
+ /**
+ * Return the current performance given {@code eventMonitor} and {@code resultMonitor}.
+ *
+ * <p>Reads the {@code .elements}/{@code .bytes} counters and the min/max time distributions
+ * published by both monitors from {@code result}'s attempted metrics, derives rates and
+ * delays from them, appends one progress snapshot to {@code snapshots}, and then lets
+ * captureSteadyState (streaming mode only) refine the events/sec figure.
+ *
+ * @param startMsSinceEpoch wallclock reference (ms) for the startup delay and for each
+ * snapshot's {@code secSinceStart}
+ * @param now current wallclock time (ms)
+ * @param result pipeline whose metrics are read
+ * @param snapshots progress history; one snapshot is appended per call
+ * @param eventMonitor monitor whose metrics describe the input events
+ * @param resultMonitor monitor whose metrics describe the query results
+ * @return a new perf; any field whose inputs were unavailable keeps its default value
+ */
+ private NexmarkPerf currentPerf(
+ long startMsSinceEpoch, long now, PipelineResult result,
+ List<NexmarkPerf.ProgressSnapshot> snapshots, Monitor<?> eventMonitor,
+ Monitor<?> resultMonitor) {
+ NexmarkPerf perf = new NexmarkPerf();
+
+ // Every metric below falls back to -1 when it can't be read, and the derived fields are
+ // only filled in when their inputs are >= 0. Time values also go through
+ // getTimestampMetric, which maps implausible timestamps to -1.
+ long numEvents =
+ getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".elements", -1);
+ long numEventBytes =
+ getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".bytes", -1);
+ long eventStart =
+ getTimestampMetric(now,
+ getDistributionMetric(result, eventMonitor.name, eventMonitor.prefix + ".startTime",
+ DistributionType.MIN, -1));
+ long eventEnd =
+ getTimestampMetric(now,
+ getDistributionMetric(result, eventMonitor.name, eventMonitor.prefix + ".endTime",
+ DistributionType.MAX, -1));
+
+ long numResults =
+ getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".elements", -1);
+ long numResultBytes =
+ getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".bytes", -1);
+ long resultStart =
+ getTimestampMetric(now,
+ getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".startTime",
+ DistributionType.MIN, -1));
+ long resultEnd =
+ getTimestampMetric(now,
+ getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".endTime",
+ DistributionType.MAX, -1));
+ // Min/max of the results' element timestamps; used below for the time dilation.
+ long timestampStart =
+ getTimestampMetric(now,
+ getDistributionMetric(result,
+ resultMonitor.name, resultMonitor.prefix + ".startTimestamp",
+ DistributionType.MIN, -1));
+ long timestampEnd =
+ getTimestampMetric(now,
+ getDistributionMetric(result,
+ resultMonitor.name, resultMonitor.prefix + ".endTimestamp",
+ DistributionType.MAX, -1));
+
+ long effectiveEnd = -1;
+ if (eventEnd >= 0 && resultEnd >= 0) {
+ // It is possible for events to be generated after the last result was emitted.
+ // (Eg Query 2, which only yields results for a small prefix of the event stream.)
+ // So use the max of last event and last result times.
+ effectiveEnd = Math.max(eventEnd, resultEnd);
+ } else if (resultEnd >= 0) {
+ effectiveEnd = resultEnd;
+ } else if (eventEnd >= 0) {
+ // During startup we may have no result yet, but we would still like to track how
+ // long the pipeline has been running.
+ effectiveEnd = eventEnd;
+ }
+
+ // Runtime spans from the first event to the effective end; it is only ever set to a
+ // non-negative value.
+ if (effectiveEnd >= 0 && eventStart >= 0 && effectiveEnd >= eventStart) {
+ perf.runtimeSec = (effectiveEnd - eventStart) / 1000.0;
+ }
+
+ if (numEvents >= 0) {
+ perf.numEvents = numEvents;
+ }
+
+ if (numEvents >= 0 && perf.runtimeSec > 0.0) {
+ // For streaming we may later replace this with a 'steady-state' value calculated
+ // from the progress snapshots.
+ perf.eventsPerSec = numEvents / perf.runtimeSec;
+ }
+
+ if (numEventBytes >= 0 && perf.runtimeSec > 0.0) {
+ perf.eventBytesPerSec = numEventBytes / perf.runtimeSec;
+ }
+
+ if (numResults >= 0) {
+ perf.numResults = numResults;
+ }
+
+ if (numResults >= 0 && perf.runtimeSec > 0.0) {
+ perf.resultsPerSec = numResults / perf.runtimeSec;
+ }
+
+ if (numResultBytes >= 0 && perf.runtimeSec > 0.0) {
+ perf.resultBytesPerSec = numResultBytes / perf.runtimeSec;
+ }
+
+ // NOTE(review): can be negative if the first event precedes startMsSinceEpoch.
+ if (eventStart >= 0) {
+ perf.startupDelaySec = (eventStart - startMsSinceEpoch) / 1000.0;
+ }
+
+ if (resultStart >= 0 && eventStart >= 0 && resultStart >= eventStart) {
+ perf.processingDelaySec = (resultStart - eventStart) / 1000.0;
+ }
+
+ // Ratio of event-time span covered by the results to the processing runtime.
+ if (timestampStart >= 0 && timestampEnd >= 0 && perf.runtimeSec > 0.0) {
+ double eventRuntimeSec = (timestampEnd - timestampStart) / 1000.0;
+ perf.timeDilation = eventRuntimeSec / perf.runtimeSec;
+ }
+
+ if (resultEnd >= 0) {
+ // Fill in the shutdown delay assuming the job has now finished.
+ perf.shutdownDelaySec = (now - resultEnd) / 1000.0;
+ }
+
+ // As soon as available, try to capture cumulative cost at this point too.
+ // NOTE(review): nothing below captures cost; this comment appears to be a leftover.
+
+ // Record this poll so captureSteadyState can fit a rate over the whole history.
+ NexmarkPerf.ProgressSnapshot snapshot = new NexmarkPerf.ProgressSnapshot();
+ snapshot.secSinceStart = (now - startMsSinceEpoch) / 1000.0;
+ snapshot.runtimeSec = perf.runtimeSec;
+ snapshot.numEvents = numEvents;
+ snapshot.numResults = numResults;
+ snapshots.add(snapshot);
+
+ captureSteadyState(perf, snapshots);
+
+ return perf;
+ }
+
+  /**
+   * Build and run a pipeline using specified options.
+   */
+  interface PipelineBuilder<OptionT extends NexmarkOptions> {
+    /**
+     * Build and start the pipeline.
+     *
+     * @param publishOnlyOptions options to create the publish-only pipeline with
+     */
+    void build(OptionT publishOnlyOptions);
+  }
+
+  /**
+   * Invoke the builder with options suitable for running a publish-only child pipeline.
+   *
+   * <p>The launcher's own options are currently passed through unchanged.
+   */
+  private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder<NexmarkOptions> builder) {
+    builder.build(options);
+  }
+
+  /**
+   * If monitoring, wait until the publisher pipeline has run long enough to establish
+   * a backlog on the Pubsub topic. Otherwise, return immediately.
+   *
+   * <p>Waits {@code configuration.preloadSeconds} of wallclock time from the call, polling
+   * every {@code PERF_DELAY}. The wait ends early if the publisher pipeline terminates.
+   * Returns immediately when monitoring is disabled, there is no publisher, or no preload
+   * time is configured.
+   *
+   * @throws RuntimeException if interrupted while waiting (the interrupt flag is restored)
+   */
+  private void waitForPublisherPreload() {
+    if (!options.getMonitorJobs() || publisherResult == null
+        || configuration.preloadSeconds <= 0) {
+      return;
+    }
+
+    // Start the clock now rather than at the first published event: metrics may be
+    // unavailable on some runners, and waiting for them could block forever.
+    long endMsSinceEpoch = System.currentTimeMillis()
+        + Duration.standardSeconds(configuration.preloadSeconds).getMillis();
+    NexmarkUtils.console("waiting %ds for publisher to pre-load", configuration.preloadSeconds);
+
+    while (true) {
+      PipelineResult.State state = publisherResult.getState();
+      long numEvents = publisherMonitor == null
+          ? -1
+          : getCounterMetric(publisherResult, publisherMonitor.name,
+              publisherMonitor.prefix + ".elements", -1);
+
+      if (state == PipelineResult.State.DONE
+          || state == PipelineResult.State.CANCELLED
+          || state == PipelineResult.State.FAILED
+          || state == PipelineResult.State.UPDATED) {
+        // The publisher is no longer producing; there is nothing left to wait for.
+        NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
+        return;
+      }
+
+      long remainMs = endMsSinceEpoch - System.currentTimeMillis();
+      if (remainMs <= 0) {
+        NexmarkUtils.console("publisher pre-loaded %d events", numEvents);
+        return;
+      }
+      NexmarkUtils.console("%s publisher (%d events, waiting for %ds)",
+          state, numEvents, remainMs / 1000);
+
+      try {
+        Thread.sleep(Math.min(remainMs, PERF_DELAY.getMillis()));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("Interrupted: publisher still running.", e);
+      }
+    }
+  }
+
+  /**
+   * Monitor the performance and progress of a running job. Return final performance if
+   * it was measured.
+   *
+   * <p>Polls the main pipeline every {@code PERF_DELAY} until it reaches a terminal state.
+   * Once {@code --runningTimeMinutes} (less the preload time) has elapsed, the main job and
+   * any publisher job are cancelled.
+   *
+   * <p>In streaming mode the main job is also cancelled when any of these happen:
+   * <ul>
+   *   <li>the query reports fatal errors;
+   *   <li>with --debug, all configured events were seen and the job has been quiet for
+   *       {@code DONE_DELAY};
+   *   <li>it has shown no activity for {@code STUCK_TERMINATE_DELAY}.
+   * </ul>
+   *
+   * <p>Any publisher pipeline is shut down before returning.
+   *
+   * @return {@code null} if --monitorJobs is off. Otherwise the last measured performance,
+   *     with the collected errors and progress snapshots attached. With --debug=false no
+   *     metrics are sampled, so an otherwise empty perf carrying the errors is returned.
+   */
+  @Nullable
+  private NexmarkPerf monitor(NexmarkQuery query) {
+    if (!options.getMonitorJobs()) {
+      return null;
+    }
+
+    if (configuration.debug) {
+      NexmarkUtils.console("Waiting for main pipeline to 'finish'");
+    } else {
+      NexmarkUtils.console("--debug=false, so job will not self-cancel");
+    }
+
+    PipelineResult job = mainResult;
+    PipelineResult publisherJob = publisherResult;
+    List<NexmarkPerf.ProgressSnapshot> snapshots = new ArrayList<>();
+    long startMsSinceEpoch = System.currentTimeMillis();
+    long endMsSinceEpoch = -1;
+    if (options.getRunningTimeMinutes() != null) {
+      endMsSinceEpoch = startMsSinceEpoch
+          + Duration.standardMinutes(options.getRunningTimeMinutes()).getMillis()
+          - Duration.standardSeconds(configuration.preloadSeconds).getMillis();
+    }
+    long lastActivityMsSinceEpoch = -1;
+    NexmarkPerf perf = null;
+    boolean waitingForShutdown = false;
+    boolean publisherCancelled = false;
+    List<String> errors = new ArrayList<>();
+
+    while (true) {
+      long now = System.currentTimeMillis();
+      if (endMsSinceEpoch >= 0 && now > endMsSinceEpoch && !waitingForShutdown) {
+        NexmarkUtils.console("Reached end of test, cancelling job");
+        try {
+          job.cancel();
+        } catch (IOException e) {
+          throw new RuntimeException("Unable to cancel main job: ", e);
+        }
+        if (publisherJob != null) {
+          try {
+            publisherJob.cancel();
+          } catch (IOException e) {
+            throw new RuntimeException("Unable to cancel publisher job: ", e);
+          }
+          publisherCancelled = true;
+        }
+        waitingForShutdown = true;
+      }
+
+      PipelineResult.State state = job.getState();
+      NexmarkUtils.console("%s %s%s", state, queryName,
+          waitingForShutdown ? " (waiting for shutdown)" : "");
+
+      // Metrics are only sampled in debug mode; otherwise currPerf stays null.
+      NexmarkPerf currPerf;
+      if (configuration.debug) {
+        currPerf = currentPerf(startMsSinceEpoch, now, job, snapshots,
+            query.eventMonitor, query.resultMonitor);
+      } else {
+        currPerf = null;
+      }
+
+      if (perf == null || perf.anyActivity(currPerf)) {
+        lastActivityMsSinceEpoch = now;
+      }
+
+      if (options.isStreaming() && !waitingForShutdown) {
+        Duration quietFor = new Duration(lastActivityMsSinceEpoch, now);
+        long fatalCount = getCounterMetric(job, query.getName(), "fatal", 0);
+        if (fatalCount > 0) {
+          NexmarkUtils.console("job has fatal errors, cancelling.");
+          errors.add(String.format("Pipeline reported %s fatal errors", fatalCount));
+          waitingForShutdown = true;
+        } else if (configuration.debug && configuration.numEvents > 0
+            && currPerf.numEvents == configuration.numEvents
+            && currPerf.numResults >= 0 && quietFor.isLongerThan(DONE_DELAY)) {
+          NexmarkUtils.console("streaming query appears to have finished, cancelling job.");
+          waitingForShutdown = true;
+        } else if (quietFor.isLongerThan(STUCK_TERMINATE_DELAY)) {
+          NexmarkUtils.console("streaming query appears to have gotten stuck, cancelling job.");
+          errors.add("Streaming job was cancelled since appeared stuck");
+          waitingForShutdown = true;
+        } else if (quietFor.isLongerThan(STUCK_WARNING_DELAY)) {
+          NexmarkUtils.console("WARNING: streaming query appears to have been stuck for %d min.",
+              quietFor.getStandardMinutes());
+          errors.add(
+              String.format("Streaming query was stuck for %d min", quietFor.getStandardMinutes()));
+        }
+
+        if (waitingForShutdown) {
+          try {
+            job.cancel();
+          } catch (IOException e) {
+            throw new RuntimeException("Unable to cancel main job: ", e);
+          }
+        }
+      }
+
+      perf = currPerf;
+
+      boolean running = true;
+      switch (state) {
+        case UNKNOWN:
+        case STOPPED:
+        case RUNNING:
+          // Keep going.
+          break;
+        case DONE:
+          // All done.
+          running = false;
+          break;
+        case CANCELLED:
+          running = false;
+          if (!waitingForShutdown) {
+            errors.add("Job was unexpectedly cancelled");
+          }
+          break;
+        case FAILED:
+          // Abnormal termination.
+          running = false;
+          errors.add("Job failed unexpectedly");
+          break;
+        case UPDATED:
+          // Abnormal termination.
+          running = false;
+          errors.add("Job was unexpectedly updated");
+          break;
+      }
+
+      if (!running) {
+        break;
+      }
+
+      if (lastActivityMsSinceEpoch == now) {
+        NexmarkUtils.console("new perf %s", perf);
+      } else {
+        NexmarkUtils.console("no activity");
+      }
+
+      try {
+        Thread.sleep(PERF_DELAY.getMillis());
+      } catch (InterruptedException e) {
+        // Deliberately clear the flag and keep monitoring: the pipeline is still running
+        // and must still be shut down below.
+        Thread.interrupted();
+        NexmarkUtils.console("Interrupted: pipeline is still running");
+      }
+    }
+
+    if (perf == null) {
+      // --debug=false: no metrics were sampled. Still return a perf so the errors
+      // collected above are not lost.
+      perf = new NexmarkPerf();
+    }
+    perf.errors = errors;
+    perf.snapshots = snapshots;
+
+    if (publisherJob != null) {
+      NexmarkUtils.console("Shutting down publisher pipeline.");
+      try {
+        if (!publisherCancelled) {
+          publisherJob.cancel();
+        }
+        publisherJob.waitUntilFinish(Duration.standardMinutes(5));
+      } catch (IOException e) {
+        throw new RuntimeException("Unable to cancel publisher job: ", e);
+      }
+    }
+
+    return perf;
+  }
+
+ // ================================================================================
+ // Basic sources and sinks
+ // ================================================================================
+
+  /**
+   * Derive the Pubsub topic name from --pubsubTopic according to the resource name mode:
+   * used as-is, suffixed with the query name, or suffixed with the query name and
+   * {@code now}.
+   */
+  private String shortTopic(long now) {
+    String baseTopic = options.getPubsubTopic();
+    if (Strings.isNullOrEmpty(baseTopic)) {
+      throw new RuntimeException("Missing --pubsubTopic");
+    }
+    switch (options.getResourceNameMode()) {
+      case VERBATIM:
+        return baseTopic;
+      case QUERY:
+        return baseTopic + "_" + queryName + "_source";
+      case QUERY_AND_SALT:
+        return String.format("%s_%s_%d_source", baseTopic, queryName, now);
+      default:
+        throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
+    }
+  }
+
+  /**
+   * Derive the Pubsub subscription name from --pubsubSubscription according to the resource
+   * name mode: used as-is, suffixed with the query name, or suffixed with the query name and
+   * {@code now}.
+   */
+  private String shortSubscription(long now) {
+    String baseSubscription = options.getPubsubSubscription();
+    if (Strings.isNullOrEmpty(baseSubscription)) {
+      throw new RuntimeException("Missing --pubsubSubscription");
+    }
+    switch (options.getResourceNameMode()) {
+      case VERBATIM:
+        return baseSubscription;
+      case QUERY:
+        return baseSubscription + "_" + queryName + "_source";
+      case QUERY_AND_SALT:
+        return String.format("%s_%s_%d_source", baseSubscription, queryName, now);
+      default:
+        throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
+    }
+  }
+
+  /**
+   * Derive the plain-text output file name under --outputPath according to the resource
+   * name mode.
+   */
+  private String textFilename(long now) {
+    String outputPath = options.getOutputPath();
+    if (Strings.isNullOrEmpty(outputPath)) {
+      throw new RuntimeException("Missing --outputPath");
+    }
+    switch (options.getResourceNameMode()) {
+      case VERBATIM:
+        return outputPath;
+      case QUERY:
+        return outputPath + "/nexmark_" + queryName + ".txt";
+      case QUERY_AND_SALT:
+        return String.format("%s/nexmark_%s_%d.txt", outputPath, queryName, now);
+      default:
+        throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
+    }
+  }
+
+  /**
+   * Derive the BigQuery table spec ({@code project:nexmark.table}) from --bigQueryTable,
+   * the resource name mode and the given {@code version} suffix.
+   */
+  private String tableSpec(long now, String version) {
+    String baseTableName = options.getBigQueryTable();
+    if (Strings.isNullOrEmpty(baseTableName)) {
+      throw new RuntimeException("Missing --bigQueryTable");
+    }
+    String datasetPrefix = options.getProject() + ":nexmark.";
+    switch (options.getResourceNameMode()) {
+      case VERBATIM:
+        return datasetPrefix + baseTableName + "_" + version;
+      case QUERY:
+        return datasetPrefix + baseTableName + "_" + queryName + "_" + version;
+      case QUERY_AND_SALT:
+        return String.format("%s:nexmark.%s_%s_%s_%d",
+            options.getProject(), baseTableName, queryName, version, now);
+      default:
+        throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
+    }
+  }
+
+  /**
+   * Derive the log directory under --outputPath according to the resource name mode.
+   */
+  private String logsDir(long now) {
+    String outputPath = options.getOutputPath();
+    if (Strings.isNullOrEmpty(outputPath)) {
+      throw new RuntimeException("Missing --outputPath");
+    }
+    switch (options.getResourceNameMode()) {
+      case VERBATIM:
+        return outputPath;
+      case QUERY:
+        return outputPath + "/logs_" + queryName;
+      case QUERY_AND_SALT:
+        return String.format("%s/logs_%s_%d", outputPath, queryName, now);
+      default:
+        throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
+    }
+  }
+
+  /**
+   * Return a source of synthetic events: {@code NexmarkUtils.streamEventsSource} when
+   * streaming, {@code NexmarkUtils.batchEventsSource} otherwise.
+   */
+  private PCollection<Event> sourceEventsFromSynthetic(Pipeline p) {
+    if (!isStreaming()) {
+      NexmarkUtils.console("Generating %d events in batch mode", configuration.numEvents);
+      return p.apply(queryName + ".ReadBounded", NexmarkUtils.batchEventsSource(configuration));
+    }
+    NexmarkUtils.console("Generating %d events in streaming mode", configuration.numEvents);
+    return p.apply(queryName + ".ReadUnbounded", NexmarkUtils.streamEventsSource(configuration));
+  }
+
+ /**
+ * Return source of events from Pubsub.
+ */
+ private PCollection<Event> sourceEventsFromPubsub(Pipeline p, long now) {
+ String shortSubscription = shortSubscription(now);
+ NexmarkUtils.console("Reading events from Pubsub %s", shortSubscription);
+
+ PubsubIO.Read<PubsubMessage> io =
+ PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(shortSubscription)
+ .withIdAttribute(NexmarkUtils.PUBSUB_ID);
+ if (!configuration.usePubsubPublishTime) {
+ io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
+ }
+
+ return p
+ .apply(queryName + ".ReadPubsubEvents", io)
+ .apply(queryName + ".PubsubMessageToEvent", ParDo.of(new DoFn<PubsubMessage, Event>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ byte[] payload = c.element().getPayload();
+ try {
+ Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload);
+ c.output(event);
+ } catch (CoderException e) {
+ // TODO Log decoding Event error
+ }
+ }
+ }));
+ }
+
+  /**
+   * Return a source of events read from the Avro files matching
+   * {@code options.getInputPath() + "*.avro"}, with {@code NexmarkQuery.EVENT_TIMESTAMP_FROM_DATA}
+   * applied to the decoded events.
+   */
+  private PCollection<Event> sourceEventsFromAvro(Pipeline p) {
+    String inputPath = options.getInputPath();
+    if (Strings.isNullOrEmpty(inputPath)) {
+      throw new RuntimeException("Missing --inputPath");
+    }
+    NexmarkUtils.console("Reading events from Avro files at %s", inputPath);
+    String filePattern = inputPath + "*.avro";
+    return p
+        .apply(queryName + ".ReadAvroEvents", AvroIO.read(Event.class).from(filePattern))
+        .apply("OutputWithTimestamp", NexmarkQuery.EVENT_TIMESTAMP_FROM_DATA);
+  }
+
+ /**
+ * Send {@code events} to Pubsub.
+ */
+ private void sinkEventsToPubsub(PCollection<Event> events, long now) {
+ String shortTopic = shortTopic(now);
+ NexmarkUtils.console("Writing events to Pubsub %s", shortTopic);
+
+ PubsubIO.Write<PubsubMessage> io =
+ PubsubIO.writePubsubMessages().to(shortTopic)
+ .withIdAttribute(NexmarkUtils.PUBSUB_ID);
+ if (!configuration.usePubsubPublishTime) {
+ io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
+ }
+
+ events.apply(queryName + ".EventToPubsubMessage",
+ ParDo.of(new DoFn<Event, PubsubMessage>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ try {
+ byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element());
+ c.output(new PubsubMessage(payload, new HashMap<String, String>()));
+ } catch (CoderException e1) {
+ // TODO Log encoding Event error
+ }
+ }
+ })
+ )
+ .apply(queryName + ".WritePubsubEvents", io);
+ }
+
+  /**
+   * Publish {@code formattedResults} as strings to the topic given by shortTopic.
+   */
+  private void sinkResultsToPubsub(PCollection<String> formattedResults, long now) {
+    String topic = shortTopic(now);
+    NexmarkUtils.console("Writing results to Pubsub %s", topic);
+    PubsubIO.Write<String> write = PubsubIO.writeStrings()
+        .to(topic)
+        .withIdAttribute(NexmarkUtils.PUBSUB_ID);
+    formattedResults.apply(queryName + ".WritePubsubResults",
+        configuration.usePubsubPublishTime
+            ? write
+            : write.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP));
+  }
+
+  /**
+   * Write every raw Event in {@code source} as Avro under {@code options.getOutputPath()}:
+   * <ul>
+   * <li>{@code $outputPath/event*.avro} all Event entities;
+   * <li>{@code $outputPath/bid*.avro} Bid entities;
+   * <li>{@code $outputPath/auction*.avro} Auction entities;
+   * <li>{@code $outputPath/person*.avro} Person entities.
+   * </ul>
+   *
+   * @param source the events to write
+   */
+  private void sinkEventsToAvro(PCollection<Event> source) {
+    String outputPath = options.getOutputPath();
+    if (Strings.isNullOrEmpty(outputPath)) {
+      throw new RuntimeException("Missing --outputPath");
+    }
+    NexmarkUtils.console("Writing events to Avro files at %s", outputPath);
+    String dir = outputPath + "/";
+    source.apply(queryName + ".WriteAvroEvents",
+        AvroIO.write(Event.class).to(dir + "event").withSuffix(".avro"));
+    source
+        .apply(NexmarkQuery.JUST_BIDS)
+        .apply(queryName + ".WriteAvroBids",
+            AvroIO.write(Bid.class).to(dir + "bid").withSuffix(".avro"));
+    source
+        .apply(NexmarkQuery.JUST_NEW_AUCTIONS)
+        .apply(queryName + ".WriteAvroAuctions",
+            AvroIO.write(Auction.class).to(dir + "auction").withSuffix(".avro"));
+    source
+        .apply(NexmarkQuery.JUST_NEW_PERSONS)
+        .apply(queryName + ".WriteAvroPeople",
+            AvroIO.write(Person.class).to(dir + "person").withSuffix(".avro"));
+  }
+
+  /**
+   * Write {@code formattedResults} as text to the location given by textFilename.
+   */
+  private void sinkResultsToText(PCollection<String> formattedResults, long now) {
+    String textPath = textFilename(now);
+    NexmarkUtils.console("Writing results to text files at %s", textPath);
+    String stepName = queryName + ".WriteTextResults";
+    formattedResults.apply(stepName, TextIO.write().to(textPath));
+  }
+
+ private static class StringToTableRow extends DoFn<String, TableRow> {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ int n = ThreadLocalRandom.current().nextInt(10);
+ List<TableRow> records = new ArrayList<>(n);
+ for (int i = 0; i < n; i++) {
+ records.add(new TableRow().set("index", i).set("value", Integer.toString(i)));
+ }
+ c.output(new TableRow().set("result", c.element()).set("records", records));
+ }
+ }
+
+  /**
+   * Send {@code formattedResults} to BigQuery.
+   *
+   * <p>Rows are written (created if needed, appended otherwise) to the table given by
+   * tableSpec for {@code version}. The table has a "result" string column and a repeated
+   * "records" column.
+   *
+   * <p>This method may be called several times per pipeline (once per {@code version}), so
+   * {@code version} is included in the transform names to keep them unique.
+   */
+  private void sinkResultsToBigQuery(
+      PCollection<String> formattedResults, long now,
+      String version) {
+    String tableSpec = tableSpec(now, version);
+    TableSchema tableSchema =
+        new TableSchema().setFields(ImmutableList.of(
+            new TableFieldSchema().setName("result").setType("STRING"),
+            new TableFieldSchema().setName("records").setMode("REPEATED").setType("RECORD")
+                .setFields(ImmutableList.of(
+                    new TableFieldSchema().setName("index").setType("INTEGER"),
+                    new TableFieldSchema().setName("value").setType("STRING")))));
+    NexmarkUtils.console("Writing results to BigQuery table %s", tableSpec);
+    BigQueryIO.Write io =
+        BigQueryIO.write().to(tableSpec)
+            .withSchema(tableSchema)
+            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
+    formattedResults
+        .apply(queryName + ".StringToTableRow." + version, ParDo.of(new StringToTableRow()))
+        .apply(queryName + ".WriteBigQueryResults." + version, io);
+  }
+
+ // ================================================================================
+ // Construct overall pipeline
+ // ================================================================================
+
+  /**
+   * Return source of events for this run, or null if we are simply publishing events
+   * to Pubsub.
+   *
+   * <p>With --pubSubMode=COMBINED this also builds and starts a separate publisher pipeline,
+   * recording it in {@code publisherMonitor} and {@code publisherResult}.
+   */
+  @Nullable
+  private PCollection<Event> createSource(Pipeline p, final long now) {
+    PCollection<Event> source = null;
+    switch (configuration.sourceType) {
+      case DIRECT:
+        source = sourceEventsFromSynthetic(p);
+        break;
+      case AVRO:
+        source = sourceEventsFromAvro(p);
+        break;
+      case PUBSUB:
+        // Setup the sink for the publisher.
+        switch (configuration.pubSubMode) {
+          case SUBSCRIBE_ONLY:
+            // Nothing to publish.
+            break;
+          case PUBLISH_ONLY:
+            // Send synthesized events to Pubsub in this job.
+            sinkEventsToPubsub(sourceEventsFromSynthetic(p).apply(queryName + ".Snoop",
+                NexmarkUtils.snoop(queryName)), now);
+            break;
+          case COMBINED:
+            // Send synthesized events to Pubsub in separate publisher job.
+            // We won't start the main pipeline until the publisher has sent the pre-load events.
+            // We'll shutdown the publisher job when we notice the main job has finished.
+            invokeBuilderForPublishOnlyPipeline(new PipelineBuilder<NexmarkOptions>() {
+              @Override
+              public void build(NexmarkOptions publishOnlyOptions) {
+                // Honour the options handed to the builder rather than the launcher's own.
+                Pipeline sp = Pipeline.create(publishOnlyOptions);
+                NexmarkUtils.setupPipeline(configuration.coderStrategy, sp);
+                publisherMonitor = new Monitor<>(queryName, "publisher");
+                sinkEventsToPubsub(
+                    sourceEventsFromSynthetic(sp)
+                        .apply(queryName + ".Monitor", publisherMonitor.getTransform()),
+                    now);
+                publisherResult = sp.run();
+              }
+            });
+            break;
+        }
+
+        // Setup the source for the consumer.
+        switch (configuration.pubSubMode) {
+          case PUBLISH_ONLY:
+            // Nothing to consume. Leave source null.
+            break;
+          case SUBSCRIBE_ONLY:
+          case COMBINED:
+            // Read events from pubsub.
+            source = sourceEventsFromPubsub(p, now);
+            break;
+        }
+        break;
+    }
+    return source;
+  }
+
+ private static final TupleTag<String> MAIN = new TupleTag<String>(){};
+ private static final TupleTag<String> SIDE = new TupleTag<String>(){};
+
+ private static class PartitionDoFn extends DoFn<String, String> {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ if (c.element().hashCode() % 2 == 0) {
+ c.output(c.element());
+ } else {
+ c.output(SIDE, c.element());
+ }
+ }
+ }
+
+  /**
+   * Consume {@code results}.
+   *
+   * <p>With --sinkType=COUNT_ONLY the results are discarded without formatting. Otherwise
+   * they are formatted as strings, optionally logged, and sent to the configured sink. With
+   * BIGQUERY they are split into "main" and "side" tables and also copied in full to a
+   * "copy" table.
+   */
+  private void sink(PCollection<TimestampedValue<KnownSize>> results, long now) {
+    if (configuration.sinkType == NexmarkUtils.SinkType.COUNT_ONLY) {
+      // Avoid the cost of formatting the results.
+      results.apply(queryName + ".DevNull", NexmarkUtils.devNull(queryName));
+      return;
+    }
+
+    PCollection<String> formattedResults =
+        results.apply(queryName + ".Format", NexmarkUtils.format(queryName));
+    if (options.getLogResults()) {
+      formattedResults = formattedResults.apply(queryName + ".Results.Log",
+          NexmarkUtils.<String>log(queryName + ".Results"));
+    }
+
+    switch (configuration.sinkType) {
+      case DEVNULL:
+        // Discard all results
+        formattedResults.apply(queryName + ".DevNull", NexmarkUtils.devNull(queryName));
+        break;
+      case PUBSUB:
+        sinkResultsToPubsub(formattedResults, now);
+        break;
+      case TEXT:
+        sinkResultsToText(formattedResults, now);
+        break;
+      case AVRO:
+        NexmarkUtils.console(
+            "WARNING: with --sinkType=AVRO, actual query results will be discarded.");
+        break;
+      case BIGQUERY:
+        // Multiple BigQuery backends to mimic what most customers do.
+        PCollectionTuple res = formattedResults.apply(queryName + ".Partition",
+            ParDo.of(new PartitionDoFn()).withOutputTags(MAIN, TupleTagList.of(SIDE)));
+        sinkResultsToBigQuery(res.get(MAIN), now, "main");
+        sinkResultsToBigQuery(res.get(SIDE), now, "side");
+        sinkResultsToBigQuery(formattedResults, now, "copy");
+        break;
+      case COUNT_ONLY:
+        // Short-circuited above.
+        throw new IllegalStateException("COUNT_ONLY sink should have been handled earlier");
+    }
+  }
+
+ // ================================================================================
+ // Entry point
+ // ================================================================================
+
+  /**
+   * Calculate the distribution of the expected rate of results per minute (in event time, not
+   * wallclock time).
+   *
+   * <p>Prints the min, approximate quartiles, median and max of the model's results-per-window
+   * counts, or only the sample count when there are fewer than 5 samples.
+   */
+  private void modelResultRates(NexmarkQueryModel model) {
+    List<Long> counts = Lists.newArrayList(model.simulator().resultsPerWindow());
+    Collections.sort(counts);
+    int n = counts.size();
+    if (n < 5) {
+      NexmarkUtils.console("Query%d: only %d samples", model.configuration.query, n);
+    } else {
+      // Indices n/4, n/2 and n-1-n/4 of the sorted counts are the (approximate) first
+      // quartile, median and third quartile; no mean is computed.
+      NexmarkUtils.console(
+          "Query%d: N:%d; min:%d; 1st quartile:%d; median:%d; 3rd quartile:%d; max:%d",
+          model.configuration.query, n, counts.get(0), counts.get(n / 4),
+          counts.get(n / 2),
+          counts.get(n - 1 - n / 4), counts.get(n - 1));
+    }
+  }
+
+  /**
+   * Run {@code configuration} and return its performance if possible.
+   *
+   * <p>Builds the main pipeline for the configured query, runs it and, if --monitorJobs is
+   * set, monitors it until completion. The pipeline consists of:
+   * <ul>
+   *   <li>the event source;
+   *   <li>an optional Avro event sink;
+   *   <li>the query itself;
+   *   <li>an optional correctness assertion;
+   *   <li>the result sink.
+   * </ul>
+   *
+   * <p>All per-run state is cleared before returning, so the launcher can be reused for
+   * another configuration.
+   *
+   * @return the measured performance, or {@code null} if the configuration is disabled, only
+   *     the result rate was modelled, or the job was not monitored
+   */
+  @Nullable
+  public NexmarkPerf run(NexmarkConfiguration runConfiguration) {
+    if (options.getManageResources() && !options.getMonitorJobs()) {
+      throw new RuntimeException("If using --manageResources then must also use --monitorJobs.");
+    }
+
+    //
+    // Setup per-run state.
+    //
+    checkState(configuration == null);
+    checkState(queryName == null);
+    configuration = runConfiguration;
+
+    try {
+      NexmarkUtils.console("Running %s", configuration.toShortString());
+
+      if (configuration.numEvents < 0) {
+        NexmarkUtils.console("skipping since configuration is disabled");
+        return null;
+      }
+
+      List<NexmarkQuery> queries = Arrays.asList(new Query0(configuration),
+          new Query1(configuration),
+          new Query2(configuration),
+          new Query3(configuration),
+          new Query4(configuration),
+          new Query5(configuration),
+          new Query6(configuration),
+          new Query7(configuration),
+          new Query8(configuration),
+          new Query9(configuration),
+          new Query10(configuration),
+          new Query11(configuration),
+          new Query12(configuration));
+      NexmarkQuery query = queries.get(configuration.query);
+      queryName = query.getName();
+
+      // Queries 10 to 12 have no model.
+      List<NexmarkQueryModel> models = Arrays.asList(
+          new Query0Model(configuration),
+          new Query1Model(configuration),
+          new Query2Model(configuration),
+          new Query3Model(configuration),
+          new Query4Model(configuration),
+          new Query5Model(configuration),
+          new Query6Model(configuration),
+          new Query7Model(configuration),
+          new Query8Model(configuration),
+          new Query9Model(configuration),
+          null,
+          null,
+          null);
+      NexmarkQueryModel model = models.get(configuration.query);
+
+      if (options.getJustModelResultRate()) {
+        if (model == null) {
+          throw new RuntimeException(String.format("No model for %s", queryName));
+        }
+        modelResultRates(model);
+        return null;
+      }
+
+      long now = System.currentTimeMillis();
+      Pipeline p = Pipeline.create(options);
+      NexmarkUtils.setupPipeline(configuration.coderStrategy, p);
+
+      // Generate events.
+      PCollection<Event> source = createSource(p, now);
+
+      // Source will be null if source type is PUBSUB and mode is PUBLISH_ONLY.
+      // In that case there's nothing more to add to pipeline.
+      if (source != null) {
+        // Logging must happen inside the null check: source is null in PUBLISH_ONLY mode.
+        if (options.getLogEvents()) {
+          source = source.apply(queryName + ".Events.Log",
+              NexmarkUtils.<Event>log(queryName + ".Events"));
+        }
+
+        // Optionally sink events in Avro format.
+        // (Query results are ignored).
+        if (configuration.sinkType == NexmarkUtils.SinkType.AVRO) {
+          sinkEventsToAvro(source);
+        }
+
+        // Special hacks for Query 10 (big logger).
+        if (configuration.query == 10) {
+          String path = null;
+          if (options.getOutputPath() != null && !options.getOutputPath().isEmpty()) {
+            path = logsDir(now);
+          }
+          ((Query10) query).setOutputPath(path);
+          ((Query10) query).setMaxNumWorkers(maxNumWorkers());
+        }
+
+        // Apply query.
+        PCollection<TimestampedValue<KnownSize>> results = source.apply(query);
+
+        if (options.getAssertCorrectness()) {
+          if (model == null) {
+            throw new RuntimeException(String.format("No model for %s", queryName));
+          }
+          // We know all our streams have a finite number of elements.
+          results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
+          // If we have a finite number of events then assert our pipeline's
+          // results match those of a model using the same sequence of events.
+          PAssert.that(results).satisfies(model.assertionFor());
+        }
+
+        // Output results.
+        sink(results, now);
+      }
+
+      if (publisherResult != null) {
+        waitForPublisherPreload();
+      }
+      mainResult = p.run();
+      mainResult.waitUntilFinish(Duration.standardSeconds(configuration.streamTimeout));
+      return monitor(query);
+    } finally {
+      configuration = null;
+      queryName = null;
+      // Clear the pipeline handles too, so a later run() does not wait on, or cancel, the
+      // publisher or main job of this one.
+      publisherMonitor = null;
+      publisherResult = null;
+      mainResult = null;
+    }
+  }
+}