Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/13 20:34:34 UTC

[1/2] beam git commit: Unbundle Context and WindowedContext.

Repository: beam
Updated Branches:
  refs/heads/master 889776fca -> 5f972e8b2


Unbundle Context and WindowedContext.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/64997efa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/64997efa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/64997efa

Branch: refs/heads/master
Commit: 64997efa597a6fd74f4a6b6a7ab48d663c56845f
Parents: 91c7d3d
Author: Reuven Lax <re...@google.com>
Authored: Mon Jul 10 21:30:50 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 13 09:29:23 2017 -0700

----------------------------------------------------------------------
 .../examples/common/WriteOneFilePerWindow.java  |  19 +-
 .../complete/game/utils/WriteToText.java        |  18 +-
 .../construction/WriteFilesTranslationTest.java |  12 +-
 .../beam/sdk/io/DefaultFilenamePolicy.java      |  47 ++--
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 198 ++++----------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 263 ++++++++++---------
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |  88 +++----
 .../org/apache/beam/sdk/io/WriteFilesTest.java  | 122 ++++-----
 8 files changed, 358 insertions(+), 409 deletions(-)
----------------------------------------------------------------------

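For implementers, the substance of this change is that FileBasedSink.FilenamePolicy no longer hands a Context or WindowedContext wrapper to its callbacks; the shard number, shard count, window, and pane arrive as plain parameters. A minimal sketch of a policy written against the new signatures follows. The class name PerShardFilenamePolicy and the filename format are illustrative only and not part of this commit; the Beam types and method signatures used are the ones visible in the diffs below.

import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;

/** Illustrative policy targeting the new flat-parameter API. */
class PerShardFilenamePolicy extends FilenamePolicy {
  private final ResourceId baseFilename;

  PerShardFilenamePolicy(ResourceId baseFilename) {
    this.baseFilename = baseFilename;
  }

  @Override
  public ResourceId windowedFilename(
      int shardNumber,
      int numShards,
      BoundedWindow window,
      PaneInfo paneInfo,
      OutputFileHints outputFileHints) {
    // Window, pane, and sharding arrive directly; there is no WindowedContext to unwrap.
    String filename =
        String.format(
            "%s-%s-%s-of-%s%s",
            baseFilename.getFilename(),
            window,
            shardNumber,
            numShards,
            outputFileHints.getSuggestedFilenameSuffix());
    return baseFilename
        .getCurrentDirectory()
        .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
  }

  @Override
  public ResourceId unwindowedFilename(
      int shardNumber, int numShards, OutputFileHints outputFileHints) {
    // Unwindowed output only needs the sharding information.
    String filename =
        String.format(
            "%s-%s-of-%s%s",
            baseFilename.getFilename(),
            shardNumber,
            numShards,
            outputFileHints.getSuggestedFilenameSuffix());
    return baseFilename
        .getCurrentDirectory()
        .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
  }
}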

http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index 49865ba..abd14b7 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -28,7 +28,9 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.joda.time.format.DateTimeFormatter;
@@ -88,14 +90,18 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
     }
 
     @Override
-    public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) {
-      IntervalWindow window = (IntervalWindow) context.getWindow();
+    public ResourceId windowedFilename(int shardNumber,
+                                       int numShards,
+                                       BoundedWindow window,
+                                       PaneInfo paneInfo,
+                                       OutputFileHints outputFileHints) {
+      IntervalWindow intervalWindow = (IntervalWindow) window;
       String filename =
           String.format(
               "%s-%s-of-%s%s",
-              filenamePrefixForWindow(window),
-              context.getShardNumber(),
-              context.getNumShards(),
+              filenamePrefixForWindow(intervalWindow),
+              shardNumber,
+              numShards,
               outputFileHints.getSuggestedFilenameSuffix());
       return baseFilename
           .getCurrentDirectory()
@@ -103,7 +109,8 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
     }
 
     @Override
-    public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) {
+    public ResourceId unwindowedFilename(
+        int shardNumber, int numShards, OutputFileHints outputFileHints) {
       throw new UnsupportedOperationException("Unsupported.");
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
index 1d60198..6b7c928 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.joda.time.DateTimeZone;
@@ -143,20 +144,25 @@ public class WriteToText<InputT>
     }
 
     @Override
-    public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) {
-      IntervalWindow window = (IntervalWindow) context.getWindow();
+    public ResourceId windowedFilename(int shardNumber,
+                                       int numShards,
+                                       BoundedWindow window,
+                                       PaneInfo paneInfo,
+                                       OutputFileHints outputFileHints) {
+      IntervalWindow intervalWindow = (IntervalWindow) window;
       String filename =
           String.format(
               "%s-%s-of-%s%s",
-              filenamePrefixForWindow(window),
-              context.getShardNumber(),
-              context.getNumShards(),
+              filenamePrefixForWindow(intervalWindow),
+              shardNumber,
+              numShards,
               outputFileHints.getSuggestedFilenameSuffix());
       return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE);
     }
 
     @Override
-    public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) {
+    public ResourceId unwindowedFilename(
+        int shardNumber, int numShards, OutputFileHints outputFileHints) {
       throw new UnsupportedOperationException("Unsupported.");
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
index 283df16..4259ac8 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
@@ -40,6 +40,8 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.junit.Test;
@@ -163,13 +165,19 @@ public class WriteFilesTranslationTest {
 
   private static class DummyFilenamePolicy extends FilenamePolicy {
     @Override
-    public ResourceId windowedFilename(WindowedContext c, OutputFileHints outputFileHints) {
+    public ResourceId windowedFilename(
+        int shardNumber,
+        int numShards,
+        BoundedWindow window,
+        PaneInfo paneInfo,
+        OutputFileHints outputFileHints) {
       throw new UnsupportedOperationException("Should never be called.");
     }
 
     @Nullable
     @Override
-    public ResourceId unwindowedFilename(Context c, OutputFileHints outputFileHints) {
+    public ResourceId unwindowedFilename(
+        int shardNumber, int numShards, OutputFileHints outputFileHints) {
       throw new UnsupportedOperationException("Should never be called.");
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
index 7a60e49..64d7edc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -52,19 +52,19 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
  * with the number of shards, index of the particular file, current window and pane information,
  * using {@link #constructName}.
  *
- * <p>Most users will use this {@link DefaultFilenamePolicy}. For more advanced
- * uses in generating different files for each window and other sharding controls, see the
- * {@code WriteOneFilePerWindow} example pipeline.
+ * <p>Most users will use this {@link DefaultFilenamePolicy}. For more advanced uses in generating
+ * different files for each window and other sharding controls, see the {@code
+ * WriteOneFilePerWindow} example pipeline.
  */
 public final class DefaultFilenamePolicy extends FilenamePolicy {
   /** The default sharding name template. */
   public static final String DEFAULT_UNWINDOWED_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
 
-  /** The default windowed sharding name template used when writing windowed files.
-   *  This is used as default in cases when user did not specify shard template to
-   *  be used and there is a need to write windowed files. In cases when user does
-   *  specify shard template to be used then provided template will be used for both
-   *  windowed and non-windowed file names.
+  /**
+   * The default windowed sharding name template, used when writing windowed files. It applies
+   * when the user did not specify a shard template but windowed files need to be written. If the
+   * user does specify a shard template, that template is used for both windowed and non-windowed
+   * file names.
    */
   private static final String DEFAULT_WINDOWED_SHARD_TEMPLATE =
       "W-P" + DEFAULT_UNWINDOWED_SHARD_TEMPLATE;
@@ -190,11 +190,11 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
    * <p>This is a shortcut for:
    *
    * <pre>{@code
-   *   DefaultFilenamePolicy.fromParams(new Params()
-   *     .withBaseFilename(baseFilename)
-   *     .withShardTemplate(shardTemplate)
-   *     .withSuffix(filenameSuffix)
-   *     .withWindowedWrites())
+   * DefaultFilenamePolicy.fromParams(new Params()
+   *   .withBaseFilename(baseFilename)
+   *   .withShardTemplate(shardTemplate)
+   *   .withSuffix(filenameSuffix)
+   *   .withWindowedWrites())
    * }</pre>
    *
    * <p>Where the respective {@code with} methods are invoked only if the value is non-null or true.
@@ -284,28 +284,33 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
 
   @Override
   @Nullable
-  public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) {
+  public ResourceId unwindowedFilename(
+      int shardNumber, int numShards, OutputFileHints outputFileHints) {
     return constructName(
         params.baseFilename.get(),
         params.shardTemplate,
         params.suffix + outputFileHints.getSuggestedFilenameSuffix(),
-        context.getShardNumber(),
-        context.getNumShards(),
+        shardNumber,
+        numShards,
         null,
         null);
   }
 
   @Override
-  public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) {
-    final PaneInfo paneInfo = context.getPaneInfo();
+  public ResourceId windowedFilename(
+      int shardNumber,
+      int numShards,
+      BoundedWindow window,
+      PaneInfo paneInfo,
+      OutputFileHints outputFileHints) {
     String paneStr = paneInfoToString(paneInfo);
-    String windowStr = windowToString(context.getWindow());
+    String windowStr = windowToString(window);
     return constructName(
         params.baseFilename.get(),
         params.shardTemplate,
         params.suffix + outputFileHints.getSuggestedFilenameSuffix(),
-        context.getShardNumber(),
-        context.getNumShards(),
+        shardNumber,
+        numShards,
         paneStr,
         windowStr);
   }

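For users of the built-in DefaultFilenamePolicy, construction via fromParams is unchanged; only the call into the policy looks different. A hedged, minimal sketch (assuming the Params builder methods quoted in the javadoc above accept a ResourceId base filename; the /tmp path and shard counts are illustrative):

import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResourceId;

public class DefaultPolicyExample {
  public static void main(String[] args) {
    // convertToFileResourceIfPossible is the same helper the updated AvroIOTest uses below.
    ResourceId baseFilename =
        FileBasedSink.convertToFileResourceIfPossible("/tmp/output/part");

    DefaultFilenamePolicy policy =
        DefaultFilenamePolicy.fromParams(
            new DefaultFilenamePolicy.Params()
                .withBaseFilename(baseFilename)
                .withShardTemplate("-SSSSS-of-NNNNN")
                .withSuffix(".test"));

    // Shard number and shard count are now plain int arguments instead of a Context object.
    ResourceId shard0 =
        policy.unwindowedFilename(0, 3, FileBasedSink.CompressionType.UNCOMPRESSED);
    System.out.println(shard0); // e.g. .../part-00000-of-00003.test
  }
}

CompressionType.UNCOMPRESSED stands in for the OutputFileHints argument here, exactly as the updated FileBasedSinkTest does further down.
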
http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 583af60..c68b794 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -58,8 +58,6 @@ import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
-import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.WindowedContext;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
@@ -96,9 +94,9 @@ import org.slf4j.LoggerFactory;
  * <p>The process of writing to file-based sink is as follows:
  *
  * <ol>
- *   <li>An optional subclass-defined initialization,
- *   <li>a parallel write of bundles to temporary files, and finally,
- *   <li>these temporary files are renamed with final output filenames.
+ * <li>An optional subclass-defined initialization,
+ * <li>a parallel write of bundles to temporary files, and finally,
+ * <li>these temporary files are renamed with final output filenames.
  * </ol>
  *
  * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
@@ -125,46 +123,36 @@ import org.slf4j.LoggerFactory;
 public abstract class FileBasedSink<OutputT, DestinationT> implements Serializable, HasDisplayData {
   private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
 
-  /**
-   * Directly supported file output compression types.
-   */
+  /** Directly supported file output compression types. */
   public enum CompressionType implements WritableByteChannelFactory {
-    /**
-     * No compression, or any other transformation, will be used.
-     */
+    /** No compression, or any other transformation, will be used. */
     UNCOMPRESSED("", null) {
       @Override
       public WritableByteChannel create(WritableByteChannel channel) throws IOException {
         return channel;
       }
     },
-    /**
-     * Provides GZip output transformation.
-     */
+    /** Provides GZip output transformation. */
     GZIP(".gz", MimeTypes.BINARY) {
       @Override
       public WritableByteChannel create(WritableByteChannel channel) throws IOException {
         return Channels.newChannel(new GZIPOutputStream(Channels.newOutputStream(channel), true));
       }
     },
-    /**
-     * Provides BZip2 output transformation.
-     */
+    /** Provides BZip2 output transformation. */
     BZIP2(".bz2", MimeTypes.BINARY) {
       @Override
       public WritableByteChannel create(WritableByteChannel channel) throws IOException {
-        return Channels
-            .newChannel(new BZip2CompressorOutputStream(Channels.newOutputStream(channel)));
+        return Channels.newChannel(
+            new BZip2CompressorOutputStream(Channels.newOutputStream(channel)));
       }
     },
-    /**
-     * Provides deflate output transformation.
-     */
+    /** Provides deflate output transformation. */
     DEFLATE(".deflate", MimeTypes.BINARY) {
       @Override
       public WritableByteChannel create(WritableByteChannel channel) throws IOException {
-        return Channels
-            .newChannel(new DeflateCompressorOutputStream(Channels.newOutputStream(channel)));
+        return Channels.newChannel(
+            new DeflateCompressorOutputStream(Channels.newOutputStream(channel)));
       }
     };
 
@@ -182,7 +170,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
     }
 
     @Override
-    @Nullable public String getMimeType() {
+    @Nullable
+    public String getMimeType() {
       return mimeType;
     }
   }
@@ -213,8 +202,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
 
   /**
    * The {@link WritableByteChannelFactory} that is used to wrap the raw data output to the
-   * underlying channel. The default is to not compress the output using
-   * {@link CompressionType#UNCOMPRESSED}.
+   * underlying channel. The default is to not compress the output using {@link
+   * CompressionType#UNCOMPRESSED}.
    */
   private final WritableByteChannelFactory writableByteChannelFactory;
 
@@ -285,85 +274,20 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
   @Experimental(Kind.FILESYSTEM)
   public abstract static class FilenamePolicy implements Serializable {
     /**
-     * Context used for generating a name based on shard number, and num shards.
-     * The policy must produce unique filenames for unique {@link Context} objects.
-     *
-     * <p>Be careful about adding fields to this as existing strategies will not notice the new
-     * fields, and may not produce unique filenames.
-     */
-    public static class Context {
-      private int shardNumber;
-      private int numShards;
-
-
-      public Context(int shardNumber, int numShards) {
-        this.shardNumber = shardNumber;
-        this.numShards = numShards;
-      }
-
-      public int getShardNumber() {
-        return shardNumber;
-      }
-
-
-      public int getNumShards() {
-        return numShards;
-      }
-    }
-
-    /**
-     * Context used for generating a name based on window, pane, shard number, and num shards.
-     * The policy must produce unique filenames for unique {@link WindowedContext} objects.
-     *
-     * <p>Be careful about adding fields to this as existing strategies will not notice the new
-     * fields, and may not produce unique filenames.
-     */
-    public static class WindowedContext {
-      private int shardNumber;
-      private int numShards;
-      private BoundedWindow window;
-      private PaneInfo paneInfo;
-
-      public WindowedContext(
-          BoundedWindow window,
-          PaneInfo paneInfo,
-          int shardNumber,
-          int numShards) {
-        this.window = window;
-        this.paneInfo = paneInfo;
-        this.shardNumber = shardNumber;
-        this.numShards = numShards;
-      }
-
-      public BoundedWindow getWindow() {
-        return window;
-      }
-
-      public PaneInfo getPaneInfo() {
-        return paneInfo;
-      }
-
-      public int getShardNumber() {
-        return shardNumber;
-      }
-
-      public int getNumShards() {
-        return numShards;
-      }
-    }
-
-    /**
      * When a sink has requested windowed or triggered output, this method will be invoked to return
      * the file {@link ResourceId resource} to be created given the base output directory and a
      * {@link OutputFileHints} containing information about the file, including a suggested
      * extension (e.g. coming from {@link CompressionType}).
      *
-     * <p>The {@link WindowedContext} object gives access to the window and pane, as well as
-     * sharding information. The policy must return unique and consistent filenames for different
-     * windows and panes.
+     * <p>The policy must return unique and consistent filenames for different windows and panes.
      */
     @Experimental(Kind.FILESYSTEM)
-    public abstract ResourceId windowedFilename(WindowedContext c, OutputFileHints outputFileHints);
+    public abstract ResourceId windowedFilename(
+        int shardNumber,
+        int numShards,
+        BoundedWindow window,
+        PaneInfo paneInfo,
+        OutputFileHints outputFileHints);
 
     /**
      * When a sink has not requested windowed or triggered output, this method will be invoked to
@@ -371,18 +295,16 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
      * a {@link OutputFileHints} containing information about the file, including a suggested (e.g.
      * coming from {@link CompressionType}).
      *
-     * <p>The {@link Context} object only provides sharding information, which is used by the policy
-     * to generate unique and consistent filenames.
+     * <p>The shardNumber and numShards parameters should be used by the policy to generate unique
+     * and consistent filenames.
      */
     @Experimental(Kind.FILESYSTEM)
     @Nullable
-    public abstract ResourceId unwindowedFilename(Context c, OutputFileHints outputFileHints);
+    public abstract ResourceId unwindowedFilename(
+        int shardNumber, int numShards, OutputFileHints outputFileHints);
 
-    /**
-     * Populates the display data.
-     */
-    public void populateDisplayData(DisplayData.Builder builder) {
-    }
+    /** Populates the display data. */
+    public void populateDisplayData(DisplayData.Builder builder) {}
   }
 
   /** The directory to which files will be written. */
@@ -449,11 +371,11 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
    * written,
    *
    * <ol>
-   *   <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the
-   *       output bundles.
-   *   <li>During finalize, these temporary files are copied to final output locations and named
-   *       according to a file naming template.
-   *   <li>Finally, any temporary files that were created during the write are removed.
+   * <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the
+   *     output bundles.
+   * <li>During finalize, these temporary files are copied to final output locations and named
+   *     according to a file naming template.
+   * <li>Finally, any temporary files that were created during the write are removed.
    * </ol>
    *
    * <p>Subclass implementations of WriteOperation must implement {@link
@@ -558,9 +480,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
      */
     public abstract Writer<OutputT, DestinationT> createWriter() throws Exception;
 
-    /**
-     * Indicates that the operation will be performing windowed writes.
-     */
+    /** Indicates that the operation will be performing windowed writes. */
     public void setWindowedWrites(boolean windowedWrites) {
       this.windowedWrites = windowedWrites;
     }
@@ -659,9 +579,11 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
       }
 
       int numDistinctShards = new HashSet<>(outputFilenames.values()).size();
-      checkState(numDistinctShards == outputFilenames.size(),
-         "Only generated %s distinct file names for %s files.",
-         numDistinctShards, outputFilenames.size());
+      checkState(
+          numDistinctShards == outputFilenames.size(),
+          "Only generated %s distinct file names for %s files.",
+          numDistinctShards,
+          outputFilenames.size());
 
       return outputFilenames;
     }
@@ -726,8 +648,9 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
       // ignore the exception for now to avoid failing the pipeline.
       if (shouldRemoveTemporaryDirectory) {
         try {
-          MatchResult singleMatch = Iterables.getOnlyElement(
-              FileSystems.match(Collections.singletonList(tempDir.toString() + "*")));
+          MatchResult singleMatch =
+              Iterables.getOnlyElement(
+                  FileSystems.match(Collections.singletonList(tempDir.toString() + "*")));
           for (Metadata matchResult : singleMatch.metadata()) {
             matches.add(matchResult.resourceId());
           }
@@ -807,18 +730,16 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
     /** The output file for this bundle. May be null if opening failed. */
     private @Nullable ResourceId outputFile;
 
-    /**
-     * The channel to write to.
-     */
+    /** The channel to write to. */
     private WritableByteChannel channel;
 
     /**
      * The MIME type used in the creation of the output channel (if the file system supports it).
      *
-     * <p>This is the default for the sink, but it may be overridden by a supplied
-     * {@link WritableByteChannelFactory}. For example, {@link TextIO.Write} uses
-     * {@link MimeTypes#TEXT} by default but if {@link CompressionType#BZIP2} is set then
-     * the MIME type will be overridden to {@link MimeTypes#BINARY}.
+     * <p>This is the default for the sink, but it may be overridden by a supplied {@link
+     * WritableByteChannelFactory}. For example, {@link TextIO.Write} uses {@link MimeTypes#TEXT} by
+     * default but if {@link CompressionType#BZIP2} is set then the MIME type will be overridden to
+     * {@link MimeTypes#BINARY}.
      */
     private final String mimeType;
 
@@ -843,14 +764,12 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
      */
     protected void writeHeader() throws Exception {}
 
-    /**
-     * Writes footer at the end of output files. Nothing by default; subclasses may override.
-     */
+    /** Writes footer at the end of output files. Nothing by default; subclasses may override. */
     protected void writeFooter() throws Exception {}
 
     /**
-     * Called after all calls to {@link #writeHeader}, {@link #write} and {@link #writeFooter}.
-     * If any resources opened in the write processes need to be flushed, flush them here.
+     * Called after all calls to {@link #writeHeader}, {@link #write} and {@link #writeFooter}. If
+     * any resources opened in the write processes need to be flushed, flush them here.
      */
     protected void finishWrite() throws Exception {}
 
@@ -875,9 +794,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
       open(uId, window, paneInfo, shard, destination);
     }
 
-    /**
-     * Called for each value in the bundle.
-     */
+    /** Called for each value in the bundle. */
     public abstract void write(OutputT value) throws Exception;
 
     /**
@@ -982,7 +899,9 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
 
       checkState(
           channel.isOpen(),
-          "Channel %s to %s should only be closed by its owner: %s", channel, outputFile);
+          "Channel %s to %s should only be closed by its owner: %s",
+          channel,
+          outputFile);
 
       LOG.debug("Closing channel to {}.", outputFile);
       try {
@@ -1063,10 +982,9 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
       FilenamePolicy policy = dynamicDestinations.getFilenamePolicy(destination);
       if (getWindow() != null) {
         return policy.windowedFilename(
-            new WindowedContext(getWindow(), getPaneInfo(), getShard(), numShards),
-            outputFileHints);
+            getShard(), numShards, getWindow(), getPaneInfo(), outputFileHints);
       } else {
-        return policy.unwindowedFilename(new Context(getShard(), numShards), outputFileHints);
+        return policy.unwindowedFilename(getShard(), numShards, outputFileHints);
       }
     }
 
@@ -1154,7 +1072,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
      *
      * @see MimeTypes
      * @see <a href=
-     *      'http://www.iana.org/assignments/media-types/media-types.xhtml'>http://www.iana.org/assignments/media-types/media-types.xhtml</a>
+     *     'http://www.iana.org/assignments/media-types/media-types.xhtml'>http://www.iana.org/assignments/media-types/media-types.xhtml</a>
      */
     @Nullable
     String getMimeType();

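The last hunk above shows the caller side: the Writer now forwards getShard(), numShards, getWindow(), and getPaneInfo() straight to the policy rather than allocating a WindowedContext. The same windowed call can be made by hand; in the sketch below the output path, window, shard values, and use of PaneInfo.NO_FIRING are illustrative assumptions, not code from this commit:

import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class WindowedFilenameExample {
  public static void main(String[] args) {
    ResourceId base = FileBasedSink.convertToFileResourceIfPossible("/tmp/output/events");
    DefaultFilenamePolicy policy =
        DefaultFilenamePolicy.fromParams(
            new DefaultFilenamePolicy.Params()
                .withBaseFilename(base)
                .withWindowedWrites()
                .withSuffix(".avro"));

    // A one-minute window starting at the epoch, as in the AvroIOTest change below.
    IntervalWindow window = new IntervalWindow(new Instant(0), Duration.standardMinutes(1));

    // Window, pane, and sharding are passed as explicit arguments; no WindowedContext is built.
    ResourceId filename =
        policy.windowedFilename(
            0, 1, window, PaneInfo.NO_FIRING, FileBasedSink.CompressionType.UNCOMPRESSED);
    System.out.println(filename);
  }
}
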
http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 260e47a..4a1386c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -68,8 +68,10 @@ import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
@@ -84,20 +86,15 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/**
- * Tests for AvroIO Read and Write transforms.
- */
+/** Tests for AvroIO Read and Write transforms. */
 @RunWith(JUnit4.class)
 public class AvroIOTest {
 
-  @Rule
-  public TestPipeline p = TestPipeline.create();
+  @Rule public TestPipeline p = TestPipeline.create();
 
-  @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
 
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
+  @Rule public ExpectedException expectedException = ExpectedException.none();
 
   @Test
   public void testAvroIOGetName() {
@@ -109,11 +106,14 @@ public class AvroIOTest {
   static class GenericClass {
     int intField;
     String stringField;
+
     public GenericClass() {}
+
     public GenericClass(int intValue, String stringValue) {
       this.intField = intValue;
       this.stringField = stringValue;
     }
+
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(getClass())
@@ -121,10 +121,12 @@ public class AvroIOTest {
           .add("stringField", stringField)
           .toString();
     }
+
     @Override
     public int hashCode() {
       return Objects.hash(intField, stringField);
     }
+
     @Override
     public boolean equals(Object other) {
       if (other == null || !(other instanceof GenericClass)) {
@@ -138,20 +140,16 @@ public class AvroIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void testAvroIOWriteAndReadASingleFile() throws Throwable {
-    List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
-        new GenericClass(5, "bar"));
+    List<GenericClass> values =
+        ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-     .apply(AvroIO.write(GenericClass.class)
-         .to(outputFile.getAbsolutePath())
-         .withoutSharding());
+        .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
     p.run();
 
     PCollection<GenericClass> input =
-        p.apply(
-            AvroIO.read(GenericClass.class)
-                .from(outputFile.getAbsolutePath()));
+        p.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
 
     PAssert.that(input).containsInAnyOrder(values);
     p.run();
@@ -161,25 +159,25 @@ public class AvroIOTest {
   @SuppressWarnings("unchecked")
   @Category(NeedsRunner.class)
   public void testAvroIOCompressedWriteAndReadASingleFile() throws Throwable {
-    List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
-        new GenericClass(5, "bar"));
+    List<GenericClass> values =
+        ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-     .apply(AvroIO.write(GenericClass.class)
-         .to(outputFile.getAbsolutePath())
-         .withoutSharding()
-         .withCodec(CodecFactory.deflateCodec(9)));
+        .apply(
+            AvroIO.write(GenericClass.class)
+                .to(outputFile.getAbsolutePath())
+                .withoutSharding()
+                .withCodec(CodecFactory.deflateCodec(9)));
     p.run();
 
-    PCollection<GenericClass> input = p
-        .apply(AvroIO.read(GenericClass.class)
-            .from(outputFile.getAbsolutePath()));
+    PCollection<GenericClass> input =
+        p.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
 
     PAssert.that(input).containsInAnyOrder(values);
     p.run();
-    DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile),
-        new GenericDatumReader());
+    DataFileStream dataFileStream =
+        new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader());
     assertEquals("deflate", dataFileStream.getMetaString("avro.codec"));
   }
 
@@ -187,25 +185,25 @@ public class AvroIOTest {
   @SuppressWarnings("unchecked")
   @Category(NeedsRunner.class)
   public void testAvroIONullCodecWriteAndReadASingleFile() throws Throwable {
-    List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
-        new GenericClass(5, "bar"));
+    List<GenericClass> values =
+        ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-      .apply(AvroIO.write(GenericClass.class)
-          .to(outputFile.getAbsolutePath())
-          .withoutSharding()
-          .withCodec(CodecFactory.nullCodec()));
+        .apply(
+            AvroIO.write(GenericClass.class)
+                .to(outputFile.getAbsolutePath())
+                .withoutSharding()
+                .withCodec(CodecFactory.nullCodec()));
     p.run();
 
-    PCollection<GenericClass> input = p
-        .apply(AvroIO.read(GenericClass.class)
-            .from(outputFile.getAbsolutePath()));
+    PCollection<GenericClass> input =
+        p.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
 
     PAssert.that(input).containsInAnyOrder(values);
     p.run();
-    DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile),
-        new GenericDatumReader());
+    DataFileStream dataFileStream =
+        new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader());
     assertEquals("null", dataFileStream.getMetaString("avro.codec"));
   }
 
@@ -214,12 +212,15 @@ public class AvroIOTest {
     int intField;
     String stringField;
     @Nullable String nullableField;
+
     public GenericClassV2() {}
+
     public GenericClassV2(int intValue, String stringValue, String nullableValue) {
       this.intField = intValue;
       this.stringField = stringValue;
       this.nullableField = nullableValue;
     }
+
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(getClass())
@@ -228,10 +229,12 @@ public class AvroIOTest {
           .add("nullableField", nullableField)
           .toString();
     }
+
     @Override
     public int hashCode() {
       return Objects.hash(intField, stringField, nullableField);
     }
+
     @Override
     public boolean equals(Object other) {
       if (other == null || !(other instanceof GenericClassV2)) {
@@ -245,32 +248,28 @@ public class AvroIOTest {
   }
 
   /**
-   * Tests that {@code AvroIO} can read an upgraded version of an old class, as long as the
-   * schema resolution process succeeds. This test covers the case when a new, {@code @Nullable}
-   * field has been added.
+   * Tests that {@code AvroIO} can read an upgraded version of an old class, as long as the schema
+   * resolution process succeeds. This test covers the case when a new, {@code @Nullable} field has
+   * been added.
    *
    * <p>For more information, see http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution
    */
   @Test
   @Category(NeedsRunner.class)
   public void testAvroIOWriteAndReadSchemaUpgrade() throws Throwable {
-    List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
-        new GenericClass(5, "bar"));
+    List<GenericClass> values =
+        ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-      .apply(AvroIO.write(GenericClass.class)
-          .to(outputFile.getAbsolutePath())
-          .withoutSharding());
+        .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
     p.run();
 
-    List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, "hi", null),
-        new GenericClassV2(5, "bar", null));
+    List<GenericClassV2> expected =
+        ImmutableList.of(new GenericClassV2(3, "hi", null), new GenericClassV2(5, "bar", null));
 
     PCollection<GenericClassV2> input =
-        p.apply(
-            AvroIO.read(GenericClassV2.class)
-                .from(outputFile.getAbsolutePath()));
+        p.apply(AvroIO.read(GenericClassV2.class).from(outputFile.getAbsolutePath()));
 
     PAssert.that(input).containsInAnyOrder(expected);
     p.run();
@@ -284,7 +283,12 @@ public class AvroIOTest {
     }
 
     @Override
-    public ResourceId windowedFilename(WindowedContext input, OutputFileHints outputFileHints) {
+    public ResourceId windowedFilename(
+        int shardNumber,
+        int numShards,
+        BoundedWindow window,
+        PaneInfo paneInfo,
+        OutputFileHints outputFileHints) {
       String filenamePrefix =
           outputFilePrefix.isDirectory() ? "" : firstNonNull(outputFilePrefix.getFilename(), "");
 
@@ -292,11 +296,11 @@ public class AvroIOTest {
           String.format(
               "%s-%s-%s-of-%s-pane-%s%s%s",
               filenamePrefix,
-              input.getWindow(),
-              input.getShardNumber(),
-              input.getNumShards() - 1,
-              input.getPaneInfo().getIndex(),
-              input.getPaneInfo().isLast() ? "-final" : "",
+              window,
+              shardNumber,
+              numShards - 1,
+              paneInfo.getIndex(),
+              paneInfo.isLast() ? "-final" : "",
               outputFileHints.getSuggestedFilenameSuffix());
       return outputFilePrefix
           .getCurrentDirectory()
@@ -304,7 +308,8 @@ public class AvroIOTest {
     }
 
     @Override
-    public ResourceId unwindowedFilename(Context input, OutputFileHints outputFileHints) {
+    public ResourceId unwindowedFilename(
+        int shardNumber, int numShards, OutputFileHints outputFileHints) {
       throw new UnsupportedOperationException("Expecting windowed outputs only");
     }
 
@@ -316,8 +321,7 @@ public class AvroIOTest {
     }
   }
 
-  @Rule
-  public TestPipeline windowedAvroWritePipeline = TestPipeline.create();
+  @Rule public TestPipeline windowedAvroWritePipeline = TestPipeline.create();
 
   @Test
   @Category({ValidatesRunner.class, UsesTestStream.class})
@@ -328,27 +332,31 @@ public class AvroIOTest {
     Instant base = new Instant(0);
     ArrayList<GenericClass> allElements = new ArrayList<>();
     ArrayList<TimestampedValue<GenericClass>> firstWindowElements = new ArrayList<>();
-    ArrayList<Instant> firstWindowTimestamps = Lists.newArrayList(
-        base.plus(Duration.standardSeconds(0)), base.plus(Duration.standardSeconds(10)),
-        base.plus(Duration.standardSeconds(20)), base.plus(Duration.standardSeconds(30)));
+    ArrayList<Instant> firstWindowTimestamps =
+        Lists.newArrayList(
+            base.plus(Duration.standardSeconds(0)), base.plus(Duration.standardSeconds(10)),
+            base.plus(Duration.standardSeconds(20)), base.plus(Duration.standardSeconds(30)));
 
     Random random = new Random();
     for (int i = 0; i < 100; ++i) {
       GenericClass item = new GenericClass(i, String.valueOf(i));
       allElements.add(item);
-      firstWindowElements.add(TimestampedValue.of(item,
-          firstWindowTimestamps.get(random.nextInt(firstWindowTimestamps.size()))));
+      firstWindowElements.add(
+          TimestampedValue.of(
+              item, firstWindowTimestamps.get(random.nextInt(firstWindowTimestamps.size()))));
     }
 
     ArrayList<TimestampedValue<GenericClass>> secondWindowElements = new ArrayList<>();
-    ArrayList<Instant> secondWindowTimestamps = Lists.newArrayList(
-        base.plus(Duration.standardSeconds(60)), base.plus(Duration.standardSeconds(70)),
-        base.plus(Duration.standardSeconds(80)), base.plus(Duration.standardSeconds(90)));
+    ArrayList<Instant> secondWindowTimestamps =
+        Lists.newArrayList(
+            base.plus(Duration.standardSeconds(60)), base.plus(Duration.standardSeconds(70)),
+            base.plus(Duration.standardSeconds(80)), base.plus(Duration.standardSeconds(90)));
     for (int i = 100; i < 200; ++i) {
       GenericClass item = new GenericClass(i, String.valueOf(i));
       allElements.add(new GenericClass(i, String.valueOf(i)));
-      secondWindowElements.add(TimestampedValue.of(item,
-          secondWindowTimestamps.get(random.nextInt(secondWindowTimestamps.size()))));
+      secondWindowElements.add(
+          TimestampedValue.of(
+              item, secondWindowTimestamps.get(random.nextInt(secondWindowTimestamps.size()))));
     }
 
     TimestampedValue<GenericClass>[] firstWindowArray =
@@ -356,14 +364,17 @@ public class AvroIOTest {
     TimestampedValue<GenericClass>[] secondWindowArray =
         secondWindowElements.toArray(new TimestampedValue[100]);
 
-    TestStream<GenericClass> values = TestStream.create(AvroCoder.of(GenericClass.class))
-        .advanceWatermarkTo(new Instant(0))
-        .addElements(firstWindowArray[0],
-            Arrays.copyOfRange(firstWindowArray, 1, firstWindowArray.length))
-        .advanceWatermarkTo(new Instant(0).plus(Duration.standardMinutes(1)))
-        .addElements(secondWindowArray[0],
-        Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length))
-        .advanceWatermarkToInfinity();
+    TestStream<GenericClass> values =
+        TestStream.create(AvroCoder.of(GenericClass.class))
+            .advanceWatermarkTo(new Instant(0))
+            .addElements(
+                firstWindowArray[0],
+                Arrays.copyOfRange(firstWindowArray, 1, firstWindowArray.length))
+            .advanceWatermarkTo(new Instant(0).plus(Duration.standardMinutes(1)))
+            .addElements(
+                secondWindowArray[0],
+                Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length))
+            .advanceWatermarkToInfinity();
 
     FilenamePolicy policy =
         new WindowedFilenamePolicy(FileBasedSink.convertToFileResourceIfPossible(baseFilename));
@@ -384,11 +395,17 @@ public class AvroIOTest {
     for (int shard = 0; shard < 2; shard++) {
       for (int window = 0; window < 2; window++) {
         Instant windowStart = new Instant(0).plus(Duration.standardMinutes(window));
-        IntervalWindow intervalWindow = new IntervalWindow(
-            windowStart, Duration.standardMinutes(1));
+        IntervalWindow intervalWindow =
+            new IntervalWindow(windowStart, Duration.standardMinutes(1));
         expectedFiles.add(
-            new File(baseFilename + "-" + intervalWindow.toString() + "-" + shard
-                + "-of-1" + "-pane-0-final"));
+            new File(
+                baseFilename
+                    + "-"
+                    + intervalWindow.toString()
+                    + "-"
+                    + shard
+                    + "-of-1"
+                    + "-pane-0-final"));
       }
     }
 
@@ -396,9 +413,10 @@ public class AvroIOTest {
     for (File outputFile : expectedFiles) {
       assertTrue("Expected output file " + outputFile.getAbsolutePath(), outputFile.exists());
       try (DataFileReader<GenericClass> reader =
-               new DataFileReader<>(outputFile,
-                   new ReflectDatumReader<GenericClass>(
-                       ReflectData.get().getSchema(GenericClass.class)))) {
+          new DataFileReader<>(
+              outputFile,
+              new ReflectDatumReader<GenericClass>(
+                  ReflectData.get().getSchema(GenericClass.class)))) {
         Iterators.addAll(actualElements, reader);
       }
       outputFile.delete();
@@ -408,25 +426,22 @@ public class AvroIOTest {
 
   @Test
   public void testWriteWithDefaultCodec() throws Exception {
-    AvroIO.Write<String> write = AvroIO.write(String.class)
-        .to("/tmp/foo/baz");
+    AvroIO.Write<String> write = AvroIO.write(String.class).to("/tmp/foo/baz");
     assertEquals(CodecFactory.deflateCodec(6).toString(), write.getCodec().toString());
   }
 
   @Test
   public void testWriteWithCustomCodec() throws Exception {
-    AvroIO.Write<String> write = AvroIO.write(String.class)
-        .to("/tmp/foo/baz")
-        .withCodec(CodecFactory.snappyCodec());
+    AvroIO.Write<String> write =
+        AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.snappyCodec());
     assertEquals(SNAPPY_CODEC, write.getCodec().toString());
   }
 
   @Test
   @SuppressWarnings("unchecked")
   public void testWriteWithSerDeCustomDeflateCodec() throws Exception {
-    AvroIO.Write<String> write = AvroIO.write(String.class)
-        .to("/tmp/foo/baz")
-        .withCodec(CodecFactory.deflateCodec(9));
+    AvroIO.Write<String> write =
+        AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.deflateCodec(9));
 
     assertEquals(
         CodecFactory.deflateCodec(9).toString(),
@@ -436,9 +451,8 @@ public class AvroIOTest {
   @Test
   @SuppressWarnings("unchecked")
   public void testWriteWithSerDeCustomXZCodec() throws Exception {
-    AvroIO.Write<String> write = AvroIO.write(String.class)
-        .to("/tmp/foo/baz")
-        .withCodec(CodecFactory.xzCodec(9));
+    AvroIO.Write<String> write =
+        AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.xzCodec(9));
 
     assertEquals(
         CodecFactory.xzCodec(9).toString(),
@@ -449,28 +463,32 @@ public class AvroIOTest {
   @SuppressWarnings("unchecked")
   @Category(NeedsRunner.class)
   public void testMetadata() throws Exception {
-    List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
-        new GenericClass(5, "bar"));
+    List<GenericClass> values =
+        ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-        .apply(AvroIO.write(GenericClass.class)
-            .to(outputFile.getAbsolutePath())
-            .withoutSharding()
-            .withMetadata(ImmutableMap.<String, Object>of(
-                "stringKey", "stringValue",
-                "longKey", 100L,
-                "bytesKey", "bytesValue".getBytes())));
+        .apply(
+            AvroIO.write(GenericClass.class)
+                .to(outputFile.getAbsolutePath())
+                .withoutSharding()
+                .withMetadata(
+                    ImmutableMap.<String, Object>of(
+                        "stringKey",
+                        "stringValue",
+                        "longKey",
+                        100L,
+                        "bytesKey",
+                        "bytesValue".getBytes())));
     p.run();
 
-    DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile),
-        new GenericDatumReader());
+    DataFileStream dataFileStream =
+        new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader());
     assertEquals("stringValue", dataFileStream.getMetaString("stringKey"));
     assertEquals(100L, dataFileStream.getMetaLong("longKey"));
     assertArrayEquals("bytesValue".getBytes(), dataFileStream.getMeta("bytesKey"));
   }
 
-
   @SuppressWarnings("deprecation") // using AvroCoder#createDatumReader for tests.
   private void runTestWrite(String[] expectedElements, int numShards) throws IOException {
     File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
@@ -488,8 +506,8 @@ public class AvroIOTest {
     p.run();
 
     String shardNameTemplate =
-        firstNonNull(write.getShardTemplate(),
-            DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
+        firstNonNull(
+            write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
 
     assertTestOutputs(expectedElements, numShards, outputFilePrefix, shardNameTemplate);
   }
@@ -517,8 +535,8 @@ public class AvroIOTest {
     for (File outputFile : expectedFiles) {
       assertTrue("Expected output file " + outputFile.getName(), outputFile.exists());
       try (DataFileReader<String> reader =
-          new DataFileReader<>(outputFile,
-              new ReflectDatumReader(ReflectData.get().getSchema(String.class)))) {
+          new DataFileReader<>(
+              outputFile, new ReflectDatumReader(ReflectData.get().getSchema(String.class)))) {
         Iterators.addAll(actualElements, reader);
       }
     }
@@ -560,18 +578,21 @@ public class AvroIOTest {
         AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("/foo.*");
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
-    assertThat("AvroIO.Read should include the file pattern in its primitive transform",
-        displayData, hasItem(hasDisplayItem("filePattern")));
+    assertThat(
+        "AvroIO.Read should include the file pattern in its primitive transform",
+        displayData,
+        hasItem(hasDisplayItem("filePattern")));
   }
 
   @Test
   public void testWriteDisplayData() {
-    AvroIO.Write<GenericClass> write = AvroIO.write(GenericClass.class)
-        .to("/foo")
-        .withShardNameTemplate("-SS-of-NN-")
-        .withSuffix("bar")
-        .withNumShards(100)
-        .withCodec(CodecFactory.snappyCodec());
+    AvroIO.Write<GenericClass> write =
+        AvroIO.write(GenericClass.class)
+            .to("/foo")
+            .withShardNameTemplate("-SS-of-NN-")
+            .withSuffix("bar")
+            .withNumShards(100)
+            .withCodec(CodecFactory.snappyCodec());
 
     DisplayData displayData = DisplayData.from(write);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 755bb59..b756778 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -48,7 +48,6 @@ import java.util.zip.GZIPInputStream;
 import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
-import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
 import org.apache.beam.sdk.io.FileBasedSink.Writer;
@@ -62,9 +61,7 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/**
- * Tests for {@link FileBasedSink}.
- */
+/** Tests for {@link FileBasedSink}. */
 @RunWith(JUnit4.class)
 public class FileBasedSinkTest {
   @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -87,14 +84,14 @@ public class FileBasedSinkTest {
   }
 
   /**
-   * Writer opens the correct file, writes the header, footer, and elements in the correct
-   * order, and returns the correct filename.
+   * Writer opens the correct file, writes the header, footer, and elements in the correct order,
+   * and returns the correct filename.
    */
   @Test
   public void testWriter() throws Exception {
     String testUid = "testId";
-    ResourceId expectedTempFile = getBaseTempDirectory()
-        .resolve(testUid, StandardResolveOptions.RESOLVE_FILE);
+    ResourceId expectedTempFile =
+        getBaseTempDirectory().resolve(testUid, StandardResolveOptions.RESOLVE_FILE);
     List<String> values = Arrays.asList("sympathetic vulture", "boresome hummingbird");
     List<String> expected = new ArrayList<>();
     expected.add(SimpleSink.SimpleWriter.HEADER);
@@ -114,9 +111,7 @@ public class FileBasedSinkTest {
     assertFileContains(expected, expectedTempFile);
   }
 
-  /**
-   * Assert that a file contains the lines provided, in the same order as expected.
-   */
+  /** Assert that a file contains the lines provided, in the same order as expected. */
   private void assertFileContains(List<String> expected, ResourceId file) throws Exception {
     try (BufferedReader reader = new BufferedReader(new FileReader(file.toString()))) {
       List<String> actual = new ArrayList<>();
@@ -140,9 +135,7 @@ public class FileBasedSinkTest {
     }
   }
 
-  /**
-   * Removes temporary files when temporary and output directories differ.
-   */
+  /** Removes temporary files when temporary and output directories differ. */
   @Test
   public void testRemoveWithTempFilename() throws Exception {
     testRemoveTemporaryFiles(3, getBaseTempDirectory());
@@ -218,7 +211,7 @@ public class FileBasedSinkTest {
               .getSink()
               .getDynamicDestinations()
               .getFilenamePolicy(null)
-              .unwindowedFilename(new Context(i, numFiles), CompressionType.UNCOMPRESSED);
+              .unwindowedFilename(i, numFiles, CompressionType.UNCOMPRESSED);
       assertTrue(new File(outputFilename.toString()).exists());
       assertFalse(temporaryFiles.get(i).exists());
     }
@@ -232,8 +225,7 @@ public class FileBasedSinkTest {
    * Create n temporary and output files and verify that removeTemporaryFiles only removes temporary
    * files.
    */
-  private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory)
-      throws Exception {
+  private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory) throws Exception {
     String prefix = "file";
     SimpleSink<Void> sink =
         SimpleSink.makeSimpleSink(
@@ -245,8 +237,7 @@ public class FileBasedSinkTest {
     List<File> temporaryFiles = new ArrayList<>();
     List<File> outputFiles = new ArrayList<>();
     for (int i = 0; i < numFiles; i++) {
-      ResourceId tempResource =
-          WriteOperation.buildTemporaryFilename(tempDirectory, prefix + i);
+      ResourceId tempResource = WriteOperation.buildTemporaryFilename(tempDirectory, prefix + i);
       File tmpFile = new File(tempResource.toString());
       tmpFile.getParentFile().mkdirs();
       assertTrue("not able to create new temp file", tmpFile.createNewFile());
@@ -264,12 +255,9 @@ public class FileBasedSinkTest {
     for (int i = 0; i < numFiles; i++) {
       File temporaryFile = temporaryFiles.get(i);
       assertThat(
-          String.format("temp file %s exists", temporaryFile),
-          temporaryFile.exists(), is(false));
+          String.format("temp file %s exists", temporaryFile), temporaryFile.exists(), is(false));
       File outputFile = outputFiles.get(i);
-      assertThat(
-          String.format("output file %s exists", outputFile),
-          outputFile.exists(), is(true));
+      assertThat(String.format("output file %s exists", outputFile), outputFile.exists(), is(true));
     }
   }
 
@@ -279,8 +267,8 @@ public class FileBasedSinkTest {
     SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
     List<String> inputFilenames = Arrays.asList("input-1", "input-2", "input-3");
     List<String> inputContents = Arrays.asList("1", "2", "3");
-    List<String> expectedOutputFilenames = Arrays.asList(
-        "file-00-of-03.test", "file-01-of-03.test", "file-02-of-03.test");
+    List<String> expectedOutputFilenames =
+        Arrays.asList("file-00-of-03.test", "file-01-of-03.test", "file-02-of-03.test");
 
     Map<ResourceId, ResourceId> inputFilePaths = new HashMap<>();
     List<ResourceId> expectedOutputPaths = new ArrayList<>();
@@ -301,8 +289,7 @@ public class FileBasedSinkTest {
               .getSink()
               .getDynamicDestinations()
               .getFilenamePolicy(null)
-              .unwindowedFilename(
-                  new Context(i, inputFilenames.size()), CompressionType.UNCOMPRESSED));
+              .unwindowedFilename(i, inputFilenames.size(), CompressionType.UNCOMPRESSED));
     }
 
     // Copy input files to output files.
@@ -319,16 +306,12 @@ public class FileBasedSinkTest {
       ResourceId outputDirectory, FilenamePolicy policy, int numFiles) {
     List<ResourceId> filenames = new ArrayList<>();
     for (int i = 0; i < numFiles; i++) {
-      filenames.add(
-          policy.unwindowedFilename(new Context(i, numFiles), CompressionType.UNCOMPRESSED));
+      filenames.add(policy.unwindowedFilename(i, numFiles, CompressionType.UNCOMPRESSED));
     }
     return filenames;
   }
 
-  /**
-   * Output filenames are generated correctly when an extension is supplied.
-   */
-
+  /** Output filenames are generated correctly when an extension is supplied. */
   @Test
   public void testGenerateOutputFilenames() {
     List<ResourceId> expected;
@@ -340,17 +323,17 @@ public class FileBasedSinkTest {
             root, "file", ".SSSSS.of.NNNNN", ".test", CompressionType.UNCOMPRESSED);
     FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null);
 
-    expected = Arrays.asList(
-        root.resolve("file.00000.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
-        root.resolve("file.00001.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
-        root.resolve("file.00002.of.00003.test", StandardResolveOptions.RESOLVE_FILE)
-    );
+    expected =
+        Arrays.asList(
+            root.resolve("file.00000.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
+            root.resolve("file.00001.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
+            root.resolve("file.00002.of.00003.test", StandardResolveOptions.RESOLVE_FILE));
     actual = generateDestinationFilenames(root, policy, 3);
     assertEquals(expected, actual);
 
-    expected = Collections.singletonList(
-        root.resolve("file.00000.of.00001.test", StandardResolveOptions.RESOLVE_FILE)
-    );
+    expected =
+        Collections.singletonList(
+            root.resolve("file.00000.of.00001.test", StandardResolveOptions.RESOLVE_FILE));
     actual = generateDestinationFilenames(root, policy, 1);
     assertEquals(expected, actual);
 
@@ -396,17 +379,17 @@ public class FileBasedSinkTest {
             root, "file", "-SSSSS-of-NNNNN", "", CompressionType.UNCOMPRESSED);
     FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null);
 
-    expected = Arrays.asList(
-        root.resolve("file-00000-of-00003", StandardResolveOptions.RESOLVE_FILE),
-        root.resolve("file-00001-of-00003", StandardResolveOptions.RESOLVE_FILE),
-        root.resolve("file-00002-of-00003", StandardResolveOptions.RESOLVE_FILE)
-    );
+    expected =
+        Arrays.asList(
+            root.resolve("file-00000-of-00003", StandardResolveOptions.RESOLVE_FILE),
+            root.resolve("file-00001-of-00003", StandardResolveOptions.RESOLVE_FILE),
+            root.resolve("file-00002-of-00003", StandardResolveOptions.RESOLVE_FILE));
     actual = generateDestinationFilenames(root, policy, 3);
     assertEquals(expected, actual);
 
-    expected = Collections.singletonList(
-        root.resolve("file-00000-of-00001", StandardResolveOptions.RESOLVE_FILE)
-    );
+    expected =
+        Collections.singletonList(
+            root.resolve("file-00000-of-00001", StandardResolveOptions.RESOLVE_FILE));
     actual = generateDestinationFilenames(root, policy, 1);
     assertEquals(expected, actual);
 
@@ -479,9 +462,8 @@ public class FileBasedSinkTest {
     }
   }
 
-  private File writeValuesWithWritableByteChannelFactory(final WritableByteChannelFactory factory,
-      String... values)
-      throws IOException {
+  private File writeValuesWithWritableByteChannelFactory(
+      final WritableByteChannelFactory factory, String... values) throws IOException {
     final File file = tmpFolder.newFile("test.gz");
     final WritableByteChannel channel =
         factory.create(Channels.newChannel(new FileOutputStream(file)));
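
For readers skimming the hunks above: the caller-side change in this test is that FilenamePolicy.unwindowedFilename now takes the shard number and shard count as plain int parameters instead of a Context wrapper. Below is a minimal caller-side sketch, assuming a policy obtained from a sink as in the tests above; the class and method names are illustrative and not part of this patch.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.fs.ResourceId;

class ExpectedFilenamesSketch {
  /** Enumerates the filenames an unwindowed write of {@code numShards} shards would produce. */
  static List<ResourceId> expectedFilenames(FilenamePolicy policy, int numShards) {
    List<ResourceId> filenames = new ArrayList<>();
    for (int shard = 0; shard < numShards; shard++) {
      // Before this change: policy.unwindowedFilename(new Context(shard, numShards), CompressionType.UNCOMPRESSED)
      // After this change, the same information is passed directly:
      filenames.add(policy.unwindowedFilename(shard, numShards, CompressionType.UNCOMPRESSED));
    }
    return filenames;
  }
}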

http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index 55f2a87..1ca7169 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -70,8 +70,10 @@ import org.apache.beam.sdk.transforms.Top;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
@@ -89,17 +91,12 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/**
- * Tests for the WriteFiles PTransform.
- */
+/** Tests for the WriteFiles PTransform. */
 @RunWith(JUnit4.class)
 public class WriteFilesTest {
-  @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
-  @Rule
-  public final TestPipeline p = TestPipeline.create();
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule public final TestPipeline p = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
 
   @SuppressWarnings("unchecked") // covariant cast
   private static final PTransform<PCollection<String>, PCollection<String>> IDENTITY_MAP =
@@ -114,12 +111,12 @@ public class WriteFilesTest {
 
   private static final PTransform<PCollection<String>, PCollectionView<Integer>>
       SHARDING_TRANSFORM =
-      new PTransform<PCollection<String>, PCollectionView<Integer>>() {
-        @Override
-        public PCollectionView<Integer> expand(PCollection<String> input) {
-          return null;
-        }
-      };
+          new PTransform<PCollection<String>, PCollectionView<Integer>>() {
+            @Override
+            public PCollectionView<Integer> expand(PCollection<String> input) {
+              return null;
+            }
+          };
 
   private static class WindowAndReshuffle<T> extends PTransform<PCollection<T>, PCollection<T>> {
     private final Window<T> window;
@@ -161,18 +158,20 @@ public class WriteFilesTest {
   }
 
   private String getBaseOutputFilename() {
-    return getBaseOutputDirectory()
-        .resolve("file", StandardResolveOptions.RESOLVE_FILE).toString();
+    return getBaseOutputDirectory().resolve("file", StandardResolveOptions.RESOLVE_FILE).toString();
   }
 
-  /**
-   * Test a WriteFiles transform with a PCollection of elements.
-   */
+  /** Test a WriteFiles transform with a PCollection of elements. */
   @Test
   @Category(NeedsRunner.class)
   public void testWrite() throws IOException {
-    List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
-        "Intimidating pigeon", "Pedantic gull", "Frisky finch");
+    List<String> inputs =
+        Arrays.asList(
+            "Critical canary",
+            "Apprehensive eagle",
+            "Intimidating pigeon",
+            "Pedantic gull",
+            "Frisky finch");
     runWrite(
         inputs,
         IDENTITY_MAP,
@@ -180,9 +179,7 @@ public class WriteFilesTest {
         WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
   }
 
-  /**
-   * Test that WriteFiles with an empty input still produces one shard.
-   */
+  /** Test that WriteFiles with an empty input still produces one shard. */
   @Test
   @Category(NeedsRunner.class)
   public void testEmptyWrite() throws IOException {
@@ -191,8 +188,7 @@ public class WriteFilesTest {
         IDENTITY_MAP,
         getBaseOutputFilename(),
         WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
-    checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(),
-        Optional.of(1));
+    checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(), Optional.of(1));
   }
 
   /**
@@ -212,7 +208,6 @@ public class WriteFilesTest {
   private ResourceId getBaseOutputDirectory() {
     return LocalResources.fromFile(tmpFolder.getRoot(), true)
         .resolve("output", StandardResolveOptions.RESOLVE_DIRECTORY);
-
   }
 
   private SimpleSink<Void> makeSimpleSink() {
@@ -267,9 +262,7 @@ public class WriteFilesTest {
             .withNumShards(20));
   }
 
-  /**
-   * Test a WriteFiles transform with an empty PCollection.
-   */
+  /** Test a WriteFiles transform with an empty PCollection. */
   @Test
   @Category(NeedsRunner.class)
   public void testWriteWithEmptyPCollection() throws IOException {
@@ -281,14 +274,17 @@ public class WriteFilesTest {
         WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
   }
 
-  /**
-   * Test a WriteFiles with a windowed PCollection.
-   */
+  /** Test a WriteFiles with a windowed PCollection. */
   @Test
   @Category(NeedsRunner.class)
   public void testWriteWindowed() throws IOException {
-    List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
-        "Intimidating pigeon", "Pedantic gull", "Frisky finch");
+    List<String> inputs =
+        Arrays.asList(
+            "Critical canary",
+            "Apprehensive eagle",
+            "Intimidating pigeon",
+            "Pedantic gull",
+            "Frisky finch");
     runWrite(
         inputs,
         new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))),
@@ -296,14 +292,17 @@ public class WriteFilesTest {
         WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
   }
 
-  /**
-   * Test a WriteFiles with sessions.
-   */
+  /** Test a WriteFiles with sessions. */
   @Test
   @Category(NeedsRunner.class)
   public void testWriteWithSessions() throws IOException {
-    List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
-        "Intimidating pigeon", "Pedantic gull", "Frisky finch");
+    List<String> inputs =
+        Arrays.asList(
+            "Critical canary",
+            "Apprehensive eagle",
+            "Intimidating pigeon",
+            "Pedantic gull",
+            "Frisky finch");
 
     runWrite(
         inputs,
@@ -589,19 +588,24 @@ public class WriteFilesTest {
     public String filenamePrefixForWindow(IntervalWindow window) {
       String prefix =
           baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), "");
-      return String.format("%s%s-%s",
-          prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
+      return String.format(
+          "%s%s-%s", prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
     }
 
     @Override
-    public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) {
-      IntervalWindow window = (IntervalWindow) context.getWindow();
+    public ResourceId windowedFilename(
+        int shardNumber,
+        int numShards,
+        BoundedWindow window,
+        PaneInfo paneInfo,
+        OutputFileHints outputFileHints) {
+      IntervalWindow intervalWindow = (IntervalWindow) window;
       String filename =
           String.format(
               "%s-%s-of-%s%s%s",
-              filenamePrefixForWindow(window),
-              context.getShardNumber(),
-              context.getNumShards(),
+              filenamePrefixForWindow(intervalWindow),
+              shardNumber,
+              numShards,
               outputFileHints.getSuggestedFilenameSuffix(),
               suffix);
       return baseFilename
@@ -610,17 +614,14 @@ public class WriteFilesTest {
     }
 
     @Override
-    public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) {
+    public ResourceId unwindowedFilename(
+        int shardNumber, int numShards, OutputFileHints outputFileHints) {
       String prefix =
           baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), "");
       String filename =
           String.format(
               "%s-%s-of-%s%s%s",
-              prefix,
-              context.getShardNumber(),
-              context.getNumShards(),
-              outputFileHints.getSuggestedFilenameSuffix(),
-              suffix);
+              prefix, shardNumber, numShards, outputFileHints.getSuggestedFilenameSuffix(), suffix);
       return baseFilename
           .getCurrentDirectory()
           .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
@@ -656,12 +657,14 @@ public class WriteFilesTest {
 
     Optional<Integer> numShards =
         (write.getNumShards() != null)
-            ? Optional.of(write.getNumShards().get()) : Optional.<Integer>absent();
+            ? Optional.of(write.getNumShards().get())
+            : Optional.<Integer>absent();
     checkFileContents(baseName, inputs, numShards);
   }
 
-  static void checkFileContents(String baseName, List<String> inputs,
-                                Optional<Integer> numExpectedShards) throws IOException {
+  static void checkFileContents(
+      String baseName, List<String> inputs, Optional<Integer> numExpectedShards)
+      throws IOException {
     List<File> outputFiles = Lists.newArrayList();
     final String pattern = baseName + "*";
     List<Metadata> metadata =
@@ -690,12 +693,11 @@ public class WriteFilesTest {
     assertThat(actual, containsInAnyOrder(inputs.toArray()));
   }
 
-  /**
-   * Options for test, exposed for PipelineOptionsFactory.
-   */
+  /** Options for test, exposed for PipelineOptionsFactory. */
   public interface WriteOptions extends TestPipelineOptions {
     @Description("Test flag and value")
     String getTestFlag();
+
     void setTestFlag(String value);
   }
 


[2/2] beam git commit: This closes #3539: Unbundle FilenamePolicy Context and WindowedContext

Posted by ke...@apache.org.
This closes #3539: Unbundle FilenamePolicy Context and WindowedContext


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5f972e8b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5f972e8b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5f972e8b

Branch: refs/heads/master
Commit: 5f972e8b2525660a2c09e6f9f21a13b5b7b46366
Parents: 889776f 64997ef
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jul 13 12:16:42 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 13 12:16:42 2017 -0700

----------------------------------------------------------------------
 .../examples/common/WriteOneFilePerWindow.java  |  19 +-
 .../complete/game/utils/WriteToText.java        |  18 +-
 .../construction/WriteFilesTranslationTest.java |  12 +-
 .../beam/sdk/io/DefaultFilenamePolicy.java      |  47 ++--
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 198 ++++----------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 263 ++++++++++---------
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |  88 +++----
 .../org/apache/beam/sdk/io/WriteFilesTest.java  | 122 ++++-----
 8 files changed, 358 insertions(+), 409 deletions(-)
----------------------------------------------------------------------
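
For reference, here is what a complete custom policy looks like against the unbundled signatures introduced by this change. This is a minimal sketch modeled on the PerWindowFiles policy in WriteFilesTest above, not code from the patch; the class name, constructor, and filename pattern are illustrative.

import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;

class PerWindowSketchPolicy extends FilenamePolicy {
  private final ResourceId baseDirectory;

  PerWindowSketchPolicy(ResourceId baseDirectory) {
    this.baseDirectory = baseDirectory;
  }

  @Override
  public ResourceId windowedFilename(
      int shardNumber,
      int numShards,
      BoundedWindow window,
      PaneInfo paneInfo,
      OutputFileHints outputFileHints) {
    // Window, pane, and sharding information now arrive as explicit parameters rather than
    // through WindowedContext accessors such as getWindow() and getShardNumber().
    IntervalWindow intervalWindow = (IntervalWindow) window;
    String filename =
        String.format(
            "file-%s-%s-%s-of-%s%s",
            intervalWindow.start(),
            intervalWindow.end(),
            shardNumber,
            numShards,
            outputFileHints.getSuggestedFilenameSuffix());
    return baseDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
  }

  @Override
  public ResourceId unwindowedFilename(
      int shardNumber, int numShards, OutputFileHints outputFileHints) {
    // The former Context(shardNumber, numShards) pair is likewise passed directly.
    String filename =
        String.format(
            "file-%s-of-%s%s",
            shardNumber, numShards, outputFileHints.getSuggestedFilenameSuffix());
    return baseDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
  }
}

Because callers and implementers now see the same flat parameter list, the tests above can drop new Context(i, numFiles) and pass the shard index and count straight through.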