You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/12 23:34:21 UTC

[1/4] incubator-beam git commit: Factor out ShardedFile from FileChecksumMatcher

Repository: incubator-beam
Updated Branches:
  refs/heads/master 59f1fb26a -> d9657ffc3


Factor out ShardedFile from FileChecksumMatcher


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/db41940f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/db41940f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/db41940f

Branch: refs/heads/master
Commit: db41940f977bf3315ea7e5460d188d8f9b4fa119
Parents: 9678b1c
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Dec 5 14:32:12 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Dec 12 15:12:05 2016 -0800

----------------------------------------------------------------------
 .../beam/sdk/testing/FileChecksumMatcher.java   | 114 ++--------
 .../beam/sdk/util/ExplicitShardedFile.java      | 120 ++++++++++
 .../beam/sdk/util/NumberedShardedFile.java      | 220 +++++++++++++++++++
 .../org/apache/beam/sdk/util/ShardedFile.java   |  42 ++++
 .../sdk/testing/FileChecksumMatcherTest.java    |  77 -------
 .../beam/sdk/util/NumberedShardedFileTest.java  | 181 +++++++++++++++
 6 files changed, 581 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db41940f/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
index 4b249fe..82a6b71 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
@@ -21,31 +21,19 @@ package org.apache.beam.sdk.testing;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
 import com.google.api.client.util.Sleeper;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
 import com.google.common.hash.HashCode;
 import com.google.common.hash.Hashing;
-import com.google.common.io.CharStreams;
-import java.io.IOException;
-import java.io.Reader;
-import java.nio.channels.Channels;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
-import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nonnull;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.util.FluentBackoff;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.NumberedShardedFile;
+import org.apache.beam.sdk.util.ShardedFile;
 import org.hamcrest.Description;
 import org.hamcrest.TypeSafeMatcher;
 import org.joda.time.Duration;
@@ -83,9 +71,8 @@ public class FileChecksumMatcher extends TypeSafeMatcher<PipelineResult>
       Pattern.compile("(?x) \\S* (?<shardnum> \\d+) -of- (?<numshards> \\d+)");
 
   private final String expectedChecksum;
-  private final String filePath;
-  private final Pattern shardTemplate;
   private String actualChecksum;
+  private final ShardedFile shardedFile;
 
   /**
    * Constructor that uses default shard template.
@@ -98,7 +85,7 @@ public class FileChecksumMatcher extends TypeSafeMatcher<PipelineResult>
   }
 
   /**
-   * Constructor.
+   * Constructor using a custom shard template.
    *
    * @param checksum expected checksum string used to verify file content.
    * @param filePath path of files that's to be verified.
@@ -121,8 +108,17 @@ public class FileChecksumMatcher extends TypeSafeMatcher<PipelineResult>
         DEFAULT_SHARD_TEMPLATE);
 
     this.expectedChecksum = checksum;
-    this.filePath = filePath;
-    this.shardTemplate = shardTemplate;
+    this.shardedFile = new NumberedShardedFile(filePath, shardTemplate);
+  }
+
+  /**
+   * Constructor using an entirely custom {@link ShardedFile} implementation.
+   *
+   * <p>For internal use only.
+   */
+  public FileChecksumMatcher(String expectedChecksum, ShardedFile shardedFile) {
+    this.expectedChecksum = expectedChecksum;
+    this.shardedFile = shardedFile;
   }
 
   @Override
@@ -130,9 +126,10 @@ public class FileChecksumMatcher extends TypeSafeMatcher<PipelineResult>
     // Load output data
     List<String> outputs;
     try {
-      outputs = readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff());
+      outputs = shardedFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff());
     } catch (Exception e) {
-      throw new RuntimeException(String.format("Failed to read from: %s", filePath), e);
+      throw new RuntimeException(
+          String.format("Failed to read from: %s", shardedFile), e);
     }
 
     // Verify outputs. Checksum is computed using SHA-1 algorithm
@@ -142,81 +139,6 @@ public class FileChecksumMatcher extends TypeSafeMatcher<PipelineResult>
     return actualChecksum.equals(expectedChecksum);
   }
 
-  @VisibleForTesting
-  List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff)
-      throws IOException, InterruptedException {
-    IOChannelFactory factory = IOChannelUtils.getFactory(filePath);
-    IOException lastException = null;
-
-    do {
-      try {
-        // Match inputPath which may contains glob
-        Collection<String> files = factory.match(filePath);
-        LOG.debug("Found {} file(s) by matching the path: {}", files.size(), filePath);
-
-        if (files.isEmpty() || !checkTotalNumOfFiles(files)) {
-          continue;
-        }
-
-        // Read data from file paths
-        return readLines(files, factory);
-      } catch (IOException e) {
-        // Ignore and retry
-        lastException = e;
-        LOG.warn("Error in file reading. Ignore and retry.");
-      }
-    } while(BackOffUtils.next(sleeper, backOff));
-    // Failed after max retries
-    throw new IOException(
-        String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES),
-        lastException);
-  }
-
-  @VisibleForTesting
-  List<String> readLines(Collection<String> files, IOChannelFactory factory) throws IOException {
-    List<String> allLines = Lists.newArrayList();
-    int i = 1;
-    for (String file : files) {
-      try (Reader reader =
-               Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) {
-        List<String> lines = CharStreams.readLines(reader);
-        allLines.addAll(lines);
-        LOG.debug(
-            "[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file);
-      }
-      i++;
-    }
-    return allLines;
-  }
-
-  /**
-   * Check if total number of files is correct by comparing with the number that
-   * is parsed from shard name using a name template. If no template is specified,
-   * "SSSS-of-NNNN" will be used as default, and "NNNN" will be the expected total
-   * number of files.
-   *
-   * @return {@code true} if at least one shard name matches template and total number
-   * of given files equals the number that is parsed from shard name.
-   */
-  @VisibleForTesting
-  boolean checkTotalNumOfFiles(Collection<String> files) {
-    for (String filePath : files) {
-      Path fileName = Paths.get(filePath).getFileName();
-      if (fileName == null) {
-        // this path has zero elements
-        continue;
-      }
-      Matcher matcher = shardTemplate.matcher(fileName.toString());
-      if (!matcher.matches()) {
-        // shard name doesn't match the pattern, check with the next shard
-        continue;
-      }
-      // once match, extract total number of shards and compare to file list
-      return files.size() == Integer.parseInt(matcher.group("numshards"));
-    }
-    return false;
-  }
-
   private String computeHash(@Nonnull List<String> strs) {
     if (strs.isEmpty()) {
       return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db41940f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java
new file mode 100644
index 0000000..5f5bf1f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.io.CharStreams;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A sharded file where the file names are simply provided. */
+public class ExplicitShardedFile implements ShardedFile {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExplicitShardedFile.class);
+
+  private static final int MAX_READ_RETRIES = 4;
+  private static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L);
+  static final FluentBackoff BACK_OFF_FACTORY =
+      FluentBackoff.DEFAULT
+          .withInitialBackoff(DEFAULT_SLEEP_DURATION)
+          .withMaxRetries(MAX_READ_RETRIES);
+
+  private final Collection<String> files;
+
+  /** Constructs an {@link ExplicitShardedFile} for the given files. */
+  public ExplicitShardedFile(Collection<String> files) {
+    this.files = files;
+  }
+
+  @Override
+  public List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff)
+      throws IOException, InterruptedException {
+    if (files.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    IOChannelFactory factory = IOChannelUtils.getFactory(Iterables.get(files, 0));
+    IOException lastException = null;
+
+    do {
+      try {
+        // Read data from file paths
+        return readLines(files, factory);
+      } catch (IOException e) {
+        // Ignore and retry
+        lastException = e;
+        LOG.warn("Error in file reading. Ignore and retry.");
+      }
+    } while (BackOffUtils.next(sleeper, backOff));
+    // Failed after max retries
+    throw new IOException(
+        String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES),
+        lastException);
+  }
+
+  /**
+   * Reads the lines of all files using {@link Sleeper#DEFAULT} and a default {@link BackOff}.
+   *
+   * <p>A read that fails with an {@link IOException} is retried until the backoff is exhausted,
+   * at which point an {@link IOException} wrapping the last failure is thrown.
+   */
+  public List<String> readFilesWithRetries() throws IOException, InterruptedException {
+    return readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff());
+  }
+
+  @Override
+  public String toString() {
+    return String.format("explicit sharded file (%s)", Joiner.on(", ").join(files));
+  }
+
+  /**
+   * Reads all the lines of all the files.
+   *
+   * <p>Not suitable for use except in testing of small data, since the data size may be far more
+   * than can be reasonably processed serially, in-memory, by a single thread.
+   */
+  @VisibleForTesting
+  List<String> readLines(Collection<String> files, IOChannelFactory factory) throws IOException {
+    List<String> allLines = Lists.newArrayList();
+    int i = 1;
+    for (String file : files) {
+      try (Reader reader = Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) {
+        List<String> lines = CharStreams.readLines(reader);
+        allLines.addAll(lines);
+        LOG.debug("[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file);
+      }
+      i++;
+    }
+    return allLines;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db41940f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
new file mode 100644
index 0000000..f9f2d6d
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.Hashing;
+import com.google.common.io.CharStreams;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nonnull;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A sharded file matched by a path or glob, where shard names encode the total shard count.
+ * For internal use only; many parameters are just hardcoded to allow existing uses to work OK.
+ */
+public class NumberedShardedFile implements ShardedFile {
+
+  private static final Logger LOG = LoggerFactory.getLogger(NumberedShardedFile.class);
+
+  static final int MAX_READ_RETRIES = 4;
+  static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L);
+  static final FluentBackoff BACK_OFF_FACTORY =
+      FluentBackoff.DEFAULT
+          .withInitialBackoff(DEFAULT_SLEEP_DURATION)
+          .withMaxRetries(MAX_READ_RETRIES);
+
+  private static final Pattern DEFAULT_SHARD_TEMPLATE =
+      Pattern.compile("(?x) \\S* (?<shardnum> \\d+) -of- (?<numshards> \\d+)");
+
+  private final String filePath;
+  private final Pattern shardTemplate;
+
+  /**
+   * Constructor that uses default shard template.
+   *
+   * @param filePath path or glob of files to include
+   */
+  public NumberedShardedFile(String filePath) {
+    this(filePath, DEFAULT_SHARD_TEMPLATE);
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param filePath path or glob of files to include
+   * @param shardTemplate template of shard name to parse out the total number of shards
+   *                      which is used in I/O retry to avoid inconsistency of filesystem.
+   *                      Customized template should assign name "numshards" to capturing
+   *                      group - total shard number.
+   */
+  public NumberedShardedFile(String filePath, Pattern shardTemplate) {
+    checkArgument(
+        !Strings.isNullOrEmpty(filePath),
+        "Expected valid file path, but received %s", filePath);
+    checkNotNull(
+        shardTemplate,
+        "Expected non-null shard pattern. "
+            + "Please call the other constructor to use default pattern: %s",
+        DEFAULT_SHARD_TEMPLATE);
+
+    this.filePath = filePath;
+    this.shardTemplate = shardTemplate;
+  }
+
+  public String getFilePath() {
+    return filePath;
+  }
+
+  /**
+   * Discovers all shards of this file using the provided {@link Sleeper} and {@link BackOff}.
+   *
+   * <p>Because of eventual consistency, reads may discover no files or fewer files than
+   * the shard template implies. In this case, the read is considered to have failed.
+   */
+  @Override
+  public List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff)
+      throws IOException, InterruptedException {
+    IOChannelFactory factory = IOChannelUtils.getFactory(filePath);
+    IOException lastException = null;
+
+    do {
+      try {
+        // Match filePath, which may contain a glob
+        Collection<String> files = factory.match(filePath);
+        LOG.debug("Found {} file(s) by matching the path: {}", files.size(), filePath);
+
+        if (files.isEmpty() || !checkTotalNumOfFiles(files)) {
+          continue;
+        }
+
+        // Read data from file paths
+        return readLines(files, factory);
+      } catch (IOException e) {
+        // Ignore and retry
+        lastException = e;
+        LOG.warn("Error in file reading. Ignore and retry.");
+      }
+    } while(BackOffUtils.next(sleeper, backOff));
+    // Failed after max retries
+    throw new IOException(
+        String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES),
+        lastException);
+  }
+
+  /**
+   * Reads the lines of all shards using {@link Sleeper#DEFAULT} and a default {@link BackOff}.
+   *
+   * <p>Because of eventual consistency, reads may discover no files or fewer files than
+   * the shard template implies. In this case, the read is considered to have failed.
+   */
+  public List<String> readFilesWithRetries()
+      throws IOException, InterruptedException {
+    return readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff());
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s with shard template '%s'", filePath, shardTemplate);
+  }
+
+  /**
+   * Reads all the lines of all the files.
+   *
+   * <p>Not suitable for use except in testing of small data, since the data size may be far more
+   * than can be reasonably processed serially, in-memory, by a single thread.
+   */
+  @VisibleForTesting
+  List<String> readLines(Collection<String> files, IOChannelFactory factory) throws IOException {
+    List<String> allLines = Lists.newArrayList();
+    int i = 1;
+    for (String file : files) {
+      try (Reader reader =
+               Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) {
+        List<String> lines = CharStreams.readLines(reader);
+        allLines.addAll(lines);
+        LOG.debug(
+            "[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file);
+      }
+      i++;
+    }
+    return allLines;
+  }
+
+  /**
+   * Check if total number of files is correct by comparing with the number that
+   * is parsed from shard name using a name template. If no template is specified,
+   * "SSSS-of-NNNN" will be used as default, and "NNNN" will be the expected total
+   * number of files.
+   *
+   * @return {@code true} if at least one shard name matches template and total number
+   * of given files equals the number that is parsed from shard name.
+   */
+  @VisibleForTesting
+  boolean checkTotalNumOfFiles(Collection<String> files) {
+    for (String filePath : files) {
+      Path fileName = Paths.get(filePath).getFileName();
+      if (fileName == null) {
+        // this path has zero elements
+        continue;
+      }
+      Matcher matcher = shardTemplate.matcher(fileName.toString());
+      if (!matcher.matches()) {
+        // shard name doesn't match the pattern, check with the next shard
+        continue;
+      }
+      // once match, extract total number of shards and compare to file list
+      return files.size() == Integer.parseInt(matcher.group("numshards"));
+    }
+    return false;
+  }
+
+  private String computeHash(@Nonnull List<String> strs) {
+    if (strs.isEmpty()) {
+      return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString();
+    }
+
+    List<HashCode> hashCodes = new ArrayList<>();
+    for (String str : strs) {
+      hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8));
+    }
+    return Hashing.combineUnordered(hashCodes).toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db41940f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java
new file mode 100644
index 0000000..ec9ed64
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.Sleeper;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.testing.SerializableMatcher;
+
+/**
+ * Bare-bones class for using sharded files.
+ *
+ * <p>For internal use only; used only in SDK tests. Must be {@link Serializable} so it can be
+ * shipped as part of a {@link SerializableMatcher}.
+ */
+public interface ShardedFile extends Serializable {
+
+  /**
+   * Reads the lines from all shards of this file using the provided {@link Sleeper} and {@link
+   * BackOff}.
+   */
+  List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff)
+      throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db41940f/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
index 0dc307d..5438479 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
@@ -19,10 +19,6 @@ package org.apache.beam.sdk.testing;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.spy;
 
 import com.google.api.client.util.BackOff;
 import com.google.common.io.Files;
@@ -30,9 +26,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.regex.Pattern;
-
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.junit.Rule;
 import org.junit.Test;
@@ -77,13 +71,6 @@ public class FileChecksumMatcherTest {
   }
 
   @Test
-  public void testPreconditionFilePathIsNull() {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(containsString("Expected valid file path, but received"));
-    new FileChecksumMatcher("checksumString", null);
-  }
-
-  @Test
   public void testPreconditionFilePathIsEmpty() {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(containsString("Expected valid file path, but received"));
@@ -158,68 +145,4 @@ public class FileChecksumMatcherTest {
 
     assertThat(pResult, matcher);
   }
-
-  @Test
-  public void testReadWithRetriesFailsWhenTemplateIncorrect() throws Exception {
-    File tmpFile = tmpFolder.newFile();
-    Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);
-
-    FileChecksumMatcher matcher = new FileChecksumMatcher(
-        "mock-checksum",
-        IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"),
-        Pattern.compile("incorrect-template"));
-
-    thrown.expect(IOException.class);
-    thrown.expectMessage(
-        containsString(
-            "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES));
-    matcher.readFilesWithRetries(fastClock, backOff);
-  }
-
-  @Test
-  public void testReadWithRetriesFailsSinceFilesystemError() throws Exception {
-    File tmpFile = tmpFolder.newFile();
-    Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);
-
-    FileChecksumMatcher matcher =
-        spy(new FileChecksumMatcher(
-            "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")));
-    doThrow(IOException.class)
-        .when(matcher).readLines(anyCollection(), any(IOChannelFactory.class));
-
-    thrown.expect(IOException.class);
-    thrown.expectMessage(
-        containsString(
-            "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES));
-    matcher.readFilesWithRetries(fastClock, backOff);
-  }
-
-  @Test
-  public void testReadWithRetriesFailsWhenOutputDirEmpty() throws Exception {
-    FileChecksumMatcher matcher =
-        new FileChecksumMatcher(
-            "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"));
-
-    thrown.expect(IOException.class);
-    thrown.expectMessage(
-        containsString(
-            "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES));
-    matcher.readFilesWithRetries(fastClock, backOff);
-  }
-
-  @Test
-  public void testReadWithRetriesFailsWhenRedundantFileLoaded() throws Exception {
-    tmpFolder.newFile("result-000-of-001");
-    tmpFolder.newFile("tmp-result-000-of-001");
-
-    FileChecksumMatcher matcher =
-        new FileChecksumMatcher(
-            "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"));
-
-    thrown.expect(IOException.class);
-    thrown.expectMessage(
-        containsString(
-            "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES));
-    matcher.readFilesWithRetries(fastClock, backOff);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db41940f/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java
new file mode 100644
index 0000000..475e459
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import com.google.api.client.util.BackOff;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+
+/** Tests for {@link NumberedShardedFile}. */
+@RunWith(JUnit4.class)
+public class NumberedShardedFileTest {
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
+
+  @Mock private PipelineResult pResult = Mockito.mock(PipelineResult.class);
+
+  private final BackOff backOff = NumberedShardedFile.BACK_OFF_FACTORY.backoff();
+
+  @Test
+  public void testPreconditionFilePathIsNull() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(containsString("Expected valid file path, but received"));
+    new NumberedShardedFile(null);
+  }
+
+  @Test
+  public void testPreconditionFilePathIsEmpty() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(containsString("Expected valid file path, but received"));
+    new NumberedShardedFile("");
+  }
+
+  @Test
+  public void testReadMultipleShards() throws Exception {
+    String
+        contents1 = "To be or not to be, ",
+        contents2 = "it is not a question.",
+        contents3 = "should not be included";
+
+    File tmpFile1 = tmpFolder.newFile("result-000-of-002");
+    File tmpFile2 = tmpFolder.newFile("result-001-of-002");
+    File tmpFile3 = tmpFolder.newFile("tmp");
+    Files.write(contents1, tmpFile1, StandardCharsets.UTF_8);
+    Files.write(contents2, tmpFile2, StandardCharsets.UTF_8);
+    Files.write(contents3, tmpFile3, StandardCharsets.UTF_8);
+
+    NumberedShardedFile shardedFile =
+        new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "result-*"));
+
+    assertThat(shardedFile.readFilesWithRetries(), containsInAnyOrder(contents1, contents2));
+  }
+
+  @Test
+  public void testReadEmpty() throws Exception {
+    File emptyFile = tmpFolder.newFile("result-000-of-001");
+    Files.write("", emptyFile, StandardCharsets.UTF_8);
+    NumberedShardedFile shardedFile =
+        new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"));
+
+    assertThat(shardedFile.readFilesWithRetries(), empty());
+  }
+
+  @Test
+  public void testReadCustomTemplate() throws Exception {
+    String contents1 = "To be or not to be, ", contents2 = "it is not a question.";
+
+    // Customized template: resultSSS-totalNNN
+    File tmpFile1 = tmpFolder.newFile("result0-total2");
+    File tmpFile2 = tmpFolder.newFile("result1-total2");
+    Files.write(contents1, tmpFile1, StandardCharsets.UTF_8);
+    Files.write(contents2, tmpFile2, StandardCharsets.UTF_8);
+
+    Pattern customizedTemplate =
+        Pattern.compile("(?x) result (?<shardnum>\\d+) - total (?<numshards>\\d+)");
+    NumberedShardedFile shardedFile =
+        new NumberedShardedFile(
+            IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"), customizedTemplate);
+
+    assertThat(shardedFile.readFilesWithRetries(), containsInAnyOrder(contents1, contents2));
+  }
+
+  @Test
+  public void testReadWithRetriesFailsWhenTemplateIncorrect() throws Exception {
+    File tmpFile = tmpFolder.newFile();
+    Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);
+
+    NumberedShardedFile shardedFile =
+        new NumberedShardedFile(
+            IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"),
+            Pattern.compile("incorrect-template"));
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage(
+        containsString(
+            "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES));
+    shardedFile.readFilesWithRetries(fastClock, backOff);
+  }
+
+  @Test
+  public void testReadWithRetriesFailsSinceFilesystemError() throws Exception {
+    File tmpFile = tmpFolder.newFile();
+    Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);
+
+    NumberedShardedFile shardedFile =
+        spy(new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")));
+    doThrow(IOException.class)
+        .when(shardedFile)
+        .readLines(anyCollection(), any(IOChannelFactory.class));
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage(
+        containsString(
+            "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES));
+    shardedFile.readFilesWithRetries(fastClock, backOff);
+  }
+
+  @Test
+  public void testReadWithRetriesFailsWhenOutputDirEmpty() throws Exception {
+    NumberedShardedFile shardedFile =
+        new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"));
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage(
+        containsString(
+            "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES));
+    shardedFile.readFilesWithRetries(fastClock, backOff);
+  }
+
+  @Test
+  public void testReadWithRetriesFailsWhenRedundantFileLoaded() throws Exception {
+    tmpFolder.newFile("result-000-of-001");
+    tmpFolder.newFile("tmp-result-000-of-001");
+
+    NumberedShardedFile shardedFile =
+        new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"));
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage(
+        containsString(
+            "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES));
+    shardedFile.readFilesWithRetries(fastClock, backOff);
+  }
+}


[2/4] incubator-beam git commit: Add IntervalWindow coder to the standard registry

Posted by ke...@apache.org.
Add IntervalWindow coder to the standard registry


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9678b1cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9678b1cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9678b1cc

Branch: refs/heads/master
Commit: 9678b1cc6c799767e48ebc4d9071db099b4d135d
Parents: 59f1fb2
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Nov 3 20:44:45 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Dec 12 15:12:05 2016 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java    | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9678b1cc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
index 9110de0..65f4209 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
@@ -40,6 +40,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode;
 import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -107,6 +108,7 @@ public class CoderRegistry implements CoderProvider {
     registerCoder(TimestampedValue.class, TimestampedValue.TimestampedValueCoder.class);
     registerCoder(Void.class, VoidCoder.class);
     registerCoder(byte[].class, ByteArrayCoder.class);
+    registerCoder(IntervalWindow.class, IntervalWindow.getCoder());
   }
 
   /**


[4/4] incubator-beam git commit: This closes #1536

Posted by ke...@apache.org.
This closes #1536


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d9657ffc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d9657ffc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d9657ffc

Branch: refs/heads/master
Commit: d9657ffc37490b063835672e0b5287b4d18aba96
Parents: 59f1fb2 42595dc
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Dec 12 15:33:29 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Dec 12 15:33:29 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/examples/WindowedWordCount.java | 177 ++++++++-------
 .../examples/common/WriteWindowedFilesDoFn.java |  77 +++++++
 .../beam/examples/WindowedWordCountIT.java      | 182 ++++++++++++---
 .../apache/beam/sdk/coders/CoderRegistry.java   |   2 +
 .../beam/sdk/testing/FileChecksumMatcher.java   | 114 ++--------
 .../beam/sdk/util/ExplicitShardedFile.java      | 120 ++++++++++
 .../beam/sdk/util/NumberedShardedFile.java      | 220 +++++++++++++++++++
 .../org/apache/beam/sdk/util/ShardedFile.java   |  42 ++++
 .../sdk/testing/FileChecksumMatcherTest.java    |  77 -------
 .../beam/sdk/util/NumberedShardedFileTest.java  | 181 +++++++++++++++
 10 files changed, 909 insertions(+), 283 deletions(-)
----------------------------------------------------------------------



[3/4] incubator-beam git commit: Revise WindowedWordCount for runner and execution mode portability

Posted by ke...@apache.org.
Revise WindowedWordCount for runner and execution mode portability


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/42595dcd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/42595dcd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/42595dcd

Branch: refs/heads/master
Commit: 42595dcd29c248bd3572596c9bb8464d18acd19b
Parents: db41940
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Nov 3 14:37:26 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Dec 12 15:23:38 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/examples/WindowedWordCount.java | 177 +++++++++---------
 .../examples/common/WriteWindowedFilesDoFn.java |  77 ++++++++
 .../beam/examples/WindowedWordCountIT.java      | 182 ++++++++++++++++---
 3 files changed, 326 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42595dcd/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index 4e254bd..5c19454 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -17,26 +17,25 @@
  */
 package org.apache.beam.examples;
 
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
 import org.apache.beam.examples.common.ExampleOptions;
-import org.apache.beam.examples.common.ExampleUtils;
+import org.apache.beam.examples.common.WriteWindowedFilesDoFn;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -63,7 +62,8 @@ import org.joda.time.Instant;
  *   2. Adding timestamps to data
  *   3. Windowing
  *   4. Re-using PTransforms over windowed PCollections
- *   5. Writing to BigQuery
+ *   5. Accessing the window of an element
+ *   6. Writing data to per-window text files
  * </pre>
  *
  * <p>By default, the examples will run with the {@code DirectRunner}.
@@ -74,25 +74,23 @@ import org.joda.time.Instant;
  * </pre>
  * See examples/java/README.md for instructions about how to configure different runners.
  *
- * <p>Optionally specify the input file path via:
- * {@code --inputFile=gs://INPUT_PATH},
- * which defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}.
+ * <p>To execute this pipeline locally, specify a local output file (if using the
+ * {@code DirectRunner}) or output prefix on a supported distributed file system.
+ * <pre>{@code
+ *   --output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX]
+ * }</pre>
  *
- * <p>Specify an output BigQuery dataset and optionally, a table for the output. If you don't
- * specify the table, one will be created for you using the job name. If you don't specify the
- * dataset, a dataset called {@code beam_examples} must already exist in your project.
- * {@code --bigQueryDataset=YOUR-DATASET --bigQueryTable=YOUR-NEW-TABLE-NAME}.
+ * <p>The input file defaults to a public data set containing the text of King Lear,
+ * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}.
  *
  * <p>By default, the pipeline will do fixed windowing, on 1-minute windows.  You can
  * change this interval by setting the {@code --windowSize} parameter, e.g. {@code --windowSize=10}
  * for 10-minute windows.
  *
- * <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
- * and then exits.
+ * <p>The example will try to cancel the pipeline on the signal to terminate the process (CTRL-C).
  */
 public class WindowedWordCount {
-    static final int WINDOW_SIZE = 1;  // Default window duration in minutes
-
+    static final int WINDOW_SIZE = 10;  // Default window duration in minutes
   /**
    * Concept #2: A DoFn that sets the data element timestamp. This is a silly method, just for
    * this example, for the bounded data case.
@@ -102,18 +100,22 @@ public class WindowedWordCount {
    * 2-hour period.
    */
   static class AddTimestampFn extends DoFn<String, String> {
-    private static final Duration RAND_RANGE = Duration.standardHours(2);
+    private static final Duration RAND_RANGE = Duration.standardHours(1);
     private final Instant minTimestamp;
+    private final Instant maxTimestamp;
 
-    AddTimestampFn() {
-      this.minTimestamp = new Instant(System.currentTimeMillis());
+    AddTimestampFn(Instant minTimestamp, Instant maxTimestamp) {
+      this.minTimestamp = minTimestamp;
+      this.maxTimestamp = maxTimestamp;
     }
 
     @ProcessElement
     public void processElement(ProcessContext c) {
-      // Generate a timestamp that falls somewhere in the past two hours.
-      long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
-      Instant randomTimestamp = minTimestamp.plus(randMillis);
+      Instant randomTimestamp =
+          new Instant(
+              ThreadLocalRandom.current()
+                  .nextLong(minTimestamp.getMillis(), maxTimestamp.getMillis()));
+
       /**
        * Concept #2: Set the data element with that timestamp.
        */
@@ -121,50 +123,29 @@ public class WindowedWordCount {
     }
   }
 
-  /** A DoFn that converts a Word and Count into a BigQuery table row. */
-  static class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      TableRow row = new TableRow()
-          .set("word", c.element().getKey())
-          .set("count", c.element().getValue())
-          // include a field for the window timestamp
-         .set("window_timestamp", c.timestamp().toString());
-      c.output(row);
+  /** A {@link DefaultValueFactory} that returns the current system time. */
+  public static class DefaultToCurrentSystemTime implements DefaultValueFactory<Long> {
+    @Override
+    public Long create(PipelineOptions options) {
+      return System.currentTimeMillis();
     }
   }
 
-  /**
-   * Helper method that defines the BigQuery schema used for the output.
-   */
-  private static TableSchema getSchema() {
-    List<TableFieldSchema> fields = new ArrayList<>();
-    fields.add(new TableFieldSchema().setName("word").setType("STRING"));
-    fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));
-    fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP"));
-    TableSchema schema = new TableSchema().setFields(fields);
-    return schema;
-  }
-
-  /**
-   * Concept #5: We'll stream the results to a BigQuery table. The BigQuery output source is one
-   * that supports both bounded and unbounded data. This is a helper method that creates a
-   * TableReference from input options, to tell the pipeline where to write its BigQuery results.
-   */
-  private static TableReference getTableReference(Options options) {
-    TableReference tableRef = new TableReference();
-    tableRef.setProjectId(options.getProject());
-    tableRef.setDatasetId(options.getBigQueryDataset());
-    tableRef.setTableId(options.getBigQueryTable());
-    return tableRef;
+  /** A {@link DefaultValueFactory} that returns the minimum timestamp plus one hour. */
+  public static class DefaultToMinTimestampPlusOneHour implements DefaultValueFactory<Long> {
+    @Override
+    public Long create(PipelineOptions options) {
+      return options.as(Options.class).getMinTimestampMillis()
+          + Duration.standardHours(1).getMillis();
+    }
   }
 
   /**
-   * Options supported by {@link WindowedWordCount}.
+   * Options for {@link WindowedWordCount}.
    *
-   * <p>Inherits standard example configuration options, which allow specification of the BigQuery
-   * table, as well as the {@link WordCount.WordCountOptions} support for
-   * specification of the input file.
+   * <p>Inherits standard example configuration options, which allow specification of the
+   * runner, as well as the {@link WordCount.WordCountOptions} support for
+   * specification of the input and output files.
    */
   public interface Options extends WordCount.WordCountOptions,
       ExampleOptions, ExampleBigQueryTableOptions {
@@ -172,14 +153,24 @@ public class WindowedWordCount {
     @Default.Integer(WINDOW_SIZE)
     Integer getWindowSize();
     void setWindowSize(Integer value);
+
+    @Description("Minimum randomly assigned timestamp, in milliseconds-since-epoch")
+    @Default.InstanceFactory(DefaultToCurrentSystemTime.class)
+    Long getMinTimestampMillis();
+    void setMinTimestampMillis(Long value);
+
+    @Description("Maximum randomly assigned timestamp, in milliseconds-since-epoch")
+    @Default.InstanceFactory(DefaultToMinTimestampPlusOneHour.class)
+    Long getMaxTimestampMillis();
+    void setMaxTimestampMillis(Long value);
   }
 
   public static void main(String[] args) throws IOException {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-    options.setBigQuerySchema(getSchema());
-    // ExampleUtils creates the necessary input sources to simplify execution of this Pipeline.
-    ExampleUtils exampleUtils = new ExampleUtils(options);
-    exampleUtils.setup();
+    final String output = options.getOutput();
+    final Duration windowSize = Duration.standardMinutes(options.getWindowSize());
+    final Instant minTimestamp = new Instant(options.getMinTimestampMillis());
+    final Instant maxTimestamp = new Instant(options.getMaxTimestampMillis());
 
     Pipeline pipeline = Pipeline.create(options);
 
@@ -192,7 +183,7 @@ public class WindowedWordCount {
       .apply(TextIO.Read.from(options.getInputFile()))
       // Concept #2: Add an element timestamp, using an artificial time just to show windowing.
       // See AddTimestampFn for more detail on this.
-      .apply(ParDo.of(new AddTimestampFn()));
+      .apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));
 
     /**
      * Concept #3: Window into fixed windows. The fixed window size for this example defaults to 1
@@ -200,9 +191,10 @@ public class WindowedWordCount {
      * information on how fixed windows work, and for information on the other types of windowing
      * available (e.g., sliding windows).
      */
-    PCollection<String> windowedWords = input
-      .apply(Window.<String>into(
-        FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
+    PCollection<String> windowedWords =
+        input.apply(
+            Window.<String>into(
+                FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
 
     /**
      * Concept #4: Re-use our existing CountWords transform that does not have knowledge of
@@ -211,19 +203,40 @@ public class WindowedWordCount {
     PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());
 
     /**
-     * Concept #5: Format the results for a BigQuery table, then write to BigQuery.
-     * The BigQuery output source supports both bounded and unbounded data.
+     * Concept #5: Customize the output format using windowing information
+     *
+     * <p>At this point, the data is organized by window. We're writing text files and have no
+     * late data, so for simplicity we can use the window as the key and {@link GroupByKey} to get
+     * one output file per window. (if we had late data this key would not be unique)
+     *
+     * <p>To access the window in a {@link DoFn}, add a {@link BoundedWindow} parameter. This will
+     * be automatically detected and populated with the window for the current element.
      */
-    wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
-        .apply(BigQueryIO.Write
-          .to(getTableReference(options))
-          .withSchema(getSchema())
-          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
-          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
+    PCollection<KV<IntervalWindow, KV<String, Long>>> keyedByWindow =
+        wordCounts.apply(
+            ParDo.of(
+                new DoFn<KV<String, Long>, KV<IntervalWindow, KV<String, Long>>>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext context, IntervalWindow window) {
+                    context.output(KV.of(window, context.element()));
+                  }
+                }));
 
-    PipelineResult result = pipeline.run();
+    /**
+     * Concept #6: Format the results and write to a sharded file partitioned by window, using a
+     * simple ParDo operation. Because there may be failures followed by retries, the
+     * writes must be idempotent, but the details of writing to files are elided here.
+     */
+    keyedByWindow
+        .apply(GroupByKey.<IntervalWindow, KV<String, Long>>create())
+        .apply(ParDo.of(new WriteWindowedFilesDoFn(output)));
 
-    // ExampleUtils will try to cancel the pipeline before the program exists.
-    exampleUtils.waitToFinish(result);
+    PipelineResult result = pipeline.run();
+    try {
+      result.waitUntilFinish();
+    } catch (Exception exc) {
+      result.cancel();
+    }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42595dcd/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java
new file mode 100644
index 0000000..cd6baad
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.common;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.IOChannelFactory;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+
+/**
+ * A {@link DoFn} that writes elements to files with names deterministically derived from the lower
+ * and upper bounds of their key (an {@link IntervalWindow}).
+ *
+ * <p>This is test utility code, not for end-users, so examples can be focused
+ * on their primary lessons.
+ */
+public class WriteWindowedFilesDoFn
+    extends DoFn<KV<IntervalWindow, Iterable<KV<String, Long>>>, Void> {
+
+  static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
+  static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  private static final DateTimeFormatter formatter = ISODateTimeFormat.hourMinute();
+
+  private final String output;
+
+  public WriteWindowedFilesDoFn(String output) {
+    this.output = output;
+  }
+
+  @VisibleForTesting
+  public static String fileForWindow(String output, IntervalWindow window) {
+    return String.format(
+        "%s-%s-%s", output, formatter.print(window.start()), formatter.print(window.end()));
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext context) throws Exception {
+    // The element key is the window; its start and end determine the output file name
+    IntervalWindow window = context.element().getKey();
+    String outputShard = fileForWindow(output, window);
+
+    // Write one "word: count" line per value; the stream is closed even if a write fails
+    IOChannelFactory factory = IOChannelUtils.getFactory(outputShard);
+    try (OutputStream out = Channels.newOutputStream(factory.create(outputShard, "text/plain"))) {
+      for (KV<String, Long> wordCount : context.element().getValue()) {
+        STRING_CODER.encode(
+            wordCount.getKey() + ": " + wordCount.getValue(), out, Coder.Context.OUTER);
+        out.write(NEWLINE);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42595dcd/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index 5d77dd5..e4570ac 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -17,37 +17,59 @@
  */
 package org.apache.beam.examples;
 
-import java.io.IOException;
+import static org.hamcrest.Matchers.equalTo;
+
+import com.google.api.client.util.Sleeper;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.util.Collections;
 import java.util.Date;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import org.apache.beam.examples.common.WriteWindowedFilesDoFn;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.beam.sdk.testing.BigqueryMatcher;
+import org.apache.beam.sdk.testing.FileChecksumMatcher;
+import org.apache.beam.sdk.testing.SerializableMatcher;
 import org.apache.beam.sdk.testing.StreamingIT;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.ExplicitShardedFile;
+import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.ShardedFile;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-/**
- * End-to-end integration test of {@link WindowedWordCount}.
- */
+/** End-to-end integration test of {@link WindowedWordCount}. */
 @RunWith(JUnit4.class)
 public class WindowedWordCountIT {
 
   private static final String DEFAULT_INPUT =
       "gs://apache-beam-samples/shakespeare/winterstale-personae";
-  private static final String DEFAULT_OUTPUT_CHECKSUM = "cd5b52939257e12428a9fa085c32a84dd209b180";
+  static final int MAX_READ_RETRIES = 4;
+  static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L);
+  static final FluentBackoff BACK_OFF_FACTORY =
+      FluentBackoff.DEFAULT
+          .withInitialBackoff(DEFAULT_SLEEP_DURATION)
+          .withMaxRetries(MAX_READ_RETRIES);
 
-  /**
-   * Options for the {@link WindowedWordCount} Integration Test.
-   */
+  /** Options for the {@link WindowedWordCount} Integration Test. */
   public interface WindowedWordCountITOptions
-      extends WindowedWordCount.Options, TestPipelineOptions, StreamingOptions {
-  }
+      extends WindowedWordCount.Options, TestPipelineOptions, StreamingOptions {}
 
   @BeforeClass
   public static void setUp() {
@@ -55,36 +77,140 @@ public class WindowedWordCountIT {
   }
 
   @Test
-  public void testWindowedWordCountInBatch() throws IOException {
-    testWindowedWordCountPipeline(false /* isStreaming */);
+  public void testWindowedWordCountInBatch() throws Exception {
+    testWindowedWordCountPipeline(batchOptions()); // explicitly non-streaming
   }
 
   @Test
   @Category(StreamingIT.class)
-  public void testWindowedWordCountInStreaming() throws IOException {
-    testWindowedWordCountPipeline(true /* isStreaming */);
+  public void testWindowedWordCountInStreaming() throws Exception {
+    testWindowedWordCountPipeline(streamingOptions()); // default options with streaming on
   }
 
-  private void testWindowedWordCountPipeline(boolean isStreaming) throws IOException {
+  private WindowedWordCountITOptions defaultOptions() throws Exception {
     WindowedWordCountITOptions options =
         TestPipeline.testingPipelineOptions().as(WindowedWordCountITOptions.class);
-    options.setStreaming(isStreaming);
     options.setInputFile(DEFAULT_INPUT);
+    options.setTestTimeoutSeconds(1200L);
+    // One hour of timestamps starting at 0, matching the six 10-minute windows the test expects
+    options.setMinTimestampMillis(0L);
+    options.setMaxTimestampMillis(Duration.standardHours(1).getMillis());
+    options.setWindowSize(10);
+    // Each run writes under its own timestamped directory below the temp root
+    options.setOutput(
+        IOChannelUtils.resolve(
+            options.getTempRoot(),
+            String.format("WindowedWordCountIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date()),
+            "output",
+            "results"));
+    return options;
+  }
+
+  private WindowedWordCountITOptions streamingOptions() throws Exception {
+    WindowedWordCountITOptions streaming = defaultOptions();
+    streaming.setStreaming(true); // the only difference from the default options
+    return streaming;
+  }
+
+  private WindowedWordCountITOptions batchOptions() throws Exception {
+    WindowedWordCountITOptions batch = defaultOptions();
+    // Streaming is already off by default; it is set here so batch mode is stated explicitly
+    batch.setStreaming(false);
+    return batch;
+  }
+
+  private void testWindowedWordCountPipeline(WindowedWordCountITOptions options) throws Exception {
+    // Runs the example with the given options and verifies its output against the input's counts
+    String outputPrefix = options.getOutput();
+    // One expected output file per 10-minute window in the hour after the minimum timestamp
+    List<String> expectedOutputFiles = Lists.newArrayListWithCapacity(6);
+    for (int startMinute : ImmutableList.of(0, 10, 20, 30, 40, 50)) {
+      Instant windowStart =
+          new Instant(options.getMinTimestampMillis()).plus(Duration.standardMinutes(startMinute));
+      expectedOutputFiles.add(
+          WriteWindowedFilesDoFn.fileForWindow(
+              outputPrefix,
+              new IntervalWindow(windowStart, windowStart.plus(Duration.standardMinutes(10)))));
+    }
 
-    // Note: currently unused because the example writes to BigQuery, but WindowedWordCount.Options
-    // are tightly coupled to WordCount.Options, where the option is required.
-    options.setOutput(IOChannelUtils.resolve(
-        options.getTempRoot(),
-        String.format("WindowedWordCountIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date()),
-        "output",
-        "results"));
+    ShardedFile inputFile =
+        new ExplicitShardedFile(Collections.singleton(options.getInputFile()));
+
+    // The input is tiny, so the expected counts are computed here by reading it directly
+    SortedMap<String, Long> expectedWordCounts = new TreeMap<>();
+    for (String line :
+        inputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff())) {
+      String[] words = line.split("[^a-zA-Z']+");
+      // split() can yield an empty first element when a line starts with a delimiter; skip those
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          expectedWordCounts.put(word,
+              MoreObjects.firstNonNull(expectedWordCounts.get(word), 0L) + 1L);
+        }
+      }
+    }
 
-    String query = String.format("SELECT word, SUM(count) FROM [%s:%s.%s] GROUP BY word",
-        options.getProject(), options.getBigQueryDataset(), options.getBigQueryTable());
     options.setOnSuccessMatcher(
-        new BigqueryMatcher(
-            options.getAppName(), options.getProject(), query, DEFAULT_OUTPUT_CHECKSUM));
+        new WordCountsMatcher(expectedWordCounts, new ExplicitShardedFile(expectedOutputFiles)));
 
     WindowedWordCount.main(TestPipeline.convertToArgs(options));
   }
+
+  /**
+   * A matcher that holds the expected word counts and compares them with the per-word sums parsed
+   * from a sharded output file.
+   */
+  private static class WordCountsMatcher extends TypeSafeMatcher<PipelineResult>
+      implements SerializableMatcher<PipelineResult> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(WordCountsMatcher.class);
+
+    private final SortedMap<String, Long> expectedWordCounts;
+    private final ShardedFile outputFile;
+    private SortedMap<String, Long> actualCounts; // filled in by matchesSafely
+
+    public WordCountsMatcher(SortedMap<String, Long> expectedWordCounts, ShardedFile outputFile) {
+      this.expectedWordCounts = expectedWordCounts;
+      this.outputFile = outputFile;
+    }
+
+    @Override
+    public boolean matchesSafely(PipelineResult pipelineResult) {
+      try {
+        // Read every line of the output shards, retrying with the shared backoff
+        List<String> lines =
+            outputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff());
+
+        // Since the windowing is nondeterministic we only check the sums
+        actualCounts = new TreeMap<>();
+        for (String line : lines) {
+          // Each output line is expected to be "<word>: <count>"
+          String[] splits = line.split(": ");
+          String word = splits[0];
+          long count = Long.parseLong(splits[1]);
+          Long current = actualCounts.get(word);
+          if (current == null) {
+            actualCounts.put(word, count);
+          } else {
+            actualCounts.put(word, current + count);
+          }
+        }
+
+        return actualCounts.equals(expectedWordCounts);
+      } catch (Exception e) {
+        throw new RuntimeException(
+            String.format("Failed to read from sharded output: %s", outputFile), e);
+      }
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      equalTo(expectedWordCounts).describeTo(description);
+    }
+
+    @Override
+    public void describeMismatchSafely(PipelineResult pResult, Description description) {
+      equalTo(expectedWordCounts).describeMismatch(actualCounts, description);
+    }
+  }
 }