You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/12 23:34:23 UTC
[3/4] incubator-beam git commit: Revise WindowedWordCount for runner
and execution mode portability
Revise WindowedWordCount for runner and execution mode portability
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/42595dcd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/42595dcd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/42595dcd
Branch: refs/heads/master
Commit: 42595dcd29c248bd3572596c9bb8464d18acd19b
Parents: db41940
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Nov 3 14:37:26 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Dec 12 15:23:38 2016 -0800
----------------------------------------------------------------------
.../apache/beam/examples/WindowedWordCount.java | 177 +++++++++---------
.../examples/common/WriteWindowedFilesDoFn.java | 77 ++++++++
.../beam/examples/WindowedWordCountIT.java | 182 ++++++++++++++++---
3 files changed, 326 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42595dcd/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index 4e254bd..5c19454 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -17,26 +17,25 @@
*/
package org.apache.beam.examples;
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
import org.apache.beam.examples.common.ExampleOptions;
-import org.apache.beam.examples.common.ExampleUtils;
+import org.apache.beam.examples.common.WriteWindowedFilesDoFn;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -63,7 +62,8 @@ import org.joda.time.Instant;
* 2. Adding timestamps to data
* 3. Windowing
* 4. Re-using PTransforms over windowed PCollections
- * 5. Writing to BigQuery
+ * 5. Accessing the window of an element
+ * 6. Writing data to per-window text files
* </pre>
*
* <p>By default, the examples will run with the {@code DirectRunner}.
@@ -74,25 +74,23 @@ import org.joda.time.Instant;
* </pre>
* See examples/java/README.md for instructions about how to configure different runners.
*
- * <p>Optionally specify the input file path via:
- * {@code --inputFile=gs://INPUT_PATH},
- * which defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}.
+ * <p>To execute this pipeline locally, specify a local output file (if using the
+ * {@code DirectRunner}) or output prefix on a supported distributed file system.
+ * <pre>{@code
+ * --output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX]
+ * }</pre>
*
- * <p>Specify an output BigQuery dataset and optionally, a table for the output. If you don't
- * specify the table, one will be created for you using the job name. If you don't specify the
- * dataset, a dataset called {@code beam_examples} must already exist in your project.
- * {@code --bigQueryDataset=YOUR-DATASET --bigQueryTable=YOUR-NEW-TABLE-NAME}.
+ * <p>The input file defaults to a public data set containing the text of King Lear,
+ * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}.
*
* <p>By default, the pipeline will do fixed windowing, on 1-minute windows. You can
* change this interval by setting the {@code --windowSize} parameter, e.g. {@code --windowSize=10}
* for 10-minute windows.
*
- * <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
- * and then exits.
+ * <p>The example will try to cancel the pipeline on the signal to terminate the process (CTRL-C).
*/
public class WindowedWordCount {
- static final int WINDOW_SIZE = 1; // Default window duration in minutes
-
+ static final int WINDOW_SIZE = 10; // Default window duration in minutes
/**
* Concept #2: A DoFn that sets the data element timestamp. This is a silly method, just for
* this example, for the bounded data case.
@@ -102,18 +100,22 @@ public class WindowedWordCount {
* 2-hour period.
*/
static class AddTimestampFn extends DoFn<String, String> {
- private static final Duration RAND_RANGE = Duration.standardHours(2);
+ private static final Duration RAND_RANGE = Duration.standardHours(1);
private final Instant minTimestamp;
+ private final Instant maxTimestamp;
- AddTimestampFn() {
- this.minTimestamp = new Instant(System.currentTimeMillis());
+ AddTimestampFn(Instant minTimestamp, Instant maxTimestamp) {
+ this.minTimestamp = minTimestamp;
+ this.maxTimestamp = maxTimestamp;
}
@ProcessElement
public void processElement(ProcessContext c) {
- // Generate a timestamp that falls somewhere in the past two hours.
- long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
- Instant randomTimestamp = minTimestamp.plus(randMillis);
+ Instant randomTimestamp =
+ new Instant(
+ ThreadLocalRandom.current()
+ .nextLong(minTimestamp.getMillis(), maxTimestamp.getMillis()));
+
/**
* Concept #2: Set the data element with that timestamp.
*/
@@ -121,50 +123,29 @@ public class WindowedWordCount {
}
}
- /** A DoFn that converts a Word and Count into a BigQuery table row. */
- static class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> {
- @ProcessElement
- public void processElement(ProcessContext c) {
- TableRow row = new TableRow()
- .set("word", c.element().getKey())
- .set("count", c.element().getValue())
- // include a field for the window timestamp
- .set("window_timestamp", c.timestamp().toString());
- c.output(row);
+ /** A {@link DefaultValueFactory} that returns the current system time. */
+ public static class DefaultToCurrentSystemTime implements DefaultValueFactory<Long> {
+ @Override
+ public Long create(PipelineOptions options) {
+ return System.currentTimeMillis();
}
}
- /**
- * Helper method that defines the BigQuery schema used for the output.
- */
- private static TableSchema getSchema() {
- List<TableFieldSchema> fields = new ArrayList<>();
- fields.add(new TableFieldSchema().setName("word").setType("STRING"));
- fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));
- fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP"));
- TableSchema schema = new TableSchema().setFields(fields);
- return schema;
- }
-
- /**
- * Concept #5: We'll stream the results to a BigQuery table. The BigQuery output source is one
- * that supports both bounded and unbounded data. This is a helper method that creates a
- * TableReference from input options, to tell the pipeline where to write its BigQuery results.
- */
- private static TableReference getTableReference(Options options) {
- TableReference tableRef = new TableReference();
- tableRef.setProjectId(options.getProject());
- tableRef.setDatasetId(options.getBigQueryDataset());
- tableRef.setTableId(options.getBigQueryTable());
- return tableRef;
+ /** A {@link DefaultValueFactory} that returns the minimum timestamp plus one hour. */
+ public static class DefaultToMinTimestampPlusOneHour implements DefaultValueFactory<Long> {
+ @Override
+ public Long create(PipelineOptions options) {
+ return options.as(Options.class).getMinTimestampMillis()
+ + Duration.standardHours(1).getMillis();
+ }
}
/**
- * Options supported by {@link WindowedWordCount}.
+ * Options for {@link WindowedWordCount}.
*
- * <p>Inherits standard example configuration options, which allow specification of the BigQuery
- * table, as well as the {@link WordCount.WordCountOptions} support for
- * specification of the input file.
+ * <p>Inherits standard example configuration options, which allow specification of the
+ * runner, as well as the {@link WordCount.WordCountOptions} support for
+ * specification of the input and output files.
*/
public interface Options extends WordCount.WordCountOptions,
ExampleOptions, ExampleBigQueryTableOptions {
@@ -172,14 +153,24 @@ public class WindowedWordCount {
@Default.Integer(WINDOW_SIZE)
Integer getWindowSize();
void setWindowSize(Integer value);
+
+ @Description("Minimum randomly assigned timestamp, in milliseconds-since-epoch")
+ @Default.InstanceFactory(DefaultToCurrentSystemTime.class)
+ Long getMinTimestampMillis();
+ void setMinTimestampMillis(Long value);
+
+ @Description("Maximum randomly assigned timestamp, in milliseconds-since-epoch")
+ @Default.InstanceFactory(DefaultToMinTimestampPlusOneHour.class)
+ Long getMaxTimestampMillis();
+ void setMaxTimestampMillis(Long value);
}
public static void main(String[] args) throws IOException {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
- options.setBigQuerySchema(getSchema());
- // ExampleUtils creates the necessary input sources to simplify execution of this Pipeline.
- ExampleUtils exampleUtils = new ExampleUtils(options);
- exampleUtils.setup();
+ final String output = options.getOutput();
+ final Duration windowSize = Duration.standardMinutes(options.getWindowSize());
+ final Instant minTimestamp = new Instant(options.getMinTimestampMillis());
+ final Instant maxTimestamp = new Instant(options.getMaxTimestampMillis());
Pipeline pipeline = Pipeline.create(options);
@@ -192,7 +183,7 @@ public class WindowedWordCount {
.apply(TextIO.Read.from(options.getInputFile()))
// Concept #2: Add an element timestamp, using an artificial time just to show windowing.
// See AddTimestampFn for more detail on this.
- .apply(ParDo.of(new AddTimestampFn()));
+ .apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));
/**
* Concept #3: Window into fixed windows. The fixed window size for this example defaults to 1
@@ -200,9 +191,10 @@ public class WindowedWordCount {
* information on how fixed windows work, and for information on the other types of windowing
* available (e.g., sliding windows).
*/
- PCollection<String> windowedWords = input
- .apply(Window.<String>into(
- FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
+ PCollection<String> windowedWords =
+ input.apply(
+ Window.<String>into(
+ FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
/**
* Concept #4: Re-use our existing CountWords transform that does not have knowledge of
@@ -211,19 +203,40 @@ public class WindowedWordCount {
PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());
/**
- * Concept #5: Format the results for a BigQuery table, then write to BigQuery.
- * The BigQuery output source supports both bounded and unbounded data.
+ * Concept #5: Customize the output format using windowing information
+ *
+ * <p>At this point, the data is organized by window. We're writing text files and have no
+ * late data, so for simplicity we can use the window as the key and {@link GroupByKey} to get
+ * one output file per window. (if we had late data this key would not be unique)
+ *
+ * <p>To access the window in a {@link DoFn}, add a {@link BoundedWindow} parameter. This will
+ * be automatically detected and populated with the window for the current element.
*/
- wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
- .apply(BigQueryIO.Write
- .to(getTableReference(options))
- .withSchema(getSchema())
- .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
- .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
+ PCollection<KV<IntervalWindow, KV<String, Long>>> keyedByWindow =
+ wordCounts.apply(
+ ParDo.of(
+ new DoFn<KV<String, Long>, KV<IntervalWindow, KV<String, Long>>>() {
+ @ProcessElement
+ public void processElement(ProcessContext context, IntervalWindow window) {
+ context.output(KV.of(window, context.element()));
+ }
+ }));
- PipelineResult result = pipeline.run();
+ /**
+ * Concept #6: Format the results and write to a sharded file partitioned by window, using a
+ * simple ParDo operation. Because there may be failures followed by retries, the
+ * writes must be idempotent, but the details of writing to files are elided here.
+ */
+ keyedByWindow
+ .apply(GroupByKey.<IntervalWindow, KV<String, Long>>create())
+ .apply(ParDo.of(new WriteWindowedFilesDoFn(output)));
- // ExampleUtils will try to cancel the pipeline before the program exists.
- exampleUtils.waitToFinish(result);
+ PipelineResult result = pipeline.run();
+ try {
+ result.waitUntilFinish();
+ } catch (Exception exc) {
+ result.cancel();
+ }
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42595dcd/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java
new file mode 100644
index 0000000..cd6baad
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.common;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.IOChannelFactory;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+
+/**
+ * A {@link DoFn} that writes elements to files with names deterministically derived from the lower
+ * and upper bounds of their key (an {@link IntervalWindow}).
+ *
+ * <p>This is test utility code, not for end-users, so examples can be focused
+ * on their primary lessons.
+ */
+public class WriteWindowedFilesDoFn
+ extends DoFn<KV<IntervalWindow, Iterable<KV<String, Long>>>, Void> {
+
+ static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
+ static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+ private static DateTimeFormatter formatter = ISODateTimeFormat.hourMinute();
+
+ private final String output;
+
+ public WriteWindowedFilesDoFn(String output) {
+ this.output = output;
+ }
+
+ @VisibleForTesting
+ public static String fileForWindow(String output, IntervalWindow window) {
+ return String.format(
+ "%s-%s-%s", output, formatter.print(window.start()), formatter.print(window.end()));
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext context) throws Exception {
+ // Build a file name from the window
+ IntervalWindow window = context.element().getKey();
+ String outputShard = fileForWindow(output, window);
+
+ // Write all the values; try-with-resources closes the stream even if a write fails
+ IOChannelFactory factory = IOChannelUtils.getFactory(outputShard);
+ try (OutputStream out = Channels.newOutputStream(factory.create(outputShard, "text/plain"))) {
+ for (KV<String, Long> wordCount : context.element().getValue()) {
+ STRING_CODER.encode(
+ wordCount.getKey() + ": " + wordCount.getValue(), out, Coder.Context.OUTER);
+ out.write(NEWLINE);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42595dcd/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index 5d77dd5..e4570ac 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -17,37 +17,59 @@
*/
package org.apache.beam.examples;
-import java.io.IOException;
+import static org.hamcrest.Matchers.equalTo;
+
+import com.google.api.client.util.Sleeper;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.util.Collections;
import java.util.Date;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import org.apache.beam.examples.common.WriteWindowedFilesDoFn;
+import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.beam.sdk.testing.BigqueryMatcher;
+import org.apache.beam.sdk.testing.FileChecksumMatcher;
+import org.apache.beam.sdk.testing.SerializableMatcher;
import org.apache.beam.sdk.testing.StreamingIT;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.ExplicitShardedFile;
+import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.ShardedFile;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-/**
- * End-to-end integration test of {@link WindowedWordCount}.
- */
+/** End-to-end integration test of {@link WindowedWordCount}. */
@RunWith(JUnit4.class)
public class WindowedWordCountIT {
private static final String DEFAULT_INPUT =
"gs://apache-beam-samples/shakespeare/winterstale-personae";
- private static final String DEFAULT_OUTPUT_CHECKSUM = "cd5b52939257e12428a9fa085c32a84dd209b180";
+ static final int MAX_READ_RETRIES = 4;
+ static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L);
+ static final FluentBackoff BACK_OFF_FACTORY =
+ FluentBackoff.DEFAULT
+ .withInitialBackoff(DEFAULT_SLEEP_DURATION)
+ .withMaxRetries(MAX_READ_RETRIES);
- /**
- * Options for the {@link WindowedWordCount} Integration Test.
- */
+ /** Options for the {@link WindowedWordCount} Integration Test. */
public interface WindowedWordCountITOptions
- extends WindowedWordCount.Options, TestPipelineOptions, StreamingOptions {
- }
+ extends WindowedWordCount.Options, TestPipelineOptions, StreamingOptions {}
@BeforeClass
public static void setUp() {
@@ -55,36 +77,140 @@ public class WindowedWordCountIT {
}
@Test
- public void testWindowedWordCountInBatch() throws IOException {
- testWindowedWordCountPipeline(false /* isStreaming */);
+ public void testWindowedWordCountInBatch() throws Exception {
+ testWindowedWordCountPipeline(batchOptions());
}
@Test
@Category(StreamingIT.class)
- public void testWindowedWordCountInStreaming() throws IOException {
- testWindowedWordCountPipeline(true /* isStreaming */);
+ public void testWindowedWordCountInStreaming() throws Exception {
+ testWindowedWordCountPipeline(streamingOptions());
}
- private void testWindowedWordCountPipeline(boolean isStreaming) throws IOException {
+ private WindowedWordCountITOptions defaultOptions() throws Exception {
WindowedWordCountITOptions options =
TestPipeline.testingPipelineOptions().as(WindowedWordCountITOptions.class);
- options.setStreaming(isStreaming);
options.setInputFile(DEFAULT_INPUT);
+ options.setTestTimeoutSeconds(1200L);
+
+ options.setMinTimestampMillis(0L);
+ options.setMaxTimestampMillis(Duration.standardHours(1).getMillis());
+ options.setWindowSize(10);
+
+ options.setOutput(
+ IOChannelUtils.resolve(
+ options.getTempRoot(),
+ String.format("WindowedWordCountIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date()),
+ "output",
+ "results"));
+ return options;
+ }
+
+ private WindowedWordCountITOptions streamingOptions() throws Exception {
+ WindowedWordCountITOptions options = defaultOptions();
+ options.setStreaming(true);
+ return options;
+ }
+
+ private WindowedWordCountITOptions batchOptions() throws Exception {
+ WindowedWordCountITOptions options = defaultOptions();
+ // This is the default value, but make it explicit
+ options.setStreaming(false);
+ return options;
+ }
+
+ private void testWindowedWordCountPipeline(WindowedWordCountITOptions options) throws Exception {
+
+ String outputPrefix = options.getOutput();
+
+ List<String> expectedOutputFiles = Lists.newArrayListWithCapacity(6);
+ for (int startMinute : ImmutableList.of(0, 10, 20, 30, 40, 50)) {
+ Instant windowStart =
+ new Instant(options.getMinTimestampMillis()).plus(Duration.standardMinutes(startMinute));
+ expectedOutputFiles.add(
+ WriteWindowedFilesDoFn.fileForWindow(
+ outputPrefix,
+ new IntervalWindow(windowStart, windowStart.plus(Duration.standardMinutes(10)))));
+ }
- // Note: currently unused because the example writes to BigQuery, but WindowedWordCount.Options
- // are tightly coupled to WordCount.Options, where the option is required.
- options.setOutput(IOChannelUtils.resolve(
- options.getTempRoot(),
- String.format("WindowedWordCountIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date()),
- "output",
- "results"));
+ ShardedFile inputFile =
+ new ExplicitShardedFile(Collections.singleton(options.getInputFile()));
+
+ // For this integration test, input is tiny and we can build the expected counts
+ SortedMap<String, Long> expectedWordCounts = new TreeMap<>();
+ for (String line :
+ inputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff())) {
+ String[] words = line.split("[^a-zA-Z']+");
+
+ for (String word : words) {
+ if (!word.isEmpty()) {
+ expectedWordCounts.put(word,
+ MoreObjects.firstNonNull(expectedWordCounts.get(word), 0L) + 1L);
+ }
+ }
+ }
- String query = String.format("SELECT word, SUM(count) FROM [%s:%s.%s] GROUP BY word",
- options.getProject(), options.getBigQueryDataset(), options.getBigQueryTable());
options.setOnSuccessMatcher(
- new BigqueryMatcher(
- options.getAppName(), options.getProject(), query, DEFAULT_OUTPUT_CHECKSUM));
+ new WordCountsMatcher(expectedWordCounts, new ExplicitShardedFile(expectedOutputFiles)));
WindowedWordCount.main(TestPipeline.convertToArgs(options));
}
+
+ /**
+ * A matcher that bakes in expected word counts, so they can be read directly via some other
+ * mechanism, and compares a sharded output file with the result.
+ */
+ private static class WordCountsMatcher extends TypeSafeMatcher<PipelineResult>
+ implements SerializableMatcher<PipelineResult> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileChecksumMatcher.class);
+
+ private final SortedMap<String, Long> expectedWordCounts;
+ private final ShardedFile outputFile;
+ private SortedMap<String, Long> actualCounts;
+
+ public WordCountsMatcher(SortedMap<String, Long> expectedWordCounts, ShardedFile outputFile) {
+ this.expectedWordCounts = expectedWordCounts;
+ this.outputFile = outputFile;
+ }
+
+ @Override
+ public boolean matchesSafely(PipelineResult pipelineResult) {
+ try {
+ // Load output data
+ List<String> lines =
+ outputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff());
+
+ // Since the windowing is nondeterministic we only check the sums
+ actualCounts = new TreeMap<>();
+ for (String line : lines) {
+ String[] splits = line.split(": ");
+ String word = splits[0];
+ long count = Long.parseLong(splits[1]);
+
+ Long current = actualCounts.get(word);
+ if (current == null) {
+ actualCounts.put(word, count);
+ } else {
+ actualCounts.put(word, current + count);
+ }
+ }
+
+ return actualCounts.equals(expectedWordCounts);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Failed to read from sharded output: %s", outputFile), e);
+ }
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ equalTo(expectedWordCounts).describeTo(description);
+ }
+
+ @Override
+ public void describeMismatchSafely(PipelineResult pResult, Description description) {
+ equalTo(expectedWordCounts).describeMismatch(actualCounts, description);
+ }
+ }
}