Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/05 17:42:57 UTC

[1/3] beam git commit: Add windowing support to FileBasedSink

Repository: beam
Updated Branches:
  refs/heads/master 8e5cfdea9 -> bc907c58b
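
The user-facing surface of this change is Write.withWindowedWrites(), which keeps the
input's windowing and triggering instead of collapsing everything into the global window.
A minimal sketch of how a caller might use it (MySink is a hypothetical Sink<String>
implementation, and windowed writes require an explicit positive shard count):

    import org.apache.beam.sdk.io.Write;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    static void windowedWriteExample(PCollection<String> input) {
      input
          .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
          .apply(Write.to(new MySink())     // MySink: hypothetical Sink<String>
              .withWindowedWrites()         // preserve the input windowing/triggering
              .withNumShards(2));           // must be positive for windowed writes
    }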


http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index 948a65b..16f3eb6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import java.util.List;
@@ -30,7 +29,9 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Sink.WriteOperation;
 import org.apache.beam.sdk.io.Sink.Writer;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -42,7 +43,9 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -81,9 +84,19 @@ import org.slf4j.LoggerFactory;
 public class Write<T> extends PTransform<PCollection<T>, PDone> {
   private static final Logger LOG = LoggerFactory.getLogger(Write.class);
 
+  private static final int UNKNOWN_SHARDNUM = -1;
+  private static final int UNKNOWN_NUMSHARDS = -1;
+
   private final Sink<T> sink;
+  // This allows the number of shards to be dynamically computed based on the input
+  // PCollection.
   @Nullable
   private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
+  // We don't use a side input for static sharding, as we want this value to be updatable
+  // when a pipeline is updated.
+  @Nullable
+  private final ValueProvider<Integer> numShardsProvider;
+  private boolean windowedWrites;
 
   /**
    * Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner
@@ -91,21 +104,24 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
    */
   public static <T> Write<T> to(Sink<T> sink) {
     checkNotNull(sink, "sink");
-    return new Write<>(sink, null /* runner-determined sharding */);
+    return new Write<>(sink, null /* runner-determined sharding */, null, false);
   }
 
   private Write(
       Sink<T> sink,
-      @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards) {
+      @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards,
+      @Nullable ValueProvider<Integer> numShardsProvider,
+      boolean windowedWrites) {
     this.sink = sink;
     this.computeNumShards = computeNumShards;
+    this.numShardsProvider = numShardsProvider;
+    this.windowedWrites = windowedWrites;
   }
 
   @Override
   public PDone expand(PCollection<T> input) {
-    checkArgument(
-        IsBounded.BOUNDED == input.isBounded(),
-        "%s can only be applied to a Bounded PCollection",
+    checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites,
+        "%s can only be applied to an unbounded PCollection if doing windowed writes",
         Write.class.getSimpleName());
     PipelineOptions options = input.getPipeline().getOptions();
     sink.validate(options);
@@ -120,6 +136,11 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
         .include("sink", sink);
     if (getSharding() != null) {
       builder.include("sharding", getSharding());
+    } else if (getNumShards() != null) {
+      String numShards = getNumShards().isAccessible()
+          ? getNumShards().get().toString() : getNumShards().toString();
+      builder.add(DisplayData.item("numShards", numShards)
+          .withLabel("Fixed Number of Shards"));
     }
   }
 
@@ -141,6 +162,10 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
     return computeNumShards;
   }
 
+  public ValueProvider<Integer> getNumShards() {
+    return numShardsProvider;
+  }
+
   /**
    * Returns a new {@link Write} that will write to the current {@link Sink} using the
    * specified number of shards.
@@ -165,8 +190,8 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
    * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
    * more information.
    */
-  public Write<T> withNumShards(ValueProvider<Integer> numShards) {
-    return new Write<>(sink, new ConstantShards<T>(numShards));
+  public Write<T> withNumShards(ValueProvider<Integer> numShardsProvider) {
+    return new Write<>(sink, null, numShardsProvider, windowedWrites);
   }
 
   /**
@@ -179,7 +204,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
   public Write<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
     checkNotNull(
         sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
-    return new Write<>(sink, sharding);
+    return new Write<>(sink, sharding, null, windowedWrites);
   }
 
   /**
@@ -187,7 +212,25 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
    * runner-determined sharding.
    */
   public Write<T> withRunnerDeterminedSharding() {
-    return new Write<>(sink, null);
+    return new Write<>(sink, null, null, windowedWrites);
+  }
+
+  /**
+   * Returns a new {@link Write} that preserves the windowing on its input.
+   *
+   * <p>If this option is not specified, windowing and triggering are replaced by
+   * {@link GlobalWindows} and {@link DefaultTrigger}.
+   *
+   * <p>If there is no data for a window, no output shards will be generated for that window.
+   * If a window triggers multiple times, an output shard for that window might be produced
+   * more than once; it's up to the sink implementation to keep these output shards
+   * unique.
+   *
+   * <p>This option can only be used if {@link #withNumShards(int)} is also set to a
+   * positive value.
+   */
+  public Write<T> withWindowedWrites() {
+    return new Write<>(sink, computeNumShards, numShardsProvider, true);
   }
 
   /**
@@ -205,13 +248,19 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
+    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
       // Lazily initialize the Writer
       if (writer == null) {
         WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
         LOG.info("Opening writer for write operation {}", writeOperation);
         writer = writeOperation.createWriter(c.getPipelineOptions());
-        writer.open(UUID.randomUUID().toString());
+
+        if (windowedWrites) {
+          writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM,
+              UNKNOWN_NUMSHARDS);
+        } else {
+          writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
+        }
         LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
       }
       try {
@@ -257,42 +306,57 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
    */
   private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
     private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
+    private final PCollectionView<Integer> numShardsView;
 
-    WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
+    WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView,
+                        PCollectionView<Integer> numShardsView) {
       this.writeOperationView = writeOperationView;
+      this.numShardsView = numShardsView;
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
+    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+      int numShards = numShardsView != null ? c.sideInput(numShardsView) : getNumShards().get();
       // In a sharded write, single input element represents one shard. We can open and close
       // the writer in each call to processElement.
       WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
       LOG.info("Opening writer for write operation {}", writeOperation);
       Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
-      writer.open(UUID.randomUUID().toString());
+      if (windowedWrites) {
+        writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(),
+            numShards);
+      } else {
+        writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
+      }
       LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
 
       try {
-        for (T t : c.element().getValue()) {
-          writer.write(t);
-        }
-      } catch (Exception e) {
         try {
-          writer.close();
-        } catch (Exception closeException) {
-          if (closeException instanceof InterruptedException) {
-            // Do not silently ignore interrupted state.
-            Thread.currentThread().interrupt();
+          for (T t : c.element().getValue()) {
+            writer.write(t);
           }
-          // Do not mask the exception that caused the write to fail.
-          e.addSuppressed(closeException);
+        } catch (Exception e) {
+          try {
+            writer.close();
+          } catch (Exception closeException) {
+            if (closeException instanceof InterruptedException) {
+              // Do not silently ignore interrupted state.
+              Thread.currentThread().interrupt();
+            }
+            // Do not mask the exception that caused the write to fail.
+            e.addSuppressed(closeException);
+          }
+          throw e;
         }
+
+        // Close the writer; if this throws let the error propagate.
+        WriteT result = writer.close();
+        c.output(result);
+      } catch (Exception e) {
+        // If anything goes wrong, make sure to delete the temporary file.
+        writer.cleanup();
         throw e;
       }
-
-      // Close the writer; if this throws let the error propagate.
-      WriteT result = writer.close();
-      c.output(result);
     }
 
     @Override
@@ -302,23 +366,32 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
   }
 
   private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
-    private final PCollectionView<Integer> numShards;
+    private final PCollectionView<Integer> numShardsView;
+    private final ValueProvider<Integer> numShardsProvider;
     private int shardNumber;
 
-    ApplyShardingKey(PCollectionView<Integer> numShards) {
-      this.numShards = numShards;
-      shardNumber = -1;
+    ApplyShardingKey(PCollectionView<Integer> numShardsView,
+                     ValueProvider<Integer> numShardsProvider) {
+      this.numShardsView = numShardsView;
+      this.numShardsProvider = numShardsProvider;
+      shardNumber = UNKNOWN_SHARDNUM;
     }
 
     @ProcessElement
     public void processElement(ProcessContext context) {
-      Integer shardCount = context.sideInput(numShards);
+      int shardCount = 0;
+      if (numShardsView != null) {
+        shardCount = context.sideInput(numShardsView);
+      } else {
+        checkNotNull(numShardsProvider);
+        shardCount = numShardsProvider.get();
+      }
       checkArgument(
           shardCount > 0,
           "Must have a positive number of shards specified for non-runner-determined sharding."
               + " Got %s",
           shardCount);
-      if (shardNumber == -1) {
+      if (shardNumber == UNKNOWN_SHARDNUM) {
         // We want to desynchronize the first record sharding key for each instance of
         // ApplyShardingKey, so records in a small PCollection will be statistically balanced.
         shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
@@ -340,8 +413,8 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
    * <p>This singleton collection containing the WriteOperation is then used as a side input to a
    * ParDo over the PCollection of elements to write. In this bundle-writing phase,
    * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
-   * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn.StartBundle} and
-   * {@link DoFn.FinishBundle}, respectively, and {@link Writer#write} method is called for
+   * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and
+   * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} method is called for
    * every element in the bundle. The output of this ParDo is a PCollection of
   * <i>writer result</i> objects (see {@link Sink} for a description of writer results), one for
    * each bundle.
@@ -364,6 +437,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
   private <WriteT> PDone createWrite(
       PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
     Pipeline p = input.getPipeline();
+    writeOperation.setWindowedWrites(windowedWrites);
 
     // A coder to use for the WriteOperation.
     @SuppressWarnings("unchecked")
@@ -373,7 +447,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
     // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
     // the sink.
     PCollection<WriteOperation<T, WriteT>> operationCollection =
-        p.apply(Create.of(writeOperation).withCoder(operationCoder));
+        p.apply("CreateOperationCollection", Create.of(writeOperation).withCoder(operationCoder));
 
     // Initialize the resource in a do-once ParDo on the WriteOperation.
     operationCollection = operationCollection
@@ -384,6 +458,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
             WriteOperation<T, WriteT> writeOperation = c.element();
             LOG.info("Initializing write operation {}", writeOperation);
             writeOperation.initialize(c.getPipelineOptions());
+            writeOperation.setWindowedWrites(windowedWrites);
             LOG.debug("Done initializing write operation {}", writeOperation);
             // The WriteOperation is also the output of this ParDo, so it can have mutable
             // state.
@@ -396,133 +471,133 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
     final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
         operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
 
-    // Re-window the data into the global window and remove any existing triggers.
-    PCollection<T> inputInGlobalWindow =
-        input.apply(
-            Window.<T>into(new GlobalWindows())
-                .triggering(DefaultTrigger.of())
-                .discardingFiredPanes());
+    if (!windowedWrites) {
+      // Re-window the data into the global window and remove any existing triggers.
+      input =
+          input.apply(
+              Window.<T>into(new GlobalWindows())
+                  .triggering(DefaultTrigger.of())
+                  .discardingFiredPanes());
+    }
+
 
     // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
     // as a side input) and collect the results of the writes in a PCollection.
     // There is a dependency between this ParDo and the first (the WriteOperation PCollection
     // as a side input), so this will happen after the initial ParDo.
     PCollection<WriteT> results;
-    final PCollectionView<Integer> numShards;
-    if (computeNumShards == null) {
-      numShards = null;
-      results =
-          inputInGlobalWindow.apply(
-              "WriteBundles",
+    final PCollectionView<Integer> numShardsView;
+    if (computeNumShards == null && numShardsProvider == null) {
+      if (windowedWrites) {
+        throw new IllegalStateException("When doing windowed writes, numShards must be set"
+            + "explicitly to a positive value");
+      }
+      numShardsView = null;
+      results = input
+          .apply("WriteBundles",
               ParDo.of(new WriteBundles<>(writeOperationView))
                   .withSideInputs(writeOperationView));
     } else {
-      numShards = inputInGlobalWindow.apply(computeNumShards);
-      results =
-          inputInGlobalWindow
-              .apply(
-                  "ApplyShardLabel",
-                  ParDo.of(new ApplyShardingKey<T>(numShards)).withSideInputs(numShards))
-              .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
-              .apply(
-                  "WriteShardedBundles",
-                  ParDo.of(new WriteShardedBundles<>(writeOperationView))
-                      .withSideInputs(writeOperationView));
+      if (computeNumShards != null) {
+        numShardsView = input.apply(computeNumShards);
+        results = input
+            .apply("ApplyShardLabel", ParDo.of(
+                new ApplyShardingKey<T>(numShardsView, null)).withSideInputs(numShardsView))
+            .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
+            .apply("WriteShardedBundles",
+                ParDo.of(new WriteShardedBundles<>(writeOperationView, numShardsView))
+                    .withSideInputs(numShardsView, writeOperationView));
+      } else {
+        numShardsView = null;
+        results = input
+            .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey<T>(null, numShardsProvider)))
+            .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
+            .apply("WriteShardedBundles",
+                ParDo.of(new WriteShardedBundles<>(writeOperationView, null))
+                    .withSideInputs(writeOperationView));
+      }
     }
     results.setCoder(writeOperation.getWriterResultCoder());
 
-    final PCollectionView<Iterable<WriteT>> resultsView =
-        results.apply(View.<WriteT>asIterable());
-
-    // Finalize the write in another do-once ParDo on the singleton collection containing the
-    // Writer. The results from the per-bundle writes are given as an Iterable side input.
-    // The WriteOperation's state is the same as after its initialization in the first do-once
-    // ParDo. There is a dependency between this ParDo and the parallel write (the writer results
-    // collection as a side input), so it will happen after the parallel write.
-    ImmutableList.Builder<PCollectionView<?>> sideInputs =
-        ImmutableList.<PCollectionView<?>>builder().add(resultsView);
-    if (numShards != null) {
-      sideInputs.add(numShards);
-    }
-    operationCollection
-        .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
-          @ProcessElement
-          public void processElement(ProcessContext c) throws Exception {
-            WriteOperation<T, WriteT> writeOperation = c.element();
-            LOG.info("Finalizing write operation {}.", writeOperation);
-            List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
-            LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
-
-            // We must always output at least 1 shard, and honor user-specified numShards if set.
-            int minShardsNeeded;
-            if (numShards == null) {
-              minShardsNeeded = 1;
-            } else {
-              minShardsNeeded = c.sideInput(numShards);
-              checkArgument(
-                  minShardsNeeded > 0,
-                  "Must have a positive number of shards for non-runner-determined sharding."
-                      + " Got %s",
-                  minShardsNeeded);
+    if (windowedWrites) {
+      // When processing streaming windowed writes, results will arrive multiple times. This
+      // means we can't share the below implementation that turns the results into a side input,
+      // as new data arriving into a side input does not trigger the listening DoFn. Instead
+      // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered
+      // whenever new data arrives.
+      PCollection<KV<Void, WriteT>> keyedResults =
+          results.apply("AttachSingletonKey", WithKeys.<Void, WriteT>of((Void) null));
+      keyedResults.setCoder(KvCoder.<Void, WriteT>of(VoidCoder.of(), writeOperation
+          .getWriterResultCoder()));
+
+      // Is the continuation trigger sufficient?
+      keyedResults
+          .apply("FinalizeGroupByKey", GroupByKey.<Void, WriteT>create())
+          .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<WriteT>>, Integer>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) throws Exception {
+              WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
+              LOG.info("Finalizing write operation {}.", writeOperation);
+              List<WriteT> results = Lists.newArrayList(c.element().getValue());
+              writeOperation.finalize(results, c.getPipelineOptions());
+              LOG.debug("Done finalizing write operation {}", writeOperation);
             }
-            int extraShardsNeeded = minShardsNeeded - results.size();
-            if (extraShardsNeeded > 0) {
-              LOG.info(
-                  "Creating {} empty output shards in addition to {} written for a total of {}.",
-                  extraShardsNeeded, results.size(), minShardsNeeded);
-              for (int i = 0; i < extraShardsNeeded; ++i) {
-                Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
-                writer.open(UUID.randomUUID().toString());
-                WriteT emptyWrite = writer.close();
-                results.add(emptyWrite);
+          }).withSideInputs(writeOperationView));
+    } else {
+      final PCollectionView<Iterable<WriteT>> resultsView =
+          results.apply(View.<WriteT>asIterable());
+      ImmutableList.Builder<PCollectionView<?>> sideInputs =
+          ImmutableList.<PCollectionView<?>>builder().add(resultsView);
+      if (numShardsView != null) {
+        sideInputs.add(numShardsView);
+      }
+
+      // Finalize the write in another do-once ParDo on the singleton collection containing the
+      // Writer. The results from the per-bundle writes are given as an Iterable side input.
+      // The WriteOperation's state is the same as after its initialization in the first do-once
+      // ParDo. There is a dependency between this ParDo and the parallel write (the writer
+      // results collection as a side input), so it will happen after the parallel write.
+      // For the non-windowed case, we guarantee that if no data is written but the user has
+      // set numShards, then all shards will be written out as empty files. For this reason we
+      // use a side input here.
+      operationCollection
+          .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) throws Exception {
+              WriteOperation<T, WriteT> writeOperation = c.element();
+              LOG.info("Finalizing write operation {}.", writeOperation);
+              List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
+              LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
+
+              // We must always output at least 1 shard, and honor user-specified numShards if
+              // set.
+              int minShardsNeeded;
+              if (numShardsView != null) {
+                minShardsNeeded = c.sideInput(numShardsView);
+              } else if (numShardsProvider != null) {
+                minShardsNeeded = numShardsProvider.get();
+              } else {
+                minShardsNeeded = 1;
               }
-              LOG.debug("Done creating extra shards.");
+              int extraShardsNeeded = minShardsNeeded - results.size();
+              if (extraShardsNeeded > 0) {
+                LOG.info(
+                    "Creating {} empty output shards in addition to {} written for a total of "
+                        + " {}.", extraShardsNeeded, results.size(), minShardsNeeded);
+                for (int i = 0; i < extraShardsNeeded; ++i) {
+                  Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
+                  writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM,
+                      UNKNOWN_NUMSHARDS);
+                  WriteT emptyWrite = writer.close();
+                  results.add(emptyWrite);
+                }
+                LOG.debug("Done creating extra shards.");
+              }
+              writeOperation.finalize(results, c.getPipelineOptions());
+              LOG.debug("Done finalizing write operation {}", writeOperation);
             }
-
-            writeOperation.finalize(results, c.getPipelineOptions());
-            LOG.debug("Done finalizing write operation {}", writeOperation);
-          }
-        }).withSideInputs(sideInputs.build()));
-    return PDone.in(input.getPipeline());
-  }
-
-  @VisibleForTesting
-  static class ConstantShards<T>
-      extends PTransform<PCollection<T>, PCollectionView<Integer>> {
-    private final ValueProvider<Integer> numShards;
-
-    private ConstantShards(ValueProvider<Integer> numShards) {
-      this.numShards = numShards;
-    }
-
-    @Override
-    public PCollectionView<Integer> expand(PCollection<T> input) {
-      return input
-          .getPipeline()
-          .apply(Create.of(0))
-          .apply(
-              "FixedNumShards",
-              ParDo.of(
-                  new DoFn<Integer, Integer>() {
-                    @ProcessElement
-                    public void outputNumShards(ProcessContext ctxt) {
-                      checkArgument(
-                          numShards.isAccessible(),
-                          "NumShards must be accessible at runtime to use constant sharding");
-                      ctxt.output(numShards.get());
-                    }
-                  }))
-          .apply(View.<Integer>asSingleton());
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.add(
-          DisplayData.item("Fixed Number of Shards", numShards).withLabel("ConstantShards"));
-    }
-
-    public ValueProvider<Integer> getNumShards() {
-      return numShards;
+          }).withSideInputs(sideInputs.build()));
     }
+    return PDone.in(input.getPipeline());
   }
 }
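
A note on the windowed finalize path above: because a side input does not re-trigger its
consuming DoFn as new panes arrive, the results are instead funneled through a
singleton-key GroupByKey so the finalizing DoFn runs on every trigger firing. A
stripped-down sketch of that pattern, with the writer-result type narrowed to String and
all names illustrative rather than taken from the commit:

    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.coders.VoidCoder;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.WithKeys;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    static void finalizePerPane(PCollection<String> results) {
      PCollection<KV<Void, String>> keyed =
          results.apply("AttachSingletonKey", WithKeys.<Void, String>of((Void) null));
      keyed.setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()));
      keyed
          .apply("FinalizeGroupByKey", GroupByKey.<Void, String>create())
          .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<String>>, Void>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              // Runs once per trigger firing of the singleton group, seeing the
              // results in the fired pane; a side input would not re-run this DoFn.
              for (String result : c.element().getValue()) {
                // ... finalize each result ...
              }
            }
          }));
    }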

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
index 6937e93..2159c8f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
@@ -176,7 +176,7 @@ public class XmlSink {
      * <p>The specified class must be able to be used to create a JAXB context.
      */
     public <T> Bound<T> ofRecordClass(Class<T> classToBind) {
-      return new Bound<>(classToBind, rootElementName, baseOutputFilename.get());
+      return new Bound<>(classToBind, rootElementName, getBaseOutputFilenameProvider().get());
     }
 
     /**
@@ -194,7 +194,7 @@ public class XmlSink {
      * supplied name.
      */
     public Bound<T> withRootElement(String rootElementName) {
-      return new Bound<>(classToBind, rootElementName, baseOutputFilename.get());
+      return new Bound<>(classToBind, rootElementName, getBaseOutputFilenameProvider().get());
     }
 
     /**
@@ -205,7 +205,7 @@ public class XmlSink {
     public void validate(PipelineOptions options) {
       checkNotNull(classToBind, "Missing a class to bind to a JAXB context.");
       checkNotNull(rootElementName, "Missing a root element name.");
-      checkNotNull(baseOutputFilename, "Missing a filename to write to.");
+      checkNotNull(getBaseOutputFilenameProvider().get(), "Missing a filename to write to.");
       try {
         JAXBContext.newInstance(classToBind);
       } catch (JAXBException e) {

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
index 0739381..e1ad47b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
@@ -17,7 +17,10 @@
  */
 package org.apache.beam.sdk.testing;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -35,10 +38,12 @@ public interface TestPipelineOptions extends PipelineOptions {
   void setTempRoot(String value);
 
   @Default.InstanceFactory(AlwaysPassMatcherFactory.class)
+  @JsonIgnore
   SerializableMatcher<PipelineResult> getOnCreateMatcher();
   void setOnCreateMatcher(SerializableMatcher<PipelineResult> value);
 
   @Default.InstanceFactory(AlwaysPassMatcherFactory.class)
+  @JsonIgnore
   SerializableMatcher<PipelineResult> getOnSuccessMatcher();
   void setOnSuccessMatcher(SerializableMatcher<PipelineResult> value);
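
The @JsonIgnore annotations above keep these matcher properties out of Jackson's JSON
round-trip of PipelineOptions; SerializableMatcher values are Java-serializable but not
JSON-friendly. The same pattern, as a minimal sketch for any options interface
(MyTestOptions is illustrative):

    import com.fasterxml.jackson.annotation.JsonIgnore;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.testing.SerializableMatcher;

    public interface MyTestOptions extends PipelineOptions {
      @JsonIgnore  // excluded when options are serialized to JSON
      SerializableMatcher<PipelineResult> getResultMatcher();
      void setResultMatcher(SerializableMatcher<PipelineResult> value);
    }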
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
index dd81a34..6f6ba37 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -45,6 +46,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.regex.Matcher;
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -175,17 +177,20 @@ public class FileIOChannelFactory implements IOChannelFactory {
   }
 
   @Override
-  public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
+  public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)
+      throws IOException {
+    List<String> srcList = Lists.newArrayList(srcFilenames);
+    List<String> destList = Lists.newArrayList(destFilenames);
     checkArgument(
-        srcFilenames.size() == destFilenames.size(),
+        srcList.size() == destList.size(),
         "Number of source files %s must equal number of destination files %s",
-        srcFilenames.size(),
-        destFilenames.size());
-    int numFiles = srcFilenames.size();
+        srcList.size(),
+        destList.size());
+    int numFiles = srcList.size();
     for (int i = 0; i < numFiles; i++) {
-      String src = srcFilenames.get(i);
-      String dst = destFilenames.get(i);
-      LOG.debug("Copying {} to {}", src, dst);
+      String src = srcList.get(i);
+      String dst = destList.get(i);
+      LOG.info("Copying {} to {}", src, dst);
       try {
         // Copy the source file, replacing the existing destination.
         // Paths.get(x) will not work on Windows OSes cause of the ":" after the drive letter.
@@ -194,7 +199,7 @@ public class FileIOChannelFactory implements IOChannelFactory {
             new File(dst).toPath(),
             StandardCopyOption.REPLACE_EXISTING);
       } catch (NoSuchFileException e) {
-        LOG.debug("{} does not exist.", src);
+        LOG.info("{} does not exist.", src);
         // Suppress exception if file does not exist.
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
index 9f99cd6..745dcb9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
@@ -99,7 +99,8 @@ public class GcsIOChannelFactory implements IOChannelFactory {
   }
 
   @Override
-  public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
+  public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)
+      throws IOException {
     options.getGcsUtil().copy(srcFilenames, destFilenames);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 14781c4..1c853bb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -68,6 +68,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -635,23 +636,27 @@ public class GcsUtil {
     return batches;
   }
 
-  public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
+  public void copy(Iterable<String> srcFilenames,
+                   Iterable<String> destFilenames)
+      throws IOException {
     executeBatches(makeCopyBatches(srcFilenames, destFilenames));
   }
 
-  List<BatchRequest> makeCopyBatches(List<String> srcFilenames, List<String> destFilenames)
+  List<BatchRequest> makeCopyBatches(Iterable<String> srcFilenames, Iterable<String> destFilenames)
       throws IOException {
+    List<String> srcList = Lists.newArrayList(srcFilenames);
+    List<String> destList = Lists.newArrayList(destFilenames);
     checkArgument(
-        srcFilenames.size() == destFilenames.size(),
+        srcList.size() == destList.size(),
         "Number of source files %s must equal number of destination files %s",
-        srcFilenames.size(),
-        destFilenames.size());
+        srcList.size(),
+        destList.size());
 
     List<BatchRequest> batches = new LinkedList<>();
     BatchRequest batch = createBatchRequest();
-    for (int i = 0; i < srcFilenames.size(); i++) {
-      final GcsPath sourcePath = GcsPath.fromUri(srcFilenames.get(i));
-      final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i));
+    for (int i = 0; i < srcList.size(); i++) {
+      final GcsPath sourcePath = GcsPath.fromUri(srcList.get(i));
+      final GcsPath destPath = GcsPath.fromUri(destList.get(i));
       enqueueCopy(sourcePath, destPath, batch);
       if (batch.size() >= MAX_REQUESTS_PER_BATCH) {
         batches.add(batch);

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
index 9504f45..3a3af17 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
@@ -23,7 +23,6 @@ import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.Path;
 import java.util.Collection;
-import java.util.List;
 
 /**
  * Defines a factory for working with read and write channels.
@@ -116,7 +115,7 @@ public interface IOChannelFactory {
    * @param srcFilenames the source filenames.
    * @param destFilenames the destination filenames.
    */
-  void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException;
+  void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames) throws IOException;
 
   /**
    * Removes a collection of files or directories.
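
Widening copy() from List to Iterable lets callers hand over collection views directly,
e.g. the key and value views of a temporary-to-final rename map like the one
FileBasedSinkTest now builds. A hedged sketch (the factory spec string is illustrative):

    import java.io.IOException;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.apache.beam.sdk.util.IOChannelFactory;
    import org.apache.beam.sdk.util.IOChannelUtils;

    static void renameExample() throws IOException {
      // LinkedHashMap so keySet() and values() iterate in the same, stable order.
      Map<String, String> renames = new LinkedHashMap<>();
      renames.put("/tmp/beam-temp-00000", "/tmp/output-00000-of-00002");
      renames.put("/tmp/beam-temp-00001", "/tmp/output-00001-of-00002");
      IOChannelFactory factory = IOChannelUtils.getFactory("/tmp/output");
      factory.copy(renames.keySet(), renames.values());
    }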

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 19f5ffa..f3dbb05 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -31,13 +31,18 @@ import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.Random;
 import java.util.Set;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileReader;
@@ -48,18 +53,29 @@ import org.apache.avro.reflect.Nullable;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.AvroIO.Write.Bound;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
@@ -94,7 +110,7 @@ public class AvroIOTest {
   }
 
   @Test
-  public void testWriteWithoutValidationFlag() throws Exception {
+  public void testWriteWithoutValidationFlag() throws Exception {
     AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write.to("gs://bucket/foo/baz");
     assertTrue(write.needsValidation());
     assertFalse(write.withoutValidation().needsValidation());
@@ -275,6 +291,132 @@ public class AvroIOTest {
     p.run();
   }
 
+  private TimestampedValue<GenericClass> newValue(GenericClass element, Duration duration) {
+    return TimestampedValue.of(element, new Instant(0).plus(duration));
+  }
+
+  private static class WindowedFilenamePolicy extends FilenamePolicy {
+    String outputFilePrefix;
+
+    WindowedFilenamePolicy(String outputFilePrefix) {
+      this.outputFilePrefix = outputFilePrefix;
+    }
+
+    @Override
+    public ValueProvider<String> getBaseOutputFilenameProvider() {
+      return StaticValueProvider.of(outputFilePrefix);
+    }
+
+    @Override
+    public String windowedFilename(WindowedContext input) {
+      String filename = outputFilePrefix + "-" + input.getWindow().toString() + "-"
+          + input.getShardNumber() + "-of-" + (input.getNumShards() - 1) + "-pane-"
+          + input.getPaneInfo().getIndex();
+      if (input.getPaneInfo().isLast()) {
+        filename += "-final";
+      }
+      return filename;
+    }
+
+    @Override
+    public String unwindowedFilename(Context input) {
+      String filename = outputFilePrefix + input.getShardNumber() + "-of-"
+          + (input.getNumShards() - 1);
+      return filename;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.add(DisplayData.item("fileNamePrefix", outputFilePrefix)
+          .withLabel("File Name Prefix"));
+    }
+  }
+
+  @Rule
+  public TestPipeline windowedAvroWritePipeline = TestPipeline.create();
+
+  @Test
+  @Category({ValidatesRunner.class, UsesTestStream.class})
+  public void testWindowedAvroIOWrite() throws Throwable {
+    File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
+    final String outputFilePrefix = baseOutputFile.getAbsolutePath();
+
+    Instant base = new Instant(0);
+    ArrayList<GenericClass> allElements = new ArrayList<>();
+    ArrayList<TimestampedValue<GenericClass>> firstWindowElements = new ArrayList<>();
+    ArrayList<Instant> firstWindowTimestamps = Lists.newArrayList(
+        base.plus(Duration.standardSeconds(0)), base.plus(Duration.standardSeconds(10)),
+        base.plus(Duration.standardSeconds(20)), base.plus(Duration.standardSeconds(30)));
+
+    Random random = new Random();
+    for (int i = 0; i < 100; ++i) {
+      GenericClass item = new GenericClass(i, String.valueOf(i));
+      allElements.add(item);
+      firstWindowElements.add(TimestampedValue.of(item,
+          firstWindowTimestamps.get(random.nextInt(firstWindowTimestamps.size()))));
+    }
+
+    ArrayList<TimestampedValue<GenericClass>> secondWindowElements = new ArrayList<>();
+    ArrayList<Instant> secondWindowTimestamps = Lists.newArrayList(
+        base.plus(Duration.standardSeconds(60)), base.plus(Duration.standardSeconds(70)),
+        base.plus(Duration.standardSeconds(80)), base.plus(Duration.standardSeconds(90)));
+    for (int i = 100; i < 200; ++i) {
+      GenericClass item = new GenericClass(i, String.valueOf(i));
+      allElements.add(new GenericClass(i, String.valueOf(i)));
+      secondWindowElements.add(TimestampedValue.of(item,
+          secondWindowTimestamps.get(random.nextInt(secondWindowTimestamps.size()))));
+    }
+
+
+    TimestampedValue<GenericClass>[] firstWindowArray =
+        firstWindowElements.toArray(new TimestampedValue[100]);
+    TimestampedValue<GenericClass>[] secondWindowArray =
+        secondWindowElements.toArray(new TimestampedValue[100]);
+
+    TestStream<GenericClass> values = TestStream.create(AvroCoder.of(GenericClass.class))
+        .advanceWatermarkTo(new Instant(0))
+        .addElements(firstWindowArray[0],
+            Arrays.copyOfRange(firstWindowArray, 1, firstWindowArray.length))
+        .advanceWatermarkTo(new Instant(0).plus(Duration.standardMinutes(1)))
+        .addElements(secondWindowArray[0],
+            Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length))
+        .advanceWatermarkToInfinity();
+
+    windowedAvroWritePipeline
+        .apply(values)
+        .apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1))))
+        .apply(AvroIO.Write.to(new WindowedFilenamePolicy(outputFilePrefix))
+            .withWindowedWrites()
+            .withNumShards(2)
+            .withSchema(GenericClass.class));
+    windowedAvroWritePipeline.run();
+
+    // Validate that the data written matches the expected elements in the expected order
+    List<File> expectedFiles = new ArrayList<>();
+    for (int shard = 0; shard < 2; shard++) {
+      for (int window = 0; window < 2; window++) {
+        Instant windowStart = new Instant(0).plus(Duration.standardMinutes(window));
+        IntervalWindow intervalWindow = new IntervalWindow(
+            windowStart, Duration.standardMinutes(1));
+        expectedFiles.add(
+            new File(outputFilePrefix + "-" + intervalWindow.toString() + "-" + shard
+                + "-of-1" + "-pane-0-final"));
+      }
+    }
+
+    List<GenericClass> actualElements = new ArrayList<>();
+    for (File outputFile : expectedFiles) {
+      assertTrue("Expected output file " + outputFile.getAbsolutePath(), outputFile.exists());
+      try (DataFileReader<GenericClass> reader =
+               new DataFileReader<>(outputFile, AvroCoder.of(
+                   GenericClass.class).createDatumReader())) {
+        Iterators.addAll(actualElements, reader);
+      }
+      outputFile.delete();
+    }
+    assertThat(actualElements, containsInAnyOrder(allElements.toArray()));
+  }
+
   @Test
   public void testWriteWithDefaultCodec() throws Exception {
     AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
@@ -347,8 +489,10 @@ public class AvroIOTest {
 
     Bound<String> write = AvroIO.Write.to(outputFilePrefix).withSchema(String.class);
     if (numShards > 1) {
+      System.out.println("NumShards " + numShards);
       write = write.withNumShards(numShards);
     } else {
+      System.out.println("no sharding");
       write = write.withoutSharding();
     }
     p.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write);
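
To make the expected-file loop above concrete: with two shards, minute-long windows
starting at the epoch, and the WindowedFilenamePolicy defined earlier in this test, the
first window's files would be named roughly as follows (assuming IntervalWindow's
toString() renders as "[start..end)"):

    <prefix>-[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)-0-of-1-pane-0-final
    <prefix>-[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)-1-of-1-pane-0-final

The "-of-1" suffix comes from the policy printing (numShards - 1) rather than numShards.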

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index d2c1968..5b81ba8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -17,13 +17,13 @@
  */
 package org.apache.beam.sdk.io;
 
-import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.common.collect.Lists;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
@@ -41,13 +41,17 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.zip.GZIPInputStream;
 
 import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
 import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
 import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -99,7 +103,7 @@ public class FileBasedSinkTest {
     expected.addAll(values);
     expected.add(SimpleSink.SimpleWriter.FOOTER);
 
-    writer.open(testUid);
+    writer.openUnwindowed(testUid, -1, -1);
     for (String value : values) {
       writer.write(value);
     }
@@ -215,20 +219,18 @@ public class FileBasedSinkTest {
 
     int numFiles = temporaryFiles.size();
 
-    List<File> outputFiles = new ArrayList<>();
     List<FileResult> fileResults = new ArrayList<>();
-    List<String> outputFilenames = writeOp.generateDestinationFilenames(numFiles);
-
-    // Create temporary output bundles and output File objects
+    // Create temporary output bundles and output File objects.
     for (int i = 0; i < numFiles; i++) {
-      fileResults.add(new FileResult(temporaryFiles.get(i).toString()));
-      outputFiles.add(new File(outputFilenames.get(i)));
+      fileResults.add(new FileResult(temporaryFiles.get(i).toString(), null));
     }
 
     writeOp.finalize(fileResults, options);
 
     for (int i = 0; i < numFiles; i++) {
-      assertTrue(outputFiles.get(i).exists());
+      String outputFilename = writeOp.getSink().getFileNamePolicy().unwindowedFilename(
+          new Context(i, numFiles));
+      assertTrue(new File(outputFilename).exists());
       assertFalse(temporaryFiles.get(i).exists());
     }
 
@@ -258,7 +260,7 @@ public class FileBasedSinkTest {
       outputFiles.add(outputFile);
     }
 
-    writeOp.removeTemporaryFiles(Collections.<String>emptyList(), options);
+    writeOp.removeTemporaryFiles(Collections.<String>emptySet(), true, options);
 
     for (int i = 0; i < numFiles; i++) {
       assertFalse(temporaryFiles.get(i).exists());
@@ -274,12 +276,12 @@ public class FileBasedSinkTest {
     PipelineOptions options = PipelineOptionsFactory.create();
     SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
 
-    List<String> inputFilenames = Arrays.asList("input-3", "input-2", "input-1");
-    List<String> inputContents = Arrays.asList("3", "2", "1");
+    List<String> inputFilenames = Arrays.asList("input-1", "input-2", "input-3");
+    List<String> inputContents = Arrays.asList("1", "2", "3");
     List<String> expectedOutputFilenames = Arrays.asList(
-        "output-00002-of-00003.test", "output-00001-of-00003.test", "output-00000-of-00003.test");
+        "output-00000-of-00003.test", "output-00001-of-00003.test", "output-00002-of-00003.test");
 
-    List<String> inputFilePaths = new ArrayList<>();
+    Map<String, String> inputFilePaths = new HashMap<>();
     List<String> expectedOutputPaths = new ArrayList<>();
 
     for (int i = 0; i < inputFilenames.size(); i++) {
@@ -291,14 +293,13 @@ public class FileBasedSinkTest {
       File inputTmpFile = tmpFolder.newFile(inputFilenames.get(i));
       List<String> lines = Arrays.asList(inputContents.get(i));
       writeFile(lines, inputTmpFile);
-      inputFilePaths.add(inputTmpFile.toString());
+      inputFilePaths.put(inputTmpFile.toString(),
+          writeOp.getSink().getFileNamePolicy().unwindowedFilename(
+              new Context(i, inputFilenames.size())));
     }
 
     // Copy input files to output files.
-    List<String> actual = writeOp.copyToOutputFiles(inputFilePaths, options);
-
-    // Assert that the expected paths are returned.
-    assertThat(expectedOutputPaths, containsInAnyOrder(actual.toArray()));
+    writeOp.copyToOutputFiles(inputFilePaths, options);
 
     // Assert that the contents were copied.
     for (int i = 0; i < expectedOutputPaths.size(); i++) {
@@ -306,6 +307,14 @@ public class FileBasedSinkTest {
     }
   }
 
+  public List<String> generateDestinationFilenames(FilenamePolicy policy, int numFiles) {
+    List<String> filenames = new ArrayList<>();
+    for (int i = 0; i < numFiles; i++) {
+      filenames.add(policy.unwindowedFilename(new Context(i, numFiles)));
+    }
+    return filenames;
+  }
+
   /**
    * Output filenames use the supplied naming template.
    */
@@ -314,36 +323,35 @@ public class FileBasedSinkTest {
     List<String> expected;
     List<String> actual;
     SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "test", ".SS.of.NN");
-    SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink);
+    FilenamePolicy policy = sink.getFileNamePolicy();
 
     expected = Arrays.asList(appendToTempFolder("output.00.of.03.test"),
         appendToTempFolder("output.01.of.03.test"), appendToTempFolder("output.02.of.03.test"));
-    actual = writeOp.generateDestinationFilenames(3);
+    actual = generateDestinationFilenames(policy, 3);
     assertEquals(expected, actual);
 
     expected = Arrays.asList(appendToTempFolder("output.00.of.01.test"));
-    actual = writeOp.generateDestinationFilenames(1);
+    actual = generateDestinationFilenames(policy, 1);
     assertEquals(expected, actual);
 
     expected = new ArrayList<>();
-    actual = writeOp.generateDestinationFilenames(0);
+    actual = generateDestinationFilenames(policy, 0);
     assertEquals(expected, actual);
 
     // Also validate that we handle the case where the user specified "." that we do
     // not prefix an additional "." making "..test"
     sink = new SimpleSink(getBaseOutputFilename(), ".test", ".SS.of.NN");
-    writeOp = new SimpleSink.SimpleWriteOperation(sink);
     expected = Arrays.asList(appendToTempFolder("output.00.of.03.test"),
         appendToTempFolder("output.01.of.03.test"), appendToTempFolder("output.02.of.03.test"));
-    actual = writeOp.generateDestinationFilenames(3);
+    actual = generateDestinationFilenames(policy, 3);
     assertEquals(expected, actual);
 
     expected = Arrays.asList(appendToTempFolder("output.00.of.01.test"));
-    actual = writeOp.generateDestinationFilenames(1);
+    actual = generateDestinationFilenames(policy, 1);
     assertEquals(expected, actual);
 
     expected = new ArrayList<>();
-    actual = writeOp.generateDestinationFilenames(0);
+    actual = generateDestinationFilenames(policy, 0);
     assertEquals(expected, actual);
   }
 
@@ -355,20 +363,21 @@ public class FileBasedSinkTest {
     List<String> expected;
     List<String> actual;
     SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
+    FilenamePolicy policy = writeOp.getSink().getFileNamePolicy();
 
     expected = Arrays.asList(
         appendToTempFolder("output-00000-of-00003.test"),
         appendToTempFolder("output-00001-of-00003.test"),
         appendToTempFolder("output-00002-of-00003.test"));
-    actual = writeOp.generateDestinationFilenames(3);
+    actual = generateDestinationFilenames(policy, 3);
     assertEquals(expected, actual);
 
     expected = Arrays.asList(appendToTempFolder("output-00000-of-00001.test"));
-    actual = writeOp.generateDestinationFilenames(1);
+    actual = generateDestinationFilenames(policy, 1);
     assertEquals(expected, actual);
 
     expected = new ArrayList<>();
-    actual = writeOp.generateDestinationFilenames(0);
+    actual = generateDestinationFilenames(policy, 0);
     assertEquals(expected, actual);
   }
 
@@ -380,16 +389,17 @@ public class FileBasedSinkTest {
     SimpleSink sink = new SimpleSink("output", "test", "-NN");
     SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink);
 
-    // A single shard doesn't need to include the shard number.
-    assertEquals(Arrays.asList("output-01.test"),
-                 writeOp.generateDestinationFilenames(1));
-
     // More than one shard does.
     try {
-      writeOp.generateDestinationFilenames(3);
+      Iterable<FileResult> results = Lists.newArrayList(
+          new FileResult("temp1", "file1"),
+          new FileResult("temp2", "file1"),
+          new FileResult("temp3", "file1"));
+
+      writeOp.buildOutputFilenames(results);
       fail("Should have failed.");
     } catch (IllegalStateException exn) {
-      assertEquals("Shard name template '-NN' only generated 1 distinct file names for 3 files.",
+      assertEquals("Only generated 1 distinct file names for 3 files.",
                    exn.getMessage());
     }
   }
@@ -402,19 +412,19 @@ public class FileBasedSinkTest {
     List<String> expected;
     List<String> actual;
     SimpleSink sink = new SimpleSink(appendToTempFolder(baseOutputFilename), "");
-    SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink);
+    FilenamePolicy policy = sink.getFileNamePolicy();
 
     expected = Arrays.asList(appendToTempFolder("output-00000-of-00003"),
         appendToTempFolder("output-00001-of-00003"), appendToTempFolder("output-00002-of-00003"));
-    actual = writeOp.generateDestinationFilenames(3);
+    actual = generateDestinationFilenames(policy, 3);
     assertEquals(expected, actual);
 
     expected = Arrays.asList(appendToTempFolder("output-00000-of-00001"));
-    actual = writeOp.generateDestinationFilenames(1);
+    actual = generateDestinationFilenames(policy, 1);
     assertEquals(expected, actual);
 
     expected = new ArrayList<>();
-    actual = writeOp.generateDestinationFilenames(0);
+    actual = generateDestinationFilenames(policy, 0);
     assertEquals(expected, actual);
   }
 
@@ -513,7 +523,7 @@ public class FileBasedSinkTest {
     expected.add("footer");
     expected.add("footer");
 
-    writer.open(testUid);
+    writer.openUnwindowed(testUid, -1, -1);
     writer.write("a");
     writer.write("b");
     final FileResult result = writer.close();

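For orientation, a minimal sketch (paths are illustrative; SimpleSink, getFileNamePolicy, and the generateDestinationFilenames helper are the test-local pieces shown above) of how the new FilenamePolicy-driven naming plays out for the ".SS.of.NN" template:

    SimpleSink sink = new SimpleSink("/tmp/output", "test", ".SS.of.NN");
    FilenamePolicy policy = sink.getFileNamePolicy();
    // Per the assertions above, three shards expand the template to:
    //   /tmp/output.00.of.03.test
    //   /tmp/output.01.of.03.test
    //   /tmp/output.02.of.03.test
    List<String> names = generateDestinationFilenames(policy, 3);
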
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 3ecbed4..16d7f2a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -49,10 +49,10 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.Sink.WriteOperation;
 import org.apache.beam.sdk.io.Sink.Writer;
-import org.apache.beam.sdk.io.Write.ConstantShards;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -63,11 +63,12 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.ToString;
 import org.apache.beam.sdk.transforms.Top;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
@@ -311,8 +312,10 @@ public class WriteTest {
     assertThat(write.getSink(), is(sink));
     PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding =
         write.getSharding();
-    assertThat(write.getSharding(), instanceOf(ConstantShards.class));
-    assertThat(((ConstantShards<String>) write.getSharding()).getNumShards().get(), equalTo(3));
+
+    assertThat(write.getSharding(), is(nullValue()));
+    assertThat(write.getNumShards(), instanceOf(StaticValueProvider.class));
+    assertThat(write.getNumShards().get(), equalTo(3));
     assertThat(write.getSharding(), equalTo(originalSharding));
 
     Write<String> write2 = write.withSharding(SHARDING_TRANSFORM);
@@ -352,7 +355,7 @@ public class WriteTest {
     DisplayData displayData = DisplayData.from(write);
     assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
     assertThat(displayData, includesDisplayDataFor("sink", sink));
-    assertThat(displayData, hasDisplayItem("Fixed Number of Shards", 1));
+    assertThat(displayData, hasDisplayItem("numShards", "1"));
   }
 
   @Test
@@ -383,17 +386,6 @@ public class WriteTest {
     assertThat(displayData, hasDisplayItem("spam", "ham"));
   }
 
-  @Test
-  public void testWriteUnbounded() {
-    PCollection<String> unbounded = p.apply(CountingInput.unbounded())
-        .apply(ToString.elements());
-
-    TestSink sink = new TestSink();
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Write can only be applied to a Bounded PCollection");
-    unbounded.apply(Write.to(sink));
-  }
-
   /**
    * Performs a Write transform and verifies the Write transform calls the appropriate methods on
    * a test sink in the correct order, as well as verifies that the elements of a PCollection are
@@ -535,6 +527,10 @@ public class WriteTest {
     }
 
     @Override
+    public void setWindowedWrites(boolean windowedWrites) {
+    }
+
+    @Override
     public void finalize(Iterable<TestWriterResult> bundleResults, PipelineOptions options)
         throws Exception {
       assertEquals("test_value", options.as(WriteOptions.class).getTestFlag());
@@ -633,7 +629,21 @@ public class WriteTest {
     }
 
     @Override
-    public void open(String uId) throws Exception {
+    public final void openWindowed(String uId,
+                                   BoundedWindow window,
+                                   PaneInfo paneInfo,
+                                   int shard,
+                                   int nShards) throws Exception {
+      numShards.incrementAndGet();
+      this.uId = uId;
+      assertEquals(State.INITIAL, state);
+      state = State.OPENED;
+    }
+
+    @Override
+    public final void openUnwindowed(String uId,
+                                     int shard,
+                                     int nShards) throws Exception {
       numShards.incrementAndGet();
       this.uId = uId;
       assertEquals(State.INITIAL, state);
@@ -653,8 +663,13 @@ public class WriteTest {
       state = State.CLOSED;
       return new TestWriterResult(uId, elementsWritten);
     }
+
+    @Override
+    public void cleanup() throws Exception {
+    }
   }
 
+
   /**
    * Options for test, exposed for PipelineOptionsFactory.
    */

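The tests above exercise the new Writer contract: open(String) is split into openWindowed(...) and openUnwindowed(...), with a cleanup() hook added. A minimal sketch of the unwindowed lifecycle (writeOp and options are assumed to exist, as in the tests; -1 marks an unknown shard number and unknown shard count):

    Writer<String, TestWriterResult> writer = writeOp.createWriter(options);
    writer.openUnwindowed(UUID.randomUUID().toString(), -1, -1);
    writer.write("a");
    writer.write("b");
    TestWriterResult result = writer.close();
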
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
index 96b8c57..63b5d11 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
@@ -93,7 +93,7 @@ public class XmlSinkTest {
             .withRootElement(testRootElement);
     assertEquals(testClass, sink.classToBind);
     assertEquals(testRootElement, sink.rootElementName);
-    assertEquals(testFilePrefix, sink.baseOutputFilename.get());
+    assertEquals(testFilePrefix, sink.getBaseOutputFilenameProvider().get());
   }
 
   /**
@@ -105,7 +105,7 @@ public class XmlSinkTest {
         XmlSink.writeOf(Bird.class, testRootElement, testFilePrefix);
     assertEquals(testClass, sink.classToBind);
     assertEquals(testRootElement, sink.rootElementName);
-    assertEquals(testFilePrefix, sink.baseOutputFilename.get());
+    assertEquals(testFilePrefix, sink.getBaseOutputFilenameProvider().get());
   }
 
   /**
@@ -142,9 +142,9 @@ public class XmlSinkTest {
         XmlSink.writeOf(testClass, testRootElement, testFilePrefix);
     XmlWriteOperation<Bird> writeOp = sink.createWriteOperation(options);
     assertEquals(testClass, writeOp.getSink().classToBind);
-    assertEquals(testFilePrefix, writeOp.getSink().baseOutputFilename.get());
+    assertEquals(testFilePrefix, writeOp.getSink().getBaseOutputFilenameProvider().get());
     assertEquals(testRootElement, writeOp.getSink().rootElementName);
-    assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().extension);
+   // assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().getFilenamePolicy().extension);
     Path outputPath = new File(testFilePrefix).toPath();
     Path tempPath = new File(writeOp.tempDirectory.get()).toPath();
     assertEquals(outputPath.getParent(), tempPath.getParent());

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index 084d303..2ddead7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -162,23 +162,6 @@ public class TestPipelineTest implements Serializable {
     }
 
     @Test
-    public void testMatcherSerializationDeserialization() {
-      TestPipelineOptions opts = PipelineOptionsFactory.as(TestPipelineOptions.class);
-      SerializableMatcher<PipelineResult> m1 = new TestMatcher();
-      SerializableMatcher<PipelineResult> m2 = new TestMatcher();
-
-      opts.setOnCreateMatcher(m1);
-      opts.setOnSuccessMatcher(m2);
-
-      String[] arr = TestPipeline.convertToArgs(opts);
-      TestPipelineOptions newOpts =
-          PipelineOptionsFactory.fromArgs(arr).as(TestPipelineOptions.class);
-
-      assertEquals(m1, newOpts.getOnCreateMatcher());
-      assertEquals(m2, newOpts.getOnSuccessMatcher());
-    }
-
-    @Test
     public void testRunWithDummyEnvironmentVariableFails() {
       System.getProperties()
           .setProperty(TestPipeline.PROPERTY_USE_DEFAULT_DUMMY_RUNNER, Boolean.toString(true));

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
index 9b085ca..10ff788 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
@@ -40,6 +40,8 @@ import org.apache.beam.sdk.io.Sink;
 import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -284,6 +286,10 @@ public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
     }
 
     @Override
+    public void setWindowedWrites(boolean windowedWrites) {
+    }
+
+    @Override
     public void finalize(final Iterable<String> writerResults, PipelineOptions options)
         throws Exception {
       UGIHelper.getBestUGI(sink.username()).doAs(new PrivilegedExceptionAction<Void>() {
@@ -298,7 +304,6 @@ public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
     private void doFinalize(Iterable<String> writerResults) throws Exception {
       Job job = sink.newJob();
       FileSystem fs = FileSystem.get(new URI(path), job.getConfiguration());
-
       // If there are 0 output shards, just create output folder.
       if (!writerResults.iterator().hasNext()) {
         fs.mkdirs(new Path(path));
@@ -389,7 +394,17 @@ public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
     }
 
     @Override
-    public void open(final String uId) throws Exception {
+    public void openWindowed(final String uId,
+                             BoundedWindow window,
+                             PaneInfo paneInfo,
+                             int shard,
+                             int numShards) throws Exception {
+      throw new UnsupportedOperationException("Windowing support not implemented yet for "
+          + "HDFS. Window " + window);
+    }
+
+    @Override
+    public void openUnwindowed(final String uId, int shard, int numShards) throws Exception {
       UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
           new PrivilegedExceptionAction<Void>() {
             @Override
@@ -427,6 +442,11 @@ public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
     }
 
     @Override
+    public void cleanup() throws Exception {
+
+    }
+
+    @Override
     public String close() throws Exception {
       return UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
           new PrivilegedExceptionAction<String>() {

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
index 8b9a6d1..cedd812 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
@@ -66,7 +66,7 @@ public class HDFSFileSinkTest {
     Sink.WriteOperation<T, String> writeOperation =
         (Sink.WriteOperation<T, String>) sink.createWriteOperation(options);
     Sink.Writer<T, String> writer = writeOperation.createWriter(options);
-    writer.open(UUID.randomUUID().toString());
+    writer.openUnwindowed(UUID.randomUUID().toString(), -1, -1);
     for (T t: toWrite) {
       writer.write(t);
     }


[2/3] beam git commit: Add windowing support to FileBasedSink

Posted by ke...@apache.org.
Add windowing support to FileBasedSink


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6addc95f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6addc95f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6addc95f

Branch: refs/heads/master
Commit: 6addc95f0300a2e03109d4ad7ee93727d0a3b7b2
Parents: 570d0e2
Author: Reuven Lax <re...@google.com>
Authored: Thu Mar 9 09:45:35 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Apr 5 08:57:21 2017 -0700

----------------------------------------------------------------------
 examples/java/pom.xml                           |   1 -
 .../apache/beam/examples/WindowedWordCount.java |  34 +-
 .../examples/common/WriteOneFilePerWindow.java  |  91 ++++
 .../examples/common/WriteWindowedFilesDoFn.java |  77 ----
 .../beam/examples/WindowedWordCountIT.java      |  41 +-
 .../core/construction/PTransformMatchers.java   |   3 +-
 .../direct/WriteWithShardingFactory.java        |   6 +-
 .../streaming/io/UnboundedFlinkSink.java        |  20 +-
 .../beam/runners/flink/WriteSinkITCase.java     |  23 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 157 +++++--
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 429 +++++++++++++++----
 .../main/java/org/apache/beam/sdk/io/Sink.java  |  55 ++-
 .../java/org/apache/beam/sdk/io/TextIO.java     |  98 ++++-
 .../main/java/org/apache/beam/sdk/io/Write.java | 377 +++++++++-------
 .../java/org/apache/beam/sdk/io/XmlSink.java    |   6 +-
 .../beam/sdk/testing/TestPipelineOptions.java   |   5 +
 .../beam/sdk/util/FileIOChannelFactory.java     |  23 +-
 .../beam/sdk/util/GcsIOChannelFactory.java      |   3 +-
 .../java/org/apache/beam/sdk/util/GcsUtil.java  |  21 +-
 .../apache/beam/sdk/util/IOChannelFactory.java  |   3 +-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 146 ++++++-
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |  94 ++--
 .../java/org/apache/beam/sdk/io/WriteTest.java  |  49 ++-
 .../org/apache/beam/sdk/io/XmlSinkTest.java     |   8 +-
 .../beam/sdk/testing/TestPipelineTest.java      |  17 -
 .../apache/beam/sdk/io/hdfs/HDFSFileSink.java   |  24 +-
 .../beam/sdk/io/hdfs/HDFSFileSinkTest.java      |   2 +-
 27 files changed, 1295 insertions(+), 518 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 2b18130..021a819 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -209,7 +209,6 @@
                 <configuration>
                   <includes>
                     <include>WordCountIT.java</include>
-                    <include>WindowedWordCountIT.java</include>
                   </includes>
                   <parallel>all</parallel>
                   <threadCount>4</threadCount>

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index 5c19454..d88de54 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
 import org.apache.beam.examples.common.ExampleOptions;
-import org.apache.beam.examples.common.WriteWindowedFilesDoFn;
+import org.apache.beam.examples.common.WriteOneFilePerWindow;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.TextIO;
@@ -31,11 +31,9 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -203,33 +201,13 @@ public class WindowedWordCount {
     PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());
 
     /**
-     * Concept #5: Customize the output format using windowing information
-     *
-     * <p>At this point, the data is organized by window. We're writing text files and and have no
-     * late data, so for simplicity we can use the window as the key and {@link GroupByKey} to get
-     * one output file per window. (if we had late data this key would not be unique)
-     *
-     * <p>To access the window in a {@link DoFn}, add a {@link BoundedWindow} parameter. This will
-     * be automatically detected and populated with the window for the current element.
-     */
-    PCollection<KV<IntervalWindow, KV<String, Long>>> keyedByWindow =
-        wordCounts.apply(
-            ParDo.of(
-                new DoFn<KV<String, Long>, KV<IntervalWindow, KV<String, Long>>>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext context, IntervalWindow window) {
-                    context.output(KV.of(window, context.element()));
-                  }
-                }));
-
-    /**
-     * Concept #6: Format the results and write to a sharded file partitioned by window, using a
+     * Concept #5: Format the results and write to a sharded file partitioned by window, using a
      * simple ParDo operation. Because there may be failures followed by retries, the
      * writes must be idempotent, but the details of writing to files is elided here.
      */
-    keyedByWindow
-        .apply(GroupByKey.<IntervalWindow, KV<String, Long>>create())
-        .apply(ParDo.of(new WriteWindowedFilesDoFn(output)));
+    wordCounts
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()))
+        .apply(new WriteOneFilePerWindow(output));
 
     PipelineResult result = pipeline.run();
     try {

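The rewritten pipeline formats counts with WordCount.FormatAsTextFn before handing them to WriteOneFilePerWindow. A sketch of the assumed shape of that function (the integration test below splits output lines on ": ", so the format is "word: count"):

    // Assumed shape of WordCount.FormatAsTextFn, not shown in this diff.
    static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
      @Override
      public String apply(KV<String, Long> input) {
        return input.getKey() + ": " + input.getValue();
      }
    }
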
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
new file mode 100644
index 0000000..2ed8a74
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.common;
+
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+
+/**
+ * A {@link PTransform} that writes elements to files with names deterministically derived from
+ * the lower and upper bounds of the {@link IntervalWindow} each element belongs to.
+ *
+ * <p>This is test utility code, not for end-users, so examples can be focused on their primary
+ * lessons.
+ */
+public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone> {
+
+  private static DateTimeFormatter formatter = ISODateTimeFormat.hourMinute();
+  private String filenamePrefix;
+
+  public WriteOneFilePerWindow(String filenamePrefix) {
+    this.filenamePrefix = filenamePrefix;
+  }
+
+  @Override
+  public PDone expand(PCollection<String> input) {
+    return input.apply(
+        TextIO.Write.to(new PerWindowFiles(filenamePrefix)).withWindowedWrites().withNumShards(3));
+  }
+
+  /**
+   * A {@link FilenamePolicy} produces a base file name for a write based on metadata about the data
+   * being written. This always includes the shard number and the total number of shards. For
+   * windowed writes, it also includes the window and pane index (a sequence number assigned to each
+   * trigger firing).
+   */
+  public static class PerWindowFiles extends FilenamePolicy {
+
+    private final String output;
+
+    public PerWindowFiles(String output) {
+      this.output = output;
+    }
+
+    @Override
+    public ValueProvider<String> getBaseOutputFilenameProvider() {
+      return StaticValueProvider.of(output);
+    }
+
+    public String filenamePrefixForWindow(IntervalWindow window) {
+      return String.format(
+          "%s-%s-%s", output, formatter.print(window.start()), formatter.print(window.end()));
+    }
+
+    @Override
+    public String windowedFilename(WindowedContext context) {
+      IntervalWindow window = (IntervalWindow) context.getWindow();
+      return String.format(
+          "%s-%s-of-%s",
+          filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards());
+    }
+
+    @Override
+    public String unwindowedFilename(Context context) {
+      throw new UnsupportedOperationException("This policy only supports windowed writes.");
+    }
+  }
+}

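A hedged usage sketch of the new transform (the input data, window size, and output path are assumptions, not part of this commit):

    import org.apache.beam.examples.common.WriteOneFilePerWindow;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.joda.time.Duration;

    public class WriteOneFilePerWindowExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(Create.of("hello: 1", "world: 2"))
            // Fixed windows give PerWindowFiles an IntervalWindow to name files by.
            .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(10))))
            .apply(new WriteOneFilePerWindow("/tmp/output"));
        p.run();
      }
    }

Because the transform hard-codes withNumShards(3), each window produces three files named by windowedFilename, e.g. "/tmp/output-<start>-<end>-0-of-3".
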
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java
deleted file mode 100644
index cd6baad..0000000
--- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.examples.common;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.charset.StandardCharsets;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.values.KV;
-import org.joda.time.format.DateTimeFormatter;
-import org.joda.time.format.ISODateTimeFormat;
-
-/**
- * A {@link DoFn} that writes elements to files with names deterministically derived from the lower
- * and upper bounds of their key (an {@link IntervalWindow}).
- *
- * <p>This is test utility code, not for end-users, so examples can be focused
- * on their primary lessons.
- */
-public class WriteWindowedFilesDoFn
-    extends DoFn<KV<IntervalWindow, Iterable<KV<String, Long>>>, Void> {
-
-  static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
-  static final Coder<String> STRING_CODER = StringUtf8Coder.of();
-
-  private static DateTimeFormatter formatter = ISODateTimeFormat.hourMinute();
-
-  private final String output;
-
-  public WriteWindowedFilesDoFn(String output) {
-    this.output = output;
-  }
-
-  @VisibleForTesting
-  public static String fileForWindow(String output, IntervalWindow window) {
-    return String.format(
-        "%s-%s-%s", output, formatter.print(window.start()), formatter.print(window.end()));
-  }
-
-  @ProcessElement
-  public void processElement(ProcessContext context) throws Exception {
-    // Build a file name from the window
-    IntervalWindow window = context.element().getKey();
-    String outputShard = fileForWindow(output, window);
-
-    // Open the file and write all the values
-    IOChannelFactory factory = IOChannelUtils.getFactory(outputShard);
-    OutputStream out = Channels.newOutputStream(factory.create(outputShard, "text/plain"));
-    for (KV<String, Long> wordCount : context.element().getValue()) {
-      STRING_CODER.encode(
-          wordCount.getKey() + ": " + wordCount.getValue(), out, Coder.Context.OUTER);
-      out.write(NEWLINE);
-    }
-    out.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index 703f836..857f1d3 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -23,13 +23,14 @@ import com.google.api.client.util.Sleeper;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ThreadLocalRandom;
-import org.apache.beam.examples.common.WriteWindowedFilesDoFn;
+import org.apache.beam.examples.common.WriteOneFilePerWindow.PerWindowFiles;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.StreamingOptions;
@@ -42,6 +43,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.ExplicitShardedFile;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.NumberedShardedFile;
 import org.apache.beam.sdk.util.ShardedFile;
 import org.hamcrest.Description;
 import org.hamcrest.TypeSafeMatcher;
@@ -64,7 +66,7 @@ public class WindowedWordCountIT {
   @Rule public TestName testName = new TestName();
 
   private static final String DEFAULT_INPUT =
-      "gs://apache-beam-samples/shakespeare/winterstale-personae";
+      "gs://apache-beam-samples/shakespeare/sonnets.txt";
   static final int MAX_READ_RETRIES = 4;
   static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L);
   static final FluentBackoff BACK_OFF_FACTORY =
@@ -130,14 +132,18 @@ public class WindowedWordCountIT {
 
     String outputPrefix = options.getOutput();
 
-    List<String> expectedOutputFiles = Lists.newArrayListWithCapacity(6);
+    PerWindowFiles filenamePolicy = new PerWindowFiles(outputPrefix);
+
+    List<ShardedFile> expectedOutputFiles = Lists.newArrayListWithCapacity(6);
+
     for (int startMinute : ImmutableList.of(0, 10, 20, 30, 40, 50)) {
-      Instant windowStart =
+      final Instant windowStart =
           new Instant(options.getMinTimestampMillis()).plus(Duration.standardMinutes(startMinute));
       expectedOutputFiles.add(
-          WriteWindowedFilesDoFn.fileForWindow(
-              outputPrefix,
-              new IntervalWindow(windowStart, windowStart.plus(Duration.standardMinutes(10)))));
+          new NumberedShardedFile(
+              filenamePolicy.filenamePrefixForWindow(
+                  new IntervalWindow(
+                      windowStart, windowStart.plus(Duration.standardMinutes(10)))) + "*"));
     }
 
     ShardedFile inputFile = new ExplicitShardedFile(Collections.singleton(options.getInputFile()));
@@ -157,7 +163,7 @@ public class WindowedWordCountIT {
     }
 
     options.setOnSuccessMatcher(
-        new WordCountsMatcher(expectedWordCounts, new ExplicitShardedFile(expectedOutputFiles)));
+        new WordCountsMatcher(expectedWordCounts, expectedOutputFiles));
 
     WindowedWordCount.main(TestPipeline.convertToArgs(options));
   }
@@ -172,24 +178,28 @@ public class WindowedWordCountIT {
     private static final Logger LOG = LoggerFactory.getLogger(FileChecksumMatcher.class);
 
     private final SortedMap<String, Long> expectedWordCounts;
-    private final ShardedFile outputFile;
+    private final List<ShardedFile> outputFiles;
     private SortedMap<String, Long> actualCounts;
 
-    public WordCountsMatcher(SortedMap<String, Long> expectedWordCounts, ShardedFile outputFile) {
+    public WordCountsMatcher(
+        SortedMap<String, Long> expectedWordCounts, List<ShardedFile> outputFiles) {
       this.expectedWordCounts = expectedWordCounts;
-      this.outputFile = outputFile;
+      this.outputFiles = outputFiles;
     }
 
     @Override
     public boolean matchesSafely(PipelineResult pipelineResult) {
       try {
         // Load output data
-        List<String> lines =
-            outputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff());
+        List<String> outputLines = new ArrayList<>();
+        for (ShardedFile outputFile : outputFiles) {
+          outputLines.addAll(
+              outputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()));
+        }
 
         // Since the windowing is nondeterministic we only check the sums
         actualCounts = new TreeMap<>();
-        for (String line : lines) {
+        for (String line : outputLines) {
           String[] splits = line.split(": ");
           String word = splits[0];
           long count = Long.parseLong(splits[1]);
@@ -205,7 +215,8 @@ public class WindowedWordCountIT {
         return actualCounts.equals(expectedWordCounts);
       } catch (Exception e) {
         throw new RuntimeException(
-            String.format("Failed to read from sharded output: %s", outputFile));
+            String.format("Failed to read from sharded output: %s due to exception",
+                outputFiles), e);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index f4ae577..c4f1bd6 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -254,7 +254,8 @@ public class PTransformMatchers {
       @Override
       public boolean matches(AppliedPTransform<?, ?, ?> application) {
         if (application.getTransform() instanceof Write) {
-          return ((Write) application.getTransform()).getSharding() == null;
+          Write write = (Write) application.getTransform();
+          return write.getSharding() == null && write.getNumShards() == null;
         }
         return false;
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index 63122fe..1bf5839 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -35,6 +35,8 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
@@ -54,8 +56,7 @@ class WriteWithShardingFactory<InputT>
   @Override
   public PTransform<PCollection<InputT>, PDone> getReplacementTransform(
       Write<InputT> transform) {
-
-      return transform.withSharding(new LogElementShardsWithDrift<InputT>());
+    return transform.withSharding(new LogElementShardsWithDrift<InputT>());
   }
 
   @Override
@@ -74,6 +75,7 @@ class WriteWithShardingFactory<InputT>
     @Override
     public PCollectionView<Integer> expand(PCollection<T> records) {
       return records
+          .apply(Window.<T>into(new GlobalWindows()))
           .apply("CountRecords", Count.<T>globally())
           .apply("GenerateShardCount", ParDo.of(new CalculateShardsFn()))
           .apply(View.<Integer>asSingleton());

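The added Window.into(new GlobalWindows()) matters because Count.globally() needs a single global aggregate: re-windowing collapses whatever windowing the input carries so that the count, and hence the shard-count side input, is one value. A self-contained sketch of the same pattern (the one-shard-per-1000-elements rule is a hypothetical stand-in for CalculateShardsFn, which is not shown in this hunk):

    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    class ShardCountSketch {
      static PCollectionView<Integer> shardCount(PCollection<String> records) {
        return records
            .apply(Window.<String>into(new GlobalWindows()))  // collapse input windowing
            .apply(Count.<String>globally())                  // one global element count
            .apply(ParDo.of(new DoFn<Long, Integer>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                // Roughly one shard per 1000 elements, with at least one shard.
                c.output(Math.max(1, (int) (c.element() / 1000L)));
              }
            }))
            .apply(View.<Integer>asSingleton());
      }
    }
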
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
index 301d841..af36b80 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
@@ -28,6 +28,8 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.io.Sink;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -63,6 +65,10 @@ public class UnboundedFlinkSink<T> extends Sink<T> {
       }
 
       @Override
+      public void setWindowedWrites(boolean windowedWrites) {
+      }
+
+      @Override
       public void finalize(Iterable<Object> writerResults, PipelineOptions options)
           throws Exception {
 
@@ -141,7 +147,19 @@ public class UnboundedFlinkSink<T> extends Sink<T> {
       public Writer<T, Object> createWriter(PipelineOptions options) throws Exception {
         return new Writer<T, Object>() {
           @Override
-          public void open(String uId) throws Exception {
+          public void openWindowed(String uId,
+                                   BoundedWindow window,
+                                   PaneInfo paneInfo,
+                                   int shard,
+                                   int numShards) throws Exception {
+          }
+
+          @Override
+          public void openUnwindowed(String uId, int shard, int numShards) throws Exception {
+          }
+
+          @Override
+          public void cleanup() throws Exception {
 
           }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
index 572c291..38b790e 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
@@ -33,6 +33,8 @@ import org.apache.beam.sdk.io.Sink;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.test.util.JavaProgramTestBase;
@@ -119,6 +121,11 @@ public class WriteSinkITCase extends JavaProgramTestBase {
       }
 
       @Override
+      public void setWindowedWrites(boolean windowedWrites) {
+
+      }
+
+      @Override
       public void finalize(Iterable<String> writerResults, PipelineOptions options)
           throws Exception {
 
@@ -142,13 +149,27 @@ public class WriteSinkITCase extends JavaProgramTestBase {
         private PrintWriter internalWriter;
 
         @Override
-        public void open(String uId) throws Exception {
+        public final void openWindowed(String uId,
+                                       BoundedWindow window,
+                                       PaneInfo paneInfo,
+                                       int shard,
+                                       int numShards) throws Exception {
+          throw new UnsupportedOperationException("Windowed writes not supported.");
+        }
+
+        @Override
+        public final void openUnwindowed(String uId, int shard, int numShards) throws Exception {
           Path path = new Path(resultPath + "/" + uId);
           FileSystem.get(new URI("file:///")).create(path, false);
           internalWriter = new PrintWriter(new File(path.toUri()));
         }
 
         @Override
+        public void cleanup() throws Exception {
+
+        }
+
+        @Override
         public void write(String value) throws Exception {
           internalWriter.println(value);
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 96f0a50..a41c9f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.io.BaseEncoding;
+
 import java.io.IOException;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
@@ -39,6 +40,7 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
@@ -85,11 +87,21 @@ import org.apache.beam.sdk.values.PDone;
  * } </pre>
  *
  * <p>To write a {@link PCollection} to one or more Avro files, use
- * {@link AvroIO.Write}, specifying {@link AvroIO.Write#to} to specify
+ * {@link AvroIO.Write}, specifying {@link AvroIO.Write#to(String)} to specify
  * the path of the file to write to (e.g., a local filename or sharded
  * filename pattern if running locally, or a Google Cloud Storage
  * filename or sharded filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}).
+ * {@code "gs://<bucket>/<filepath>"}). {@link AvroIO.Write#to(FilenamePolicy)} can also be used
+ * to specify a custom file naming policy.
+ *
+ * <p>By default, all input is put into the global window before writing. If per-window writes are
+ * desired - for example, when using a streaming runner -
+ * {@link AvroIO.Write.Bound#withWindowedWrites()} will cause windowing and triggering to be
+ * preserved. When producing windowed writes, the number of output shards must be set explicitly
+ * using {@link AvroIO.Write.Bound#withNumShards(int)}; some runners may set this to a
+ * runner-chosen value for you, so you may not need to set it yourself. A
+ * {@link FileBasedSink.FilenamePolicy} must be set, and unique windows and triggers must produce
+ * unique filenames.
  *
  * <p>It is required to specify {@link AvroIO.Write#withSchema}. To
  * write specific records, such as Avro-generated classes, provide an
@@ -369,6 +381,14 @@ public class AvroIO {
     }
 
     /**
+     * Returns a {@link PTransform} that writes to the file(s) specified by the provided
+     * {@link FileBasedSink.FilenamePolicy}.
+     */
+    public static Bound<GenericRecord> to(FilenamePolicy filenamePolicy) {
+      return new Bound<>(GenericRecord.class).to(filenamePolicy);
+    }
+
+    /**
      * Returns a {@link PTransform} that writes to the file(s) with the
      * given filename suffix.
      */
@@ -496,6 +516,9 @@ public class AvroIO {
       final Schema schema;
       /** An option to indicate if output validation is desired. Default is true. */
       final boolean validate;
+      final boolean windowedWrites;
+      FilenamePolicy filenamePolicy;
+
       /**
        * The codec used to encode the blocks in the Avro file. String value drawn from those in
        * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
@@ -515,7 +538,9 @@ public class AvroIO {
             null,
             true,
             DEFAULT_CODEC,
-            ImmutableMap.<String, Object>of());
+            ImmutableMap.<String, Object>of(),
+            false,
+            null);
       }
 
       Bound(
@@ -528,7 +553,9 @@ public class AvroIO {
           Schema schema,
           boolean validate,
           SerializableAvroCodecFactory codec,
-          Map<String, Object> metadata) {
+          Map<String, Object> metadata,
+          boolean windowedWrites,
+          FilenamePolicy filenamePolicy) {
         super(name);
         this.filenamePrefix = filenamePrefix;
         this.filenameSuffix = filenameSuffix;
@@ -538,6 +565,8 @@ public class AvroIO {
         this.schema = schema;
         this.validate = validate;
         this.codec = codec;
+        this.windowedWrites = windowedWrites;
+        this.filenamePolicy = filenamePolicy;
 
         Map<String, String> badKeys = Maps.newLinkedHashMap();
         for (Map.Entry<String, Object> entry : metadata.entrySet()) {
@@ -573,7 +602,25 @@ public class AvroIO {
             schema,
             validate,
             codec,
-            metadata);
+            metadata,
+            windowedWrites,
+            filenamePolicy);
+      }
+
+      public Bound<T> to(FilenamePolicy filenamePolicy) {
+        return new Bound<>(
+            name,
+            filenamePrefix,
+            filenameSuffix,
+            numShards,
+            shardTemplate,
+            type,
+            schema,
+            validate,
+            codec,
+            metadata,
+            windowedWrites,
+            filenamePolicy);
       }
 
       /**
@@ -596,7 +643,9 @@ public class AvroIO {
             schema,
             validate,
             codec,
-            metadata);
+            metadata,
+            windowedWrites,
+            filenamePolicy);
       }
 
       /**
@@ -625,7 +674,9 @@ public class AvroIO {
             schema,
             validate,
             codec,
-            metadata);
+            metadata,
+            windowedWrites,
+            filenamePolicy);
       }
 
       /**
@@ -647,7 +698,9 @@ public class AvroIO {
             schema,
             validate,
             codec,
-            metadata);
+            metadata,
+            windowedWrites,
+            filenamePolicy);
       }
 
       /**
@@ -670,7 +723,25 @@ public class AvroIO {
             schema,
             validate,
             codec,
-            metadata);
+            metadata,
+            windowedWrites,
+            filenamePolicy);
+      }
+
+      public Bound<T> withWindowedWrites() {
+        return new Bound<>(
+            name,
+            filenamePrefix,
+            filenameSuffix,
+            numShards,
+            shardTemplate,
+            type,
+            schema,
+            validate,
+            codec,
+            metadata,
+            true,
+            filenamePolicy);
       }
 
       /**
@@ -693,7 +764,9 @@ public class AvroIO {
             ReflectData.get().getSchema(type),
             validate,
             codec,
-            metadata);
+            metadata,
+            windowedWrites,
+            filenamePolicy);
       }
 
       /**
@@ -714,7 +787,9 @@ public class AvroIO {
             schema,
             validate,
             codec,
-            metadata);
+            metadata,
+            windowedWrites,
+            filenamePolicy);
       }
 
       /**
@@ -749,7 +824,9 @@ public class AvroIO {
             schema,
             false,
             codec,
-            metadata);
+            metadata,
+            windowedWrites,
+            filenamePolicy);
       }
 
       /**
@@ -769,7 +846,9 @@ public class AvroIO {
             schema,
             validate,
             new SerializableAvroCodecFactory(codec),
-            metadata);
+            metadata,
+            windowedWrites,
+            filenamePolicy);
       }
 
       /**
@@ -789,31 +868,49 @@ public class AvroIO {
             schema,
             validate,
             codec,
-            metadata);
+            metadata,
+            windowedWrites,
+            filenamePolicy);
       }
 
       @Override
       public PDone expand(PCollection<T> input) {
-        if (filenamePrefix == null) {
+        if (filenamePolicy == null && filenamePrefix == null) {
           throw new IllegalStateException(
               "need to set the filename prefix of an AvroIO.Write transform");
         }
+        if (filenamePolicy != null && filenamePrefix != null) {
+          throw new IllegalStateException(
+              "cannot set both a filename policy and a filename prefix");
+        }
         if (schema == null) {
           throw new IllegalStateException("need to set the schema of an AvroIO.Write transform");
         }
 
-        org.apache.beam.sdk.io.Write<T> write =
-            org.apache.beam.sdk.io.Write.to(
-                new AvroSink<>(
-                    filenamePrefix,
-                    filenameSuffix,
-                    shardTemplate,
-                    AvroCoder.of(type, schema),
-                    codec,
-                    metadata));
+        org.apache.beam.sdk.io.Write<T> write = null;
+        if (filenamePolicy != null) {
+          write = org.apache.beam.sdk.io.Write.to(
+              new AvroSink<>(
+                  filenamePolicy,
+                  AvroCoder.of(type, schema),
+                  codec,
+                  metadata));
+        } else {
+          write = org.apache.beam.sdk.io.Write.to(
+              new AvroSink<>(
+                  filenamePrefix,
+                  filenameSuffix,
+                  shardTemplate,
+                  AvroCoder.of(type, schema),
+                  codec,
+                  metadata));
+        }
         if (getNumShards() > 0) {
           write = write.withNumShards(getNumShards());
         }
+        if (windowedWrites) {
+          write = write.withWindowedWrites();
+        }
         return input.apply("Write", write);
       }
 
@@ -940,6 +1037,18 @@ public class AvroIO {
 
     @VisibleForTesting
     AvroSink(
+        FilenamePolicy filenamePolicy,
+        AvroCoder<T> coder,
+        SerializableAvroCodecFactory codec,
+        ImmutableMap<String, Object> metadata) {
+      super(filenamePolicy);
+      this.coder = coder;
+      this.codec = codec;
+      this.metadata = metadata;
+    }
+
+    @VisibleForTesting
+    AvroSink(
         String baseOutputFilename,
         String extension,
         String fileNameTemplate,

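Tying the new AvroIO surface together, a hedged sketch of a windowed Avro write (the output path, shard count, and reuse of the example PerWindowFiles policy are assumptions):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.examples.common.WriteOneFilePerWindow.PerWindowFiles;
    import org.apache.beam.sdk.io.AvroIO;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PDone;

    class WindowedAvroWriteSketch {
      static PDone write(PCollection<GenericRecord> records, Schema schema) {
        // to(FilenamePolicy) and withWindowedWrites() are additions from this
        // commit; withNumShards is required for windowed writes.
        return records.apply(
            AvroIO.Write.to(new PerWindowFiles("gs://my-bucket/avro/out"))
                .withSchema(schema)
                .withWindowedWrites()
                .withNumShards(3));
      }
    }
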
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index ae28b62..9b5f130 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -17,34 +17,48 @@
  */
 package org.apache.beam.sdk.io;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Strings.isNullOrEmpty;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.zip.GZIPOutputStream;
 
 import javax.annotation.Nullable;
 
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.WindowedContext;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
@@ -146,21 +160,165 @@ public abstract class FileBasedSink<T> extends Sink<T> {
    */
   protected final WritableByteChannelFactory writableByteChannelFactory;
 
+
   /**
-   * Base filename for final output files.
+   * A naming policy for output files.
    */
-  protected final ValueProvider<String> baseOutputFilename;
+  public abstract static class FilenamePolicy implements Serializable {
+    /**
+     * Context used for generating a name based on the shard number and the number of shards.
+     * The policy must produce unique filenames for unique {@link Context} objects.
+     *
+     * <p>Be careful about adding fields to this as existing strategies will not notice the new
+     * fields, and may not produce unique filenames.
+     */
+    public static class Context {
+      private int shardNumber;
+      private int numShards;
+
+      public Context(int shardNumber, int numShards) {
+        this.shardNumber = shardNumber;
+        this.numShards = numShards;
+      }
+
+      public int getShardNumber() {
+        return shardNumber;
+      }
+
+      public int getNumShards() {
+        return numShards;
+      }
+    }
+
+    /**
+     * Context used for generating a name based on window, pane, shard number, and the number
+     * The policy must produce unique filenames for unique {@link WindowedContext} objects.
+     *
+     * <p>Be careful about adding fields to this as existing strategies will not notice the new
+     * fields, and may not produce unique filenames.
+     */
+    public static class WindowedContext {
+      private int shardNumber;
+      private int numShards;
+      private BoundedWindow window;
+      private PaneInfo paneInfo;
+
+      public WindowedContext(
+          BoundedWindow window,
+          PaneInfo paneInfo,
+          int shardNumber,
+          int numShards) {
+        this.window = window;
+        this.paneInfo = paneInfo;
+        this.shardNumber = shardNumber;
+        this.numShards = numShards;
+      }
+
+      public BoundedWindow getWindow() {
+        return window;
+      }
+
+      public PaneInfo getPaneInfo() {
+        return paneInfo;
+      }
+
+      public int getShardNumber() {
+        return shardNumber;
+      }
+
+      public int getNumShards() {
+        return numShards;
+      }
+    }
+
+    /**
+     * When a sink has requested windowed or triggered output, this method will be invoked to return
+     * the filename. The {@link WindowedContext} object gives access to the window and pane, as
+     * well as sharding information. The policy must return unique and consistent filenames
+     * for different windows and panes.
+     */
+    public abstract String windowedFilename(WindowedContext c);
+
+    /**
+     * When a sink has not requested windowed output, this method will be invoked to return the
+     * filename. The {@link Context} object only provides sharding information, which is used by
+     * the policy to generate unique and consistent filenames.
+     */
+    public abstract String unwindowedFilename(Context c);
+
+    /**
+     * @return The base filename for all output files.
+     */
+    public abstract ValueProvider<String> getBaseOutputFilenameProvider();
+
+    /**
+     * Populates the display data.
+     */
+    public void populateDisplayData(DisplayData.Builder builder) {
+    }
+  }
 
   /**
-   * The extension to be used for the final output files.
+   * A default filename policy.
    */
-  protected final String extension;
+  protected class DefaultFilenamePolicy extends FilenamePolicy {
+    ValueProvider<String> baseOutputFilename;
+    String extension;
+    String fileNamingTemplate;
+
+    public DefaultFilenamePolicy(ValueProvider<String> baseOutputFilename, String extension,
+                                 String fileNamingTemplate) {
+      this.baseOutputFilename = baseOutputFilename;
+      if (!isNullOrEmpty(writableByteChannelFactory.getFilenameSuffix())) {
+        this.extension = extension + getFileExtension(
+            writableByteChannelFactory.getFilenameSuffix());
+      } else {
+        this.extension = extension;
+      }
+      this.fileNamingTemplate = fileNamingTemplate;
+    }
+
+    @Override
+    public String unwindowedFilename(FilenamePolicy.Context context) {
+      if (context.numShards <= 0) {
+        return null;
+      }
+
+      String suffix = getFileExtension(extension);
+      String filename = IOChannelUtils.constructName(
+          baseOutputFilename.get(), fileNamingTemplate, suffix, context.getShardNumber(),
+          context.getNumShards());
+      return filename;
+    }
+
+    @Override
+    public String windowedFilename(FilenamePolicy.WindowedContext c) {
+      throw new UnsupportedOperationException("There is no default policy for windowed file"
+          + " output. Please provide an explicit FilenamePolicy to generate filenames.");
+    }
+
+    @Override
+    public ValueProvider<String> getBaseOutputFilenameProvider() {
+      return baseOutputFilename;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      String fileNamePattern = String.format("%s%s%s",
+          baseOutputFilename.isAccessible()
+              ? baseOutputFilename.get() : baseOutputFilename.toString(),
+          fileNamingTemplate, getFileExtension(extension));
+      builder.add(DisplayData.item("fileNamePattern", fileNamePattern)
+          .withLabel("File Name Pattern"));
+    }
+  }
 
   /**
-   * Naming template for output files. See {@link ShardNameTemplate} for a description of
-   * possible naming templates.  Default is {@link ShardNameTemplate#INDEX_OF_MAX}.
+   * The policy used to generate output filenames.
    */
-  protected final String fileNamingTemplate;
+  protected FilenamePolicy fileNamePolicy;
 
   /**
    * Construct a FileBasedSink with the given base output filename and extension. A
@@ -201,20 +359,30 @@ public abstract class FileBasedSink<T> extends Sink<T> {
   public FileBasedSink(ValueProvider<String> baseOutputFilename, String extension,
       String fileNamingTemplate, WritableByteChannelFactory writableByteChannelFactory) {
     this.writableByteChannelFactory = writableByteChannelFactory;
-    this.baseOutputFilename = baseOutputFilename;
-    if (!isNullOrEmpty(writableByteChannelFactory.getFilenameSuffix())) {
-      this.extension = extension + getFileExtension(writableByteChannelFactory.getFilenameSuffix());
-    } else {
-      this.extension = extension;
-    }
-    this.fileNamingTemplate = fileNamingTemplate;
+    this.fileNamePolicy = new DefaultFilenamePolicy(baseOutputFilename, extension,
+        fileNamingTemplate);
+  }
+
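+  /**
+   * Construct a FileBasedSink with the given filename policy, producing uncompressed output.
+   */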
+  public FileBasedSink(FilenamePolicy fileNamePolicy) {
+    this(fileNamePolicy, CompressionType.UNCOMPRESSED);
+  }
+
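+  /**
+   * Construct a FileBasedSink with the given filename policy and output channel factory.
+   */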
+  public FileBasedSink(FilenamePolicy fileNamePolicy,
+                       WritableByteChannelFactory writableByteChannelFactory) {
+    this.fileNamePolicy = fileNamePolicy;
+    this.writableByteChannelFactory = writableByteChannelFactory;
   }
 
   /**
    * Returns the base output filename for this file based sink.
    */
   public ValueProvider<String> getBaseOutputFilenameProvider() {
-    return baseOutputFilename;
+    return fileNamePolicy.getBaseOutputFilenameProvider();
+  }
+
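+  /**
+   * Returns the policy used to generate output filenames.
+   */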
+  public FilenamePolicy getFileNamePolicy() {
+    return fileNamePolicy;
   }
 
   @Override
@@ -230,13 +398,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
-
-    String fileNamePattern = String.format("%s%s%s",
-        baseOutputFilename.isAccessible()
-        ? baseOutputFilename.get() : baseOutputFilename.toString(),
-        fileNamingTemplate, getFileExtension(extension));
-    builder.add(DisplayData.item("fileNamePattern", fileNamePattern)
-      .withLabel("File Name Pattern"));
+    getFileNamePolicy().populateDisplayData(builder);
   }
 
   /**
@@ -286,7 +448,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
    * constructor arguments.
    *
    * <p>Subclass implementations can change the file naming template by supplying a value for
-   * {@link FileBasedSink#fileNamingTemplate}.
+   * fileNamingTemplate.
    *
    * <p>Note that in the case of permanent failure of a bundle's write, no clean up of temporary
    * files will occur.
@@ -304,6 +466,9 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     /** Directory for temporary output files. */
     protected final ValueProvider<String> tempDirectory;
 
+    /** Whether windowed writes are being used. */
+    protected boolean windowedWrites;
+
     /** Constructs a temporary file path given the temporary directory and a filename. */
     protected static String buildTemporaryFilename(String tempDirectory, String filename)
         throws IOException {
@@ -361,6 +526,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     private FileBasedWriteOperation(FileBasedSink<T> sink, ValueProvider<String> tempDirectory) {
       this.sink = sink;
       this.tempDirectory = tempDirectory;
+      this.windowedWrites = false;
     }
 
     /**
@@ -371,6 +537,11 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     @Override
     public abstract FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception;
 
+    @Override
+    public void setWindowedWrites(boolean windowedWrites) {
+      this.windowedWrites = windowedWrites;
+    }
+
     /**
      * Initialization of the sink. Default implementation is a no-op. May be overridden by subclass
      * implementations to perform initialization of the sink at pipeline runtime. This method must
@@ -395,22 +566,55 @@ public abstract class FileBasedSink<T> extends Sink<T> {
      * @param writerResults the results of writes (FileResult).
      */
     @Override
-    public void finalize(Iterable<FileResult> writerResults, PipelineOptions options)
+    public void finalize(Iterable<FileResult> writerResults,
+                         PipelineOptions options)
         throws Exception {
       // Collect names of temporary files and rename them.
-      List<String> files = new ArrayList<>();
-      for (FileResult result : writerResults) {
-        LOG.debug("Temporary bundle output file {} will be copied.", result.getFilename());
-        files.add(result.getFilename());
-      }
-      copyToOutputFiles(files, options);
+      Map<String, String> outputFilenames = buildOutputFilenames(writerResults);
+      copyToOutputFiles(outputFilenames, options);
 
+      // Optionally remove temporary files.
       // We remove the entire temporary directory, rather than specifically removing the files
       // from writerResults, because writerResults includes only successfully completed bundles,
       // and we'd like to clean up the failed ones too.
       // Note that due to GCS eventual consistency, matching files in the temp directory is also
       // currently non-perfect and may fail to delete some files.
-      removeTemporaryFiles(files, options);
+      //
+      // When windows or triggers are specified, files are generated incrementally so deleting
+      // the entire directory in finalize is incorrect.
+      removeTemporaryFiles(outputFilenames.keySet(), !windowedWrites, options);
+    }
+
+    protected final Map<String, String> buildOutputFilenames(Iterable<FileResult> writerResults) {
+      Map<String, String> outputFilenames = new HashMap<>();
+      List<String> files = new ArrayList<>();
+      for (FileResult result : writerResults) {
+        if (result.getDestinationFilename() != null) {
+          outputFilenames.put(result.getFilename(), result.getDestinationFilename());
+        } else {
+          files.add(result.getFilename());
+        }
+      }
+
+      // If the user does not specify numShards() (not supported with windowing), then the
+      // writerResults won't contain destination filenames, so we dynamically generate them here.
+      if (files.size() > 0) {
+        checkArgument(outputFilenames.isEmpty());
+        // Sort files for idempotence.
+        files = Ordering.natural().sortedCopy(files);
+        FilenamePolicy filenamePolicy = getSink().fileNamePolicy;
+        for (int i = 0; i < files.size(); i++) {
+          outputFilenames.put(files.get(i),
+              filenamePolicy.unwindowedFilename(new Context(i, files.size())));
+        }
+      }
+
+      int numDistinctShards = new HashSet<String>(outputFilenames.values()).size();
+      checkState(numDistinctShards == outputFilenames.size(),
+         "Only generated %s distinct file names for %s files.",
+         numDistinctShards, outputFilenames.size());
+
+      return outputFilenames;
     }
 
     /**
@@ -425,47 +629,19 @@ public abstract class FileBasedSink<T> extends Sink<T> {
      * file-000-of-003.txt, the contents of B will be copied to file-001-of-003.txt, etc.
      *
      * @param filenames the filenames of temporary files.
-     * @return a list containing the names of final output files.
      */
-    protected final List<String> copyToOutputFiles(List<String> filenames, PipelineOptions options)
+    protected final void copyToOutputFiles(Map<String, String> filenames,
+                                           PipelineOptions options)
         throws IOException {
       int numFiles = filenames.size();
-      // Sort files for idempotence.
-      List<String> srcFilenames = Ordering.natural().sortedCopy(filenames);
-      List<String> destFilenames = generateDestinationFilenames(numFiles);
-
       if (numFiles > 0) {
         LOG.debug("Copying {} files.", numFiles);
-        IOChannelUtils.getFactory(destFilenames.get(0))
-            .copy(srcFilenames, destFilenames);
+        IOChannelFactory channelFactory =
+            IOChannelUtils.getFactory(filenames.values().iterator().next());
+        channelFactory.copy(filenames.keySet(), filenames.values());
       } else {
         LOG.info("No output files to write.");
       }
-
-      return destFilenames;
-    }
-
-    /**
-     * Generate output bundle filenames.
-     */
-    protected final List<String> generateDestinationFilenames(int numFiles) {
-      List<String> destFilenames = new ArrayList<>();
-      String extension = getSink().extension;
-      String baseOutputFilename = getSink().baseOutputFilename.get();
-      String fileNamingTemplate = getSink().fileNamingTemplate;
-
-      String suffix = getFileExtension(extension);
-      for (int i = 0; i < numFiles; i++) {
-        destFilenames.add(IOChannelUtils.constructName(
-            baseOutputFilename, fileNamingTemplate, suffix, i, numFiles));
-      }
-
-      int numDistinctShards = new HashSet<String>(destFilenames).size();
-      checkState(numDistinctShards == numFiles,
-          "Shard name template '%s' only generated %s distinct file names for %s files.",
-          fileNamingTemplate, numDistinctShards, numFiles);
-
-      return destFilenames;
     }
 
     /**
@@ -475,7 +651,9 @@ public abstract class FileBasedSink<T> extends Sink<T> {
      * <b>Note:</b>If finalize is overridden and does <b>not</b> rename or otherwise finalize
      * temporary files, this method will remove them.
      */
-    protected final void removeTemporaryFiles(List<String> knownFiles, PipelineOptions options)
+    protected final void removeTemporaryFiles(Set<String> knownFiles,
+                                              boolean shouldRemoveTemporaryDirectory,
+                                              PipelineOptions options)
         throws IOException {
       String tempDir = tempDirectory.get();
       LOG.debug("Removing temporary bundle output files in {}.", tempDir);
@@ -485,15 +663,18 @@ public abstract class FileBasedSink<T> extends Sink<T> {
       // directory matching APIs, we remove not only files that the filesystem says exist
       // in the directory (which may be incomplete), but also files that are known to exist
       // (produced by successfully completed bundles).
+
       // This may still fail to remove temporary outputs of some failed bundles, but at least
       // the common case (where all bundles succeed) is guaranteed to be fully addressed.
       Set<String> matches = new HashSet<>();
       // TODO: Windows OS cannot resolves and matches '*' in the path,
       // ignore the exception for now to avoid failing the pipeline.
-      try {
-        matches.addAll(factory.match(factory.resolve(tempDir, "*")));
-      } catch (Exception e) {
-        LOG.warn("Failed to match temporary files under: [{}].", tempDir);
+      if (shouldRemoveTemporaryDirectory) {
+        try {
+          matches.addAll(factory.match(factory.resolve(tempDir, "*")));
+        } catch (Exception e) {
+          LOG.warn("Failed to match temporary files under: [{}].", tempDir);
+        }
       }
       Set<String> allMatches = new HashSet<>(matches);
       allMatches.addAll(knownFiles);
@@ -517,7 +698,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
      */
     @Override
     public Coder<FileResult> getWriterResultCoder() {
-      return SerializableCoder.of(FileResult.class);
+      return FileResultCoder.of();
     }
 
     /**
@@ -553,8 +734,13 @@ public abstract class FileBasedSink<T> extends Sink<T> {
      */
     private String id;
 
+    private BoundedWindow window;
+    private PaneInfo paneInfo;
+    private int shard = -1;
+    private int numShards = -1;
+
     /**
-     * The filename of the output bundle - $tempDirectory/$id.
+     * The filename of the output bundle.
      */
     private String filename;
 
@@ -610,8 +796,37 @@ public abstract class FileBasedSink<T> extends Sink<T> {
      * Opens the channel.
      */
     @Override
-    public final void open(String uId) throws Exception {
+    public final void openWindowed(String uId,
+                                   BoundedWindow window,
+                                   PaneInfo paneInfo,
+                                   int shard,
+                                   int numShards) throws Exception {
+      if (!getWriteOperation().windowedWrites) {
+        throw new IllegalStateException("openWindowed called on a non-windowed sink.");
+      }
+      open(uId, window, paneInfo, shard, numShards);
+    }
+
+    @Override
+    public final void openUnwindowed(String uId,
+                                     int shard,
+                                     int numShards) throws Exception {
+      if (getWriteOperation().windowedWrites) {
+        throw new IllegalStateException("openUnwindowed called on a windowed sink.");
+      }
+      open(uId, null, null, shard, numShards);
+    }
+
+    private void open(String uId,
+                      @Nullable BoundedWindow window,
+                      @Nullable PaneInfo paneInfo,
+                      int shard,
+                      int numShards) throws Exception {
       this.id = uId;
+      this.window = window;
+      this.paneInfo = paneInfo;
+      this.shard = shard;
+      this.numShards = numShards;
       filename = FileBasedWriteOperation.buildTemporaryFilename(
           getWriteOperation().tempDirectory.get(), uId);
       LOG.debug("Opening {}.", filename);
@@ -639,6 +854,13 @@ public abstract class FileBasedSink<T> extends Sink<T> {
       LOG.debug("Starting write of bundle {} to {}.", this.id, filename);
     }
 
+    @Override
+    public void cleanup() throws Exception {
+      if (filename != null) {
+        IOChannelUtils.getFactory(filename).remove(Lists.<String>newArrayList(filename));
+      }
+    }
+
     /**
      * Closes the channel and returns the bundle result.
      */
@@ -653,8 +875,17 @@ public abstract class FileBasedSink<T> extends Sink<T> {
           throw new IllegalStateException("Channel should only be closed by its owner: " + channel);
         }
       }
-      FileResult result = new FileResult(filename);
-      LOG.debug("Result for bundle {}: {}", this.id, filename);
+
+      FilenamePolicy filenamePolicy = getWriteOperation().getSink().fileNamePolicy;
+      String destinationFile;
+      if (window != null) {
+        destinationFile = filenamePolicy.windowedFilename(new WindowedContext(
+            window, paneInfo, shard, numShards));
+      } else {
+        destinationFile = filenamePolicy.unwindowedFilename(new Context(shard, numShards));
+      }
+      FileResult result = new FileResult(filename, destinationFile);
+      LOG.debug("Result for bundle {}: {} {}", this.id, filename, destinationFile);
       return result;
     }
 
@@ -670,18 +901,62 @@ public abstract class FileBasedSink<T> extends Sink<T> {
   /**
    * Result of a single bundle write. Contains the filename of the bundle.
    */
-  public static final class FileResult implements Serializable {
+  public static final class FileResult {
     private final String filename;
+    private final String destinationFilename;
 
-    public FileResult(String filename) {
+    public FileResult(String filename, String destinationFilename) {
       this.filename = filename;
+      this.destinationFilename = destinationFilename;
     }
 
     public String getFilename() {
       return filename;
     }
+
+    public String getDestinationFilename() {
+      return destinationFilename;
+    }
+  }
+
+  /**
+   * A coder for FileResult objects.
+   */
+  public static final class FileResultCoder extends AtomicCoder<FileResult> {
+    private static final FileResultCoder INSTANCE = new FileResultCoder();
+    private final Coder<String> stringCoder = NullableCoder.of(StringUtf8Coder.of());
+
+    @JsonCreator
+    public static FileResultCoder of() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void encode(FileResult value, OutputStream outStream, Context context)
+        throws IOException {
+      if (value == null) {
+        throw new CoderException("cannot encode a null value");
+      }
+      stringCoder.encode(value.getFilename(), outStream, context.nested());
+      stringCoder.encode(value.getDestinationFilename(), outStream, context.nested());
+    }
+
+    @Override
+    public FileResult decode(InputStream inStream, Context context)
+        throws IOException {
+      return new FileResult(
+          stringCoder.decode(inStream, context.nested()),
+          stringCoder.decode(inStream, context.nested()));
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      throw new NonDeterministicException(this, "FileResults are not deterministic.");
+    }
   }
 
   /**
    * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink}
    * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that

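As an aside on the new API above: a minimal custom FilenamePolicy, of the kind
the javadoc asks for when windowed writes are enabled, might look like the
sketch below. The class name PerWindowFilenamePolicy and its naming scheme are
illustrative only, not part of this commit; the only contract is that distinct
WindowedContext values map to distinct filenames.

  import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
  import org.apache.beam.sdk.options.ValueProvider;
  import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;

  class PerWindowFilenamePolicy extends FilenamePolicy {
    private final ValueProvider<String> baseOutputFilename;

    PerWindowFilenamePolicy(String baseOutputFilename) {
      this.baseOutputFilename = StaticValueProvider.of(baseOutputFilename);
    }

    @Override
    public String windowedFilename(WindowedContext c) {
      // Window bounds, pane index, and shard together make the name unique per
      // WindowedContext. A real policy should sanitize the window's toString()
      // into filesystem-safe characters.
      return String.format("%s-%s-pane-%d-shard-%d-of-%d",
          baseOutputFilename.get(), c.getWindow(), c.getPaneInfo().getIndex(),
          c.getShardNumber(), c.getNumShards());
    }

    @Override
    public String unwindowedFilename(Context c) {
      return String.format("%s-shard-%d-of-%d",
          baseOutputFilename.get(), c.getShardNumber(), c.getNumShards());
    }

    @Override
    public ValueProvider<String> getBaseOutputFilenameProvider() {
      return baseOutputFilename;
    }
  }
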
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java
index 6742784..d53c6ce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java
@@ -23,6 +23,8 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.PCollection;
 
 /**
@@ -63,11 +65,12 @@ import org.apache.beam.sdk.values.PCollection;
  * operation corresponds to. See below for more information about these methods and restrictions on
  * their implementation.
  *
- * <li>{@link Writer}: A Writer writes a bundle of records. Writer defines four methods:
- * {@link Writer#open}, which is called once at the start of writing a bundle; {@link Writer#write},
- * which writes a single record from the bundle; {@link Writer#close}, which is called once at the
- * end of writing a bundle; and {@link Writer#getWriteOperation}, which returns the write operation
- * that the writer belongs to.
+ * <li>{@link Writer}: A Writer writes a bundle of records. Writer defines several methods:
+ * {@link Writer#openWindowed} and {@link Writer#openUnwindowed}, which are called once at the
+ * start of writing a bundle, depending on whether windowed or unwindowed output is requested.
+ * {@link Writer#write}, which writes a single record from the bundle; {@link Writer#close},
+ * which is called once at the end of writing a bundle; and {@link Writer#getWriteOperation},
+ * which returns the write operation that the writer belongs to.
  * </ul>
  *
  * <h2>WriteOperation</h2>
@@ -95,9 +98,10 @@ import org.apache.beam.sdk.values.PCollection;
  *
  * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
  * event of failure/retry or for redundancy). However, exactly one of these executions will have its
- * result passed to the WriteOperation's finalize method. Each call to {@link Writer#open} is passed
- * a unique <i>bundle id</i> when it is called by the Write transform, so even redundant or retried
- * bundles will have a unique way of identifying their output.
+ * result passed to the WriteOperation's finalize method. Each call to {@link Writer#openWindowed}
+ * or {@link Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called by the
+ * Write transform, so even redundant or retried bundles will have a unique way of identifying
+ * their output.
  *
  * <p>The bundle id should be used to guarantee that a bundle's output is unique. This uniqueness
  * guarantee is important; if a bundle is to be output to a file, for example, the name of the file
@@ -174,6 +178,11 @@ public abstract class Sink<T> implements Serializable, HasDisplayData {
     public abstract void initialize(PipelineOptions options) throws Exception;
 
     /**
+     * Indicates that the operation will be performing windowed writes.
+     */
+    public abstract void setWindowedWrites(boolean windowedWrites);
+
+    /**
      * Given an Iterable of results from bundle writes, performs finalization after writing and
      * closes the sink. Called after all bundle writes are complete.
      *
@@ -200,7 +209,7 @@ public abstract class Sink<T> implements Serializable, HasDisplayData {
      * Creates a new {@link Sink.Writer} to write a bundle of the input to the sink.
      *
      * <p>The bundle id that the writer will use to uniquely identify its output will be passed to
-     * {@link Writer#open}.
+     * {@link Writer#openWindowed} or {@link Writer#openUnwindowed}.
      *
      * <p>Must not mutate the state of the WriteOperation.
      */
@@ -218,9 +227,10 @@ public abstract class Sink<T> implements Serializable, HasDisplayData {
   }
 
   /**
-   * A Writer writes a bundle of elements from a PCollection to a sink. {@link Writer#open} is
-   * called before writing begins and {@link Writer#close} is called after all elements in the
-   * bundle have been written. {@link Writer#write} writes an element to the sink.
+   * A Writer writes a bundle of elements from a PCollection to a sink.
+   * {@link Writer#openWindowed} or {@link Writer#openUnwindowed} is called before writing begins
+   * and {@link Writer#close} is called after all elements in the bundle have been written.
+   * {@link Writer#write} writes an element to the sink.
    *
    * <p>Note that any access to static members or methods of a Writer must be thread-safe, as
    * multiple instances of a Writer may be instantiated in different threads on the same worker.
@@ -238,8 +248,25 @@ public abstract class Sink<T> implements Serializable, HasDisplayData {
      * <p>The unique id that is given to open should be used to ensure that the writer's output does
      * not interfere with the output of other Writers, as a bundle may be executed many times for
      * fault tolerance. See {@link Sink} for more information about bundle ids.
+     *
+     * <p>The window and paneInfo arguments are populated when windowed writes are requested.
+     * shard and numShards are populated in the case of static sharding. In cases where the
+     * runner is dynamically picking the sharding, shard and numShards might both be set to -1.
+     */
+    public abstract void openWindowed(String uId,
+                                      BoundedWindow window,
+                                      PaneInfo paneInfo,
+                                      int shard,
+                                      int numShards) throws Exception;
+
+    /**
+     * Perform bundle initialization for the case where the file is written unwindowed.
      */
-    public abstract void open(String uId) throws Exception;
+    public abstract void openUnwindowed(String uId,
+                                        int shard,
+                                        int numShards) throws Exception;
+
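+    /**
+     * Cleans up any partial output of this writer, for example partially written temporary
+     * files, so that failed or abandoned bundle writes do not leave output behind.
+     */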
+    public abstract void cleanup() throws Exception;
 
     /**
      * Called for each value in the bundle.
@@ -262,5 +289,7 @@ public abstract class Sink<T> implements Serializable, HasDisplayData {
      * Returns the write operation this writer belongs to.
      */
     public abstract WriteOperation<T, WriteT> getWriteOperation();
   }
 }

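To make the revised Writer contract concrete, here is an illustrative sketch
(not part of this commit) of how a bundle processor might drive a
FileBasedWriter through the new lifecycle; the bundle id, window, pane, and
shard values are assumed to come from the surrounding runner:

  import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
  import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
  import org.apache.beam.sdk.io.FileBasedSink.FileResult;
  import org.apache.beam.sdk.options.PipelineOptions;
  import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
  import org.apache.beam.sdk.transforms.windowing.PaneInfo;

  static FileResult writeBundle(
      FileBasedWriteOperation<String> writeOperation, PipelineOptions options,
      String bundleId, Iterable<String> elements, boolean windowedWrites,
      BoundedWindow window, PaneInfo pane, int shard, int numShards) throws Exception {
    FileBasedWriter<String> writer = writeOperation.createWriter(options);
    try {
      if (windowedWrites) {
        // shard and numShards may be -1 when the runner picks sharding dynamically.
        writer.openWindowed(bundleId, window, pane, shard, numShards);
      } else {
        writer.openUnwindowed(bundleId, shard, numShards);
      }
      for (String element : elements) {
        writer.write(element);
      }
      // The result carries the temporary filename and, when known, the destination.
      return writer.close();
    } catch (Exception e) {
      writer.cleanup();  // remove partial output so a retried bundle starts clean
      throw e;
    }
  }
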
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 58b55a9..ea80639 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -41,6 +41,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -85,6 +86,14 @@ import org.apache.beam.sdk.values.PDone;
  * filename or sharded filename pattern of the form
  * {@code "gs://<bucket>/<filepath>"}).
  *
+ * <p>By default, all input is put into the global window before writing. If per-window writes
+ * are desired - for example, when using a streaming runner -
+ * {@link TextIO.Write.Bound#withWindowedWrites()} will cause windowing and triggering to be
+ * preserved. When producing windowed writes, the number of output shards must be set explicitly
+ * using {@link TextIO.Write.Bound#withNumShards(int)}; some runners may set this to a
+ * runner-chosen value for you, so you may not need to set it yourself. A {@link FilenamePolicy}
+ * must be set, and unique windows and triggers must produce unique filenames.
+ *
  * <p>Any existing files with the same names as generated output files
  * will be overwritten.
  *
@@ -352,6 +361,10 @@ public class TextIO {
       return new Bound().to(prefix);
     }
 
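+    /**
+     * Like {@link #to(String)}, but with a {@link FilenamePolicy} for generating output
+     * filenames.
+     */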
+    public static Bound to(FilenamePolicy filenamePolicy) {
+      return new Bound().to(filenamePolicy);
+    }
+
     /**
      * Like {@link #to(String)}, but with a {@link ValueProvider}.
      */
@@ -479,6 +492,12 @@ public class TextIO {
       /** An option to indicate if output validation is desired. Default is true. */
       private final boolean validate;
 
+      /** A policy for naming output files. */
+      private final FilenamePolicy filenamePolicy;
+
+      /** Whether to write windowed output files. */
+      private boolean windowedWrites;
+
       /**
        * The {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink}. Default is
        * {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
@@ -487,13 +506,15 @@ public class TextIO {
 
       private Bound() {
         this(null, null, "", null, null, 0, DEFAULT_SHARD_TEMPLATE, true,
-            FileBasedSink.CompressionType.UNCOMPRESSED);
+            FileBasedSink.CompressionType.UNCOMPRESSED, null, false);
       }
 
       private Bound(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
           @Nullable String header, @Nullable String footer, int numShards,
           String shardTemplate, boolean validate,
-          WritableByteChannelFactory writableByteChannelFactory) {
+          WritableByteChannelFactory writableByteChannelFactory,
+          FilenamePolicy filenamePolicy,
+          boolean windowedWrites) {
         super(name);
         this.header = header;
         this.footer = footer;
@@ -504,6 +525,8 @@ public class TextIO {
         this.validate = validate;
         this.writableByteChannelFactory =
             firstNonNull(writableByteChannelFactory, FileBasedSink.CompressionType.UNCOMPRESSED);
+        this.filenamePolicy = filenamePolicy;
+        this.windowedWrites = windowedWrites;
       }
 
       /**
@@ -518,7 +541,7 @@ public class TextIO {
         validateOutputComponent(filenamePrefix);
         return new Bound(name, StaticValueProvider.of(filenamePrefix), filenameSuffix,
             header, footer, numShards, shardTemplate, validate,
-            writableByteChannelFactory);
+            writableByteChannelFactory, filenamePolicy, windowedWrites);
       }
 
       /**
@@ -526,7 +549,15 @@ public class TextIO {
        */
       public Bound to(ValueProvider<String> filenamePrefix) {
         return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, validate, writableByteChannelFactory);
+            shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      }
+
+      /**
+       * Like {@link #to(String)}, but with a {@link FilenamePolicy}.
+       */
+      public Bound to(FilenamePolicy filenamePolicy) {
+        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+            shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites);
       }
 
       /**
@@ -540,7 +571,7 @@ public class TextIO {
       public Bound withSuffix(String nameExtension) {
         validateOutputComponent(nameExtension);
         return new Bound(name, filenamePrefix, nameExtension, header, footer, numShards,
-            shardTemplate, validate, writableByteChannelFactory);
+            shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites);
       }
 
       /**
@@ -560,7 +591,7 @@ public class TextIO {
       public Bound withNumShards(int numShards) {
         checkArgument(numShards >= 0);
         return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, validate, writableByteChannelFactory);
+            shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites);
       }
 
       /**
@@ -573,7 +604,7 @@ public class TextIO {
        */
       public Bound withShardNameTemplate(String shardTemplate) {
         return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, validate, writableByteChannelFactory);
+            shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites);
       }
 
       /**
@@ -591,7 +622,7 @@ public class TextIO {
        */
       public Bound withoutSharding() {
         return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 1, "",
-            validate, writableByteChannelFactory);
+            validate, writableByteChannelFactory, filenamePolicy, windowedWrites);
       }
 
       /**
@@ -606,7 +637,7 @@ public class TextIO {
        */
       public Bound withoutValidation() {
         return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, false, writableByteChannelFactory);
+            shardTemplate, false, writableByteChannelFactory, filenamePolicy, windowedWrites);
       }
 
       /**
@@ -621,7 +652,7 @@ public class TextIO {
        */
       public Bound withHeader(@Nullable String header) {
         return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, validate, writableByteChannelFactory);
+            shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites);
       }
 
       /**
@@ -636,7 +667,7 @@ public class TextIO {
        */
       public Bound withFooter(@Nullable String footer) {
         return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, validate, writableByteChannelFactory);
+            shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites);
       }
 
       /**
@@ -653,22 +684,39 @@ public class TextIO {
       public Bound withWritableByteChannelFactory(
           WritableByteChannelFactory writableByteChannelFactory) {
         return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, validate, writableByteChannelFactory);
+            shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      }
+
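+      /**
+       * Returns a transform for writing to text files like this one but that preserves the
+       * input's windowing and triggering when producing output files.
+       */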
+      public Bound withWindowedWrites() {
+        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+            shardTemplate, validate, writableByteChannelFactory, filenamePolicy, true);
       }
 
       @Override
       public PDone expand(PCollection<String> input) {
-        if (filenamePrefix == null) {
+        if (filenamePolicy == null && filenamePrefix == null) {
+          throw new IllegalStateException(
+              "need to set the filename prefix of an TextIO.Write transform");
+        }
+        if (filenamePolicy != null && filenamePrefix != null) {
           throw new IllegalStateException(
-              "need to set the filename prefix of a TextIO.Write transform");
+              "cannot set both a filename policy and a filename prefix");
+        }
+        org.apache.beam.sdk.io.Write<String> write;
+        if (filenamePolicy != null) {
+          write = org.apache.beam.sdk.io.Write.to(
+              new TextSink(filenamePolicy, header, footer, writableByteChannelFactory));
+        } else {
+          write = org.apache.beam.sdk.io.Write.to(
+              new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
+                  writableByteChannelFactory));
         }
-        org.apache.beam.sdk.io.Write<String> write =
-            org.apache.beam.sdk.io.Write.to(
-                new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
-                    writableByteChannelFactory));
         if (getNumShards() > 0) {
           write = write.withNumShards(getNumShards());
         }
+        if (windowedWrites) {
+          write = write.withWindowedWrites();
+        }
         return input.apply("Write", write);
       }
 
@@ -676,8 +724,11 @@ public class TextIO {
       public void populateDisplayData(DisplayData.Builder builder) {
         super.populateDisplayData(builder);
 
-        String prefixString = filenamePrefix.isAccessible()
-            ? filenamePrefix.get() : filenamePrefix.toString();
+        String prefixString = "";
+        if (filenamePrefix != null) {
+          prefixString = filenamePrefix.isAccessible()
+              ? filenamePrefix.get() : filenamePrefix.toString();
+        }
         builder
             .addIfNotNull(DisplayData.item("filePrefix", prefixString)
               .withLabel("Output File Prefix"))
@@ -1023,6 +1074,13 @@ public class TextIO {
     @Nullable private final String footer;
 
     @VisibleForTesting
+    TextSink(FilenamePolicy filenamePolicy, @Nullable String header, @Nullable String footer,
+             WritableByteChannelFactory writableByteChannelFactory) {
+      super(filenamePolicy, writableByteChannelFactory);
+      this.header = header;
+      this.footer = footer;
+    }
+
+    @VisibleForTesting
     TextSink(
         ValueProvider<String> baseOutputFilename,
         String extension,


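As a usage sketch (illustrative, not part of this commit), the new TextIO
options combine as follows; PerWindowFilenamePolicy stands in for any
user-supplied FilenamePolicy, such as the one sketched after the FileBasedSink
diff above:

  import org.apache.beam.sdk.io.TextIO;
  import org.apache.beam.sdk.transforms.windowing.FixedWindows;
  import org.apache.beam.sdk.transforms.windowing.Window;
  import org.apache.beam.sdk.values.PCollection;
  import org.joda.time.Duration;

  PCollection<String> lines = ...;  // some windowable input, e.g. from a streaming source
  lines
      .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
      .apply(TextIO.Write
          .to(new PerWindowFilenamePolicy("/tmp/out/lines"))
          .withWindowedWrites()
          .withNumShards(3));
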
[3/3] beam git commit: This closes #1926: Allow unbounded windowed PCollections for FileBasedSinks

Posted by ke...@apache.org.
This closes #1926: Allow unbounded windowed PCollections for FileBasedSinks

  Add windowing support to FileBasedSink


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bc907c58
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bc907c58
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bc907c58

Branch: refs/heads/master
Commit: bc907c58b1da97e53dd0f4b6bda0834b41bb6e66
Parents: 8e5cfde 6addc95
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Apr 5 10:23:20 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Apr 5 10:23:20 2017 -0700

----------------------------------------------------------------------
 examples/java/pom.xml                           |   1 -
 .../apache/beam/examples/WindowedWordCount.java |  34 +-
 .../examples/common/WriteOneFilePerWindow.java  |  91 ++++
 .../examples/common/WriteWindowedFilesDoFn.java |  77 ----
 .../beam/examples/WindowedWordCountIT.java      |  41 +-
 .../core/construction/PTransformMatchers.java   |   3 +-
 .../direct/WriteWithShardingFactory.java        |   6 +-
 .../streaming/io/UnboundedFlinkSink.java        |  20 +-
 .../beam/runners/flink/WriteSinkITCase.java     |  23 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 157 +++++--
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 429 +++++++++++++++----
 .../main/java/org/apache/beam/sdk/io/Sink.java  |  55 ++-
 .../java/org/apache/beam/sdk/io/TextIO.java     |  98 ++++-
 .../main/java/org/apache/beam/sdk/io/Write.java | 377 +++++++++-------
 .../java/org/apache/beam/sdk/io/XmlSink.java    |   6 +-
 .../beam/sdk/testing/TestPipelineOptions.java   |   5 +
 .../beam/sdk/util/FileIOChannelFactory.java     |  23 +-
 .../beam/sdk/util/GcsIOChannelFactory.java      |   3 +-
 .../java/org/apache/beam/sdk/util/GcsUtil.java  |  21 +-
 .../apache/beam/sdk/util/IOChannelFactory.java  |   3 +-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 146 ++++++-
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |  94 ++--
 .../java/org/apache/beam/sdk/io/WriteTest.java  |  49 ++-
 .../org/apache/beam/sdk/io/XmlSinkTest.java     |   8 +-
 .../beam/sdk/testing/TestPipelineTest.java      |  17 -
 .../apache/beam/sdk/io/hdfs/HDFSFileSink.java   |  24 +-
 .../beam/sdk/io/hdfs/HDFSFileSinkTest.java      |   2 +-
 27 files changed, 1295 insertions(+), 518 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bc907c58/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------