You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:06:45 UTC

[33/50] [abbrv] beam git commit: Adds DynamicDestinations support to FileBasedSink

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 9468893..8797ff7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -42,7 +42,9 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.base.Function;
+import com.google.common.base.Functions;
 import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -69,22 +71,31 @@ import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.TextIO.CompressionType;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -205,7 +216,7 @@ public class TextIOTest {
     });
   }
 
-  private <T> void runTestRead(String[] expected) throws Exception {
+  private void runTestRead(String[] expected) throws Exception {
     File tmpFile = Files.createTempFile(tempFolder, "file", "txt").toFile();
     String filename = tmpFile.getPath();
 
@@ -274,6 +285,213 @@ public class TextIOTest {
         displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
   }
 
+  static class TestDynamicDestinations extends DynamicDestinations<String, String> {
+    ResourceId baseDir;
+
+    TestDynamicDestinations(ResourceId baseDir) {
+      this.baseDir = baseDir;
+    }
+
+    @Override
+    public String getDestination(String element) {
+      // Destination is based on first character of string.
+      return element.substring(0, 1);
+    }
+
+    @Override
+    public String getDefaultDestination() {
+      return "";
+    }
+
+    @Nullable
+    @Override
+    public Coder<String> getDestinationCoder() {
+      return StringUtf8Coder.of();
+    }
+
+    @Override
+    public FilenamePolicy getFilenamePolicy(String destination) {
+      return DefaultFilenamePolicy.fromStandardParameters(
+          StaticValueProvider.of(
+              baseDir.resolve("file_" + destination + ".txt", StandardResolveOptions.RESOLVE_FILE)),
+          null,
+          null,
+          false);
+    }
+  }
+
+  class StartsWith implements Predicate<String> {
+    String prefix;
+
+    StartsWith(String prefix) {
+      this.prefix = prefix;
+    }
+
+    @Override
+    public boolean apply(@Nullable String input) {
+      return input.startsWith(prefix);
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDynamicDestinations() throws Exception {
+    ResourceId baseDir =
+        FileSystems.matchNewResource(
+            Files.createTempDirectory(tempFolder, "testDynamicDestinations").toString(), true);
+
+    List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab");
+    PCollection<String> input = p.apply(Create.of(elements).withCoder(StringUtf8Coder.of()));
+    input.apply(
+        TextIO.write()
+            .to(new TestDynamicDestinations(baseDir))
+            .withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true)));
+    p.run();
+
+    assertOutputFiles(
+        Iterables.toArray(Iterables.filter(elements, new StartsWith("a")), String.class),
+        null,
+        null,
+        0,
+        baseDir.resolve("file_a.txt", StandardResolveOptions.RESOLVE_FILE),
+        DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
+    assertOutputFiles(
+        Iterables.toArray(Iterables.filter(elements, new StartsWith("b")), String.class),
+        null,
+        null,
+        0,
+        baseDir.resolve("file_b.txt", StandardResolveOptions.RESOLVE_FILE),
+        DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
+    assertOutputFiles(
+        Iterables.toArray(Iterables.filter(elements, new StartsWith("c")), String.class),
+        null,
+        null,
+        0,
+        baseDir.resolve("file_c.txt", StandardResolveOptions.RESOLVE_FILE),
+        DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
+  }
+
+  @DefaultCoder(AvroCoder.class)
+  private static class UserWriteType {
+    String destination;
+    String metadata;
+
+    UserWriteType() {
+      this.destination = "";
+      this.metadata = "";
+    }
+
+    UserWriteType(String destination, String metadata) {
+      this.destination = destination;
+      this.metadata = metadata;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("destination: %s metadata : %s", destination, metadata);
+    }
+  }
+
+  private static class SerializeUserWrite implements SerializableFunction<UserWriteType, String> {
+    @Override
+    public String apply(UserWriteType input) {
+      return input.toString();
+    }
+  }
+
+  private static class UserWriteDestination implements SerializableFunction<UserWriteType, Params> {
+    private ResourceId baseDir;
+
+    UserWriteDestination(ResourceId baseDir) {
+      this.baseDir = baseDir;
+    }
+
+    @Override
+    public Params apply(UserWriteType input) {
+      return new Params()
+          .withBaseFilename(
+              baseDir.resolve(
+                  "file_" + input.destination.substring(0, 1) + ".txt",
+                  StandardResolveOptions.RESOLVE_FILE));
+    }
+  }
+
+  private static class ExtractWriteDestination implements Function<UserWriteType, String> {
+    @Override
+    public String apply(@Nullable UserWriteType input) {
+      return input.destination;
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDynamicDefaultFilenamePolicy() throws Exception {
+    ResourceId baseDir =
+        FileSystems.matchNewResource(
+            Files.createTempDirectory(tempFolder, "testDynamicDestinations").toString(), true);
+
+    List<UserWriteType> elements =
+        Lists.newArrayList(
+            new UserWriteType("aaaa", "first"),
+            new UserWriteType("aaab", "second"),
+            new UserWriteType("baaa", "third"),
+            new UserWriteType("baab", "fourth"),
+            new UserWriteType("caaa", "fifth"),
+            new UserWriteType("caab", "sixth"));
+    PCollection<UserWriteType> input = p.apply(Create.of(elements));
+    input.apply(
+        TextIO.writeCustomType(new SerializeUserWrite())
+            .to(new UserWriteDestination(baseDir), new Params())
+            .withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true)));
+    p.run();
+
+    String[] aElements =
+        Iterables.toArray(
+            Iterables.transform(
+                Iterables.filter(
+                    elements,
+                    Predicates.compose(new StartsWith("a"), new ExtractWriteDestination())),
+                Functions.toStringFunction()),
+            String.class);
+    String[] bElements =
+        Iterables.toArray(
+            Iterables.transform(
+                Iterables.filter(
+                    elements,
+                    Predicates.compose(new StartsWith("b"), new ExtractWriteDestination())),
+                Functions.toStringFunction()),
+            String.class);
+    String[] cElements =
+        Iterables.toArray(
+            Iterables.transform(
+                Iterables.filter(
+                    elements,
+                    Predicates.compose(new StartsWith("c"), new ExtractWriteDestination())),
+                Functions.toStringFunction()),
+            String.class);
+    assertOutputFiles(
+        aElements,
+        null,
+        null,
+        0,
+        baseDir.resolve("file_a.txt", StandardResolveOptions.RESOLVE_FILE),
+        DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
+    assertOutputFiles(
+        bElements,
+        null,
+        null,
+        0,
+        baseDir.resolve("file_b.txt", StandardResolveOptions.RESOLVE_FILE),
+        DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
+    assertOutputFiles(
+        cElements,
+        null,
+        null,
+        0,
+        baseDir.resolve("file_c.txt", StandardResolveOptions.RESOLVE_FILE),
+        DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
+  }
+
   private void runTestWrite(String[] elems) throws Exception {
     runTestWrite(elems, null, null, 1);
   }
@@ -291,7 +509,8 @@ public class TextIOTest {
       String[] elems, String header, String footer, int numShards) throws Exception {
     String outputName = "file.txt";
     Path baseDir = Files.createTempDirectory(tempFolder, "testwrite");
-    String baseFilename = baseDir.resolve(outputName).toString();
+    ResourceId baseFilename =
+        FileBasedSink.convertToFileResourceIfPossible(baseDir.resolve(outputName).toString());
 
     PCollection<String> input =
         p.apply(Create.of(Arrays.asList(elems)).withCoder(StringUtf8Coder.of()));
@@ -311,8 +530,14 @@ public class TextIOTest {
 
     p.run();
 
-    assertOutputFiles(elems, header, footer, numShards, baseDir, outputName,
-        firstNonNull(write.getShardTemplate(),
+    assertOutputFiles(
+        elems,
+        header,
+        footer,
+        numShards,
+        baseFilename,
+        firstNonNull(
+            write.inner.getShardTemplate(),
             DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE));
   }
 
@@ -321,13 +546,12 @@ public class TextIOTest {
       final String header,
       final String footer,
       int numShards,
-      Path rootLocation,
-      String outputName,
+      ResourceId outputPrefix,
       String shardNameTemplate)
       throws Exception {
     List<File> expectedFiles = new ArrayList<>();
     if (numShards == 0) {
-      String pattern = rootLocation.toAbsolutePath().resolve(outputName + "*").toString();
+      String pattern = outputPrefix.toString() + "*";
       List<MatchResult> matches = FileSystems.match(Collections.singletonList(pattern));
       for (Metadata expectedFile : Iterables.getOnlyElement(matches).metadata()) {
         expectedFiles.add(new File(expectedFile.resourceId().toString()));
@@ -336,9 +560,9 @@ public class TextIOTest {
       for (int i = 0; i < numShards; i++) {
         expectedFiles.add(
             new File(
-                rootLocation.toString(),
                 DefaultFilenamePolicy.constructName(
-                    outputName, shardNameTemplate, "", i, numShards, null, null)));
+                        outputPrefix, shardNameTemplate, "", i, numShards, null, null)
+                    .toString()));
       }
     }
 
@@ -456,14 +680,19 @@ public class TextIOTest {
   public void testWriteWithWritableByteChannelFactory() throws Exception {
     Coder<String> coder = StringUtf8Coder.of();
     String outputName = "file.txt";
-    Path baseDir = Files.createTempDirectory(tempFolder, "testwrite");
+    ResourceId baseDir =
+        FileSystems.matchNewResource(
+            Files.createTempDirectory(tempFolder, "testwrite").toString(), true);
 
     PCollection<String> input = p.apply(Create.of(Arrays.asList(LINES2_ARRAY)).withCoder(coder));
 
     final WritableByteChannelFactory writableByteChannelFactory =
         new DrunkWritableByteChannelFactory();
-    TextIO.Write write = TextIO.write().to(baseDir.resolve(outputName).toString())
-        .withoutSharding().withWritableByteChannelFactory(writableByteChannelFactory);
+    TextIO.Write write =
+        TextIO.write()
+            .to(baseDir.resolve(outputName, StandardResolveOptions.RESOLVE_FILE).toString())
+            .withoutSharding()
+            .withWritableByteChannelFactory(writableByteChannelFactory);
     DisplayData displayData = DisplayData.from(write);
     assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "DRUNK"));
 
@@ -476,8 +705,15 @@ public class TextIOTest {
       drunkElems.add(elem);
       drunkElems.add(elem);
     }
-    assertOutputFiles(drunkElems.toArray(new String[0]), null, null, 1, baseDir,
-        outputName + writableByteChannelFactory.getFilenameSuffix(), write.getShardTemplate());
+    assertOutputFiles(
+        drunkElems.toArray(new String[0]),
+        null,
+        null,
+        1,
+        baseDir.resolve(
+            outputName + writableByteChannelFactory.getSuggestedFilenameSuffix(),
+            StandardResolveOptions.RESOLVE_FILE),
+        write.inner.getShardTemplate());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index e6a0dcf..55f2a87 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io;
 
+import static com.google.common.base.MoreObjects.firstNonNull;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -41,7 +42,11 @@ import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
+import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
 import org.apache.beam.sdk.io.SimpleSink.SimpleWriter;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
@@ -58,16 +63,20 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Top;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.joda.time.Duration;
 import org.joda.time.format.DateTimeFormatter;
@@ -164,7 +173,11 @@ public class WriteFilesTest {
   public void testWrite() throws IOException {
     List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
         "Intimidating pigeon", "Pedantic gull", "Frisky finch");
-    runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
+    runWrite(
+        inputs,
+        IDENTITY_MAP,
+        getBaseOutputFilename(),
+        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
   }
 
   /**
@@ -173,8 +186,11 @@ public class WriteFilesTest {
   @Test
   @Category(NeedsRunner.class)
   public void testEmptyWrite() throws IOException {
-    runWrite(Collections.<String>emptyList(), IDENTITY_MAP, getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink()));
+    runWrite(
+        Collections.<String>emptyList(),
+        IDENTITY_MAP,
+        getBaseOutputFilename(),
+        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
     checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(),
         Optional.of(1));
   }
@@ -190,7 +206,7 @@ public class WriteFilesTest {
         Arrays.asList("one", "two", "three", "four", "five", "six"),
         IDENTITY_MAP,
         getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink()).withNumShards(1));
+        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()).withNumShards(1));
   }
 
   private ResourceId getBaseOutputDirectory() {
@@ -198,9 +214,13 @@ public class WriteFilesTest {
         .resolve("output", StandardResolveOptions.RESOLVE_DIRECTORY);
 
   }
-  private SimpleSink makeSimpleSink() {
-    FilenamePolicy filenamePolicy = new PerWindowFiles("file", "simple");
-    return new SimpleSink(getBaseOutputDirectory(), filenamePolicy);
+
+  private SimpleSink<Void> makeSimpleSink() {
+    FilenamePolicy filenamePolicy =
+        new PerWindowFiles(
+            getBaseOutputDirectory().resolve("file", StandardResolveOptions.RESOLVE_FILE),
+            "simple");
+    return SimpleSink.makeSimpleSink(getBaseOutputDirectory(), filenamePolicy);
   }
 
   @Test
@@ -219,8 +239,10 @@ public class WriteFilesTest {
       timestamps.add(i + 1);
     }
 
-    SimpleSink sink = makeSimpleSink();
-    WriteFiles<String> write = WriteFiles.to(sink).withSharding(new LargestInt());
+    SimpleSink<Void> sink = makeSimpleSink();
+    WriteFiles<String, ?, String> write =
+        WriteFiles.to(sink, SerializableFunctions.<String>identity())
+            .withSharding(new LargestInt());
     p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
         .apply(IDENTITY_MAP)
         .apply(write);
@@ -241,7 +263,8 @@ public class WriteFilesTest {
         Arrays.asList("one", "two", "three", "four", "five", "six"),
         IDENTITY_MAP,
         getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink()).withNumShards(20));
+        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())
+            .withNumShards(20));
   }
 
   /**
@@ -251,7 +274,11 @@ public class WriteFilesTest {
   @Category(NeedsRunner.class)
   public void testWriteWithEmptyPCollection() throws IOException {
     List<String> inputs = new ArrayList<>();
-    runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
+    runWrite(
+        inputs,
+        IDENTITY_MAP,
+        getBaseOutputFilename(),
+        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
   }
 
   /**
@@ -263,8 +290,10 @@ public class WriteFilesTest {
     List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
         "Intimidating pigeon", "Pedantic gull", "Frisky finch");
     runWrite(
-        inputs, new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))),
-        getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
+        inputs,
+        new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))),
+        getBaseOutputFilename(),
+        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
   }
 
   /**
@@ -278,10 +307,9 @@ public class WriteFilesTest {
 
     runWrite(
         inputs,
-        new WindowAndReshuffle<>(
-            Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))),
+        new WindowAndReshuffle<>(Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))),
         getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink()));
+        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
   }
 
   @Test
@@ -292,15 +320,19 @@ public class WriteFilesTest {
       inputs.add("mambo_number_" + i);
     }
     runWrite(
-        inputs, Window.<String>into(FixedWindows.of(Duration.millis(2))),
+        inputs,
+        Window.<String>into(FixedWindows.of(Duration.millis(2))),
         getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink()).withMaxNumWritersPerBundle(2).withWindowedWrites());
+        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())
+            .withMaxNumWritersPerBundle(2)
+            .withWindowedWrites());
   }
 
   public void testBuildWrite() {
-    SimpleSink sink = makeSimpleSink();
-    WriteFiles<String> write = WriteFiles.to(sink).withNumShards(3);
-    assertThat((SimpleSink) write.getSink(), is(sink));
+    SimpleSink<Void> sink = makeSimpleSink();
+    WriteFiles<String, ?, String> write =
+        WriteFiles.to(sink, SerializableFunctions.<String>identity()).withNumShards(3);
+    assertThat((SimpleSink<Void>) write.getSink(), is(sink));
     PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding =
         write.getSharding();
 
@@ -309,25 +341,37 @@ public class WriteFilesTest {
     assertThat(write.getNumShards().get(), equalTo(3));
     assertThat(write.getSharding(), equalTo(originalSharding));
 
-    WriteFiles<String> write2 = write.withSharding(SHARDING_TRANSFORM);
-    assertThat((SimpleSink) write2.getSink(), is(sink));
+    WriteFiles<String, ?, ?> write2 = write.withSharding(SHARDING_TRANSFORM);
+    assertThat((SimpleSink<Void>) write2.getSink(), is(sink));
     assertThat(write2.getSharding(), equalTo(SHARDING_TRANSFORM));
     // original unchanged
 
-    WriteFiles<String> writeUnsharded = write2.withRunnerDeterminedSharding();
+    WriteFiles<String, ?, ?> writeUnsharded = write2.withRunnerDeterminedSharding();
     assertThat(writeUnsharded.getSharding(), nullValue());
     assertThat(write.getSharding(), equalTo(originalSharding));
   }
 
   @Test
   public void testDisplayData() {
-    SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") {
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        builder.add(DisplayData.item("foo", "bar"));
-      }
-    };
-    WriteFiles<String> write = WriteFiles.to(sink);
+    DynamicDestinations<String, Void> dynamicDestinations =
+        DynamicFileDestinations.constant(
+            DefaultFilenamePolicy.fromParams(
+                new Params()
+                    .withBaseFilename(
+                        getBaseOutputDirectory()
+                            .resolve("file", StandardResolveOptions.RESOLVE_FILE))
+                    .withShardTemplate("-SS-of-NN")));
+    SimpleSink<Void> sink =
+        new SimpleSink<Void>(
+            getBaseOutputDirectory(), dynamicDestinations, CompressionType.UNCOMPRESSED) {
+          @Override
+          public void populateDisplayData(DisplayData.Builder builder) {
+            builder.add(DisplayData.item("foo", "bar"));
+          }
+        };
+    WriteFiles<String, ?, String> write =
+        WriteFiles.to(sink, SerializableFunctions.<String>identity());
+
     DisplayData displayData = DisplayData.from(write);
 
     assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
@@ -335,14 +379,145 @@ public class WriteFilesTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
+  public void testUnboundedNeedsWindowed() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Must use windowed writes when applying WriteFiles to an unbounded PCollection");
+
+    SimpleSink<Void> sink = makeSimpleSink();
+    p.apply(Create.of("foo"))
+        .setIsBoundedInternal(IsBounded.UNBOUNDED)
+        .apply(WriteFiles.to(sink, SerializableFunctions.<String>identity()));
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testUnboundedNeedsSharding() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "When applying WriteFiles to an unbounded PCollection, "
+            + "must specify number of output shards explicitly");
+
+    SimpleSink<Void> sink = makeSimpleSink();
+    p.apply(Create.of("foo"))
+        .setIsBoundedInternal(IsBounded.UNBOUNDED)
+        .apply(WriteFiles.to(sink, SerializableFunctions.<String>identity()).withWindowedWrites());
+    p.run();
+  }
+
+  // Test DynamicDestinations class. Expects user values to be string-encoded integers.
+  // Stores the integer mod 5 as the destination, and uses that in the file prefix.
+  static class TestDestinations extends DynamicDestinations<String, Integer> {
+    private ResourceId baseOutputDirectory;
+
+    TestDestinations(ResourceId baseOutputDirectory) {
+      this.baseOutputDirectory = baseOutputDirectory;
+    }
+
+    @Override
+    public Integer getDestination(String element) {
+      return Integer.valueOf(element) % 5;
+    }
+
+    @Override
+    public Integer getDefaultDestination() {
+      return 0;
+    }
+
+    @Override
+    public FilenamePolicy getFilenamePolicy(Integer destination) {
+      return new PerWindowFiles(
+          baseOutputDirectory.resolve("file_" + destination, StandardResolveOptions.RESOLVE_FILE),
+          "simple");
+    }
+
+    @Override
+    public void populateDisplayData(Builder builder) {
+      super.populateDisplayData(builder);
+    }
+  }
+
+  // Test format function. Prepend a string to each record before writing.
+  static class TestDynamicFormatFunction implements SerializableFunction<String, String> {
+    @Override
+    public String apply(String input) {
+      return "record_" + input;
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDynamicDestinationsBounded() throws Exception {
+    testDynamicDestinationsHelper(true);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDynamicDestinationsUnbounded() throws Exception {
+    testDynamicDestinationsHelper(false);
+  }
+
+  private void testDynamicDestinationsHelper(boolean bounded) throws IOException {
+    TestDestinations dynamicDestinations = new TestDestinations(getBaseOutputDirectory());
+    SimpleSink<Integer> sink =
+        new SimpleSink<>(
+            getBaseOutputDirectory(), dynamicDestinations, CompressionType.UNCOMPRESSED);
+
+    // Flag to validate that the pipeline options are passed to the Sink.
+    WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class);
+    options.setTestFlag("test_value");
+    Pipeline p = TestPipeline.create(options);
+
+    List<String> inputs = Lists.newArrayList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
+    // Prepare timestamps for the elements.
+    List<Long> timestamps = new ArrayList<>();
+    for (long i = 0; i < inputs.size(); i++) {
+      timestamps.add(i + 1);
+    }
+
+    WriteFiles<String, Integer, String> writeFiles =
+        WriteFiles.to(sink, new TestDynamicFormatFunction()).withNumShards(1);
+
+    PCollection<String> input = p.apply(Create.timestamped(inputs, timestamps));
+    if (!bounded) {
+      input.setIsBoundedInternal(IsBounded.UNBOUNDED);
+      input = input.apply(Window.<String>into(FixedWindows.of(Duration.standardDays(1))));
+      input.apply(writeFiles.withWindowedWrites());
+    } else {
+      input.apply(writeFiles);
+    }
+    p.run();
+
+    for (int i = 0; i < 5; ++i) {
+      ResourceId base =
+          getBaseOutputDirectory().resolve("file_" + i, StandardResolveOptions.RESOLVE_FILE);
+      List<String> expected = Lists.newArrayList("record_" + i, "record_" + (i + 5));
+      checkFileContents(base.toString(), expected, Optional.of(1));
+    }
+  }
+
+  @Test
   public void testShardedDisplayData() {
-    SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") {
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        builder.add(DisplayData.item("foo", "bar"));
-      }
-    };
-    WriteFiles<String> write = WriteFiles.to(sink).withNumShards(1);
+    DynamicDestinations<String, Void> dynamicDestinations =
+        DynamicFileDestinations.constant(
+            DefaultFilenamePolicy.fromParams(
+                new Params()
+                    .withBaseFilename(
+                        getBaseOutputDirectory()
+                            .resolve("file", StandardResolveOptions.RESOLVE_FILE))
+                    .withShardTemplate("-SS-of-NN")));
+    SimpleSink<Void> sink =
+        new SimpleSink<Void>(
+            getBaseOutputDirectory(), dynamicDestinations, CompressionType.UNCOMPRESSED) {
+          @Override
+          public void populateDisplayData(DisplayData.Builder builder) {
+            builder.add(DisplayData.item("foo", "bar"));
+          }
+        };
+    WriteFiles<String, ?, String> write =
+        WriteFiles.to(sink, SerializableFunctions.<String>identity()).withNumShards(1);
     DisplayData displayData = DisplayData.from(write);
     assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
     assertThat(displayData, includesDisplayDataFor("sink", sink));
@@ -351,14 +526,24 @@ public class WriteFilesTest {
 
   @Test
   public void testCustomShardStrategyDisplayData() {
-    SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") {
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        builder.add(DisplayData.item("foo", "bar"));
-      }
-    };
-    WriteFiles<String> write =
-        WriteFiles.to(sink)
+    DynamicDestinations<String, Void> dynamicDestinations =
+        DynamicFileDestinations.constant(
+            DefaultFilenamePolicy.fromParams(
+                new Params()
+                    .withBaseFilename(
+                        getBaseOutputDirectory()
+                            .resolve("file", StandardResolveOptions.RESOLVE_FILE))
+                    .withShardTemplate("-SS-of-NN")));
+    SimpleSink<Void> sink =
+        new SimpleSink<Void>(
+            getBaseOutputDirectory(), dynamicDestinations, CompressionType.UNCOMPRESSED) {
+          @Override
+          public void populateDisplayData(DisplayData.Builder builder) {
+            builder.add(DisplayData.item("foo", "bar"));
+          }
+        };
+    WriteFiles<String, ?, String> write =
+        WriteFiles.to(sink, SerializableFunctions.<String>identity())
             .withSharding(
                 new PTransform<PCollection<String>, PCollectionView<Integer>>() {
                   @Override
@@ -383,59 +568,77 @@ public class WriteFilesTest {
    * PCollection are written to the sink.
    */
   private void runWrite(
-      List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform,
-      String baseName, WriteFiles<String> write) throws IOException {
+      List<String> inputs,
+      PTransform<PCollection<String>, PCollection<String>> transform,
+      String baseName,
+      WriteFiles<String, ?, String> write)
+      throws IOException {
     runShardedWrite(inputs, transform, baseName, write);
   }
 
   private static class PerWindowFiles extends FilenamePolicy {
     private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinuteSecondMillis();
-    private final String prefix;
+    private final ResourceId baseFilename;
     private final String suffix;
 
-    public PerWindowFiles(String prefix, String suffix) {
-      this.prefix = prefix;
+    public PerWindowFiles(ResourceId baseFilename, String suffix) {
+      this.baseFilename = baseFilename;
       this.suffix = suffix;
     }
 
     public String filenamePrefixForWindow(IntervalWindow window) {
+      String prefix =
+          baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), "");
       return String.format("%s%s-%s",
           prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
     }
 
     @Override
-    public ResourceId windowedFilename(
-        ResourceId outputDirectory, WindowedContext context, String extension) {
+    public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) {
       IntervalWindow window = (IntervalWindow) context.getWindow();
-      String filename = String.format(
-          "%s-%s-of-%s%s%s",
-          filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
-          extension, suffix);
-      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+      String filename =
+          String.format(
+              "%s-%s-of-%s%s%s",
+              filenamePrefixForWindow(window),
+              context.getShardNumber(),
+              context.getNumShards(),
+              outputFileHints.getSuggestedFilenameSuffix(),
+              suffix);
+      return baseFilename
+          .getCurrentDirectory()
+          .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
     }
 
     @Override
-    public ResourceId unwindowedFilename(
-        ResourceId outputDirectory, Context context, String extension) {
-      String filename = String.format(
-          "%s%s-of-%s%s%s",
-          prefix, context.getShardNumber(), context.getNumShards(),
-          extension, suffix);
-      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+    public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) {
+      String prefix =
+          baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), "");
+      String filename =
+          String.format(
+              "%s-%s-of-%s%s%s",
+              prefix,
+              context.getShardNumber(),
+              context.getNumShards(),
+              outputFileHints.getSuggestedFilenameSuffix(),
+              suffix);
+      return baseFilename
+          .getCurrentDirectory()
+          .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
     }
   }
 
   /**
    * Performs a WriteFiles transform with the desired number of shards. Verifies the WriteFiles
    * transform calls the appropriate methods on a test sink in the correct order, as well as
-   * verifies that the elements of a PCollection are written to the sink. If numConfiguredShards
-   * is not null, also verifies that the output number of shards is correct.
+   * verifies that the elements of a PCollection are written to the sink. If numConfiguredShards is
+   * not null, also verifies that the output number of shards is correct.
    */
   private void runShardedWrite(
       List<String> inputs,
       PTransform<PCollection<String>, PCollection<String>> transform,
       String baseName,
-      WriteFiles<String> write) throws IOException {
+      WriteFiles<String, ?, String> write)
+      throws IOException {
     // Flag to validate that the pipeline options are passed to the Sink
     WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class);
     options.setTestFlag("test_value");

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 4393a63..e46b1d3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
@@ -57,6 +58,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypeDescriptor;

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
index edb1e0d..c5c2462 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
@@ -23,8 +23,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.collect.Lists;
 import java.io.Serializable;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
@@ -32,6 +31,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 
 /**
@@ -158,21 +158,16 @@ public abstract class DynamicDestinations<T, DestinationT> implements Serializab
     }
     // If dynamicDestinations doesn't provide a coder, try to find it in the coder registry.
     // We must first use reflection to figure out what the type parameter is.
-    for (Type superclass = getClass().getGenericSuperclass();
-        superclass != null;
-        superclass = ((Class) superclass).getGenericSuperclass()) {
-      if (superclass instanceof ParameterizedType) {
-        ParameterizedType parameterized = (ParameterizedType) superclass;
-        if (parameterized.getRawType() == DynamicDestinations.class) {
-          // DestinationT is the second parameter.
-          Type parameter = parameterized.getActualTypeArguments()[1];
-          @SuppressWarnings("unchecked")
-          Class<DestinationT> parameterClass = (Class<DestinationT>) parameter;
-          return registry.getCoder(parameterClass);
-        }
-      }
+    TypeDescriptor<?> superDescriptor =
+        TypeDescriptor.of(getClass()).getSupertype(DynamicDestinations.class);
+    if (!superDescriptor.getRawType().equals(DynamicDestinations.class)) {
+      throw new AssertionError(
+          "Couldn't find the DynamicDestinations superclass of " + this.getClass());
     }
-    throw new AssertionError(
-        "Couldn't find the DynamicDestinations superclass of " + this.getClass());
+    TypeVariable typeVariable = superDescriptor.getTypeParameter("DestinationT");
+    @SuppressWarnings("unchecked")
+    TypeDescriptor<DestinationT> descriptor =
+        (TypeDescriptor<DestinationT>) superDescriptor.resolveType(typeVariable);
+    return registry.getCoder(descriptor);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
index 90d41a0..55672ff 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.ShardedKey;
 
 /**
  * Given a write to a specific table, assign that to one of the

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
deleted file mode 100644
index c2b739f..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-/**
- * A key and a shard number.
- */
-class ShardedKey<K> implements Serializable {
-  private static final long serialVersionUID = 1L;
-  private final K key;
-  private final int shardNumber;
-
-  public static <K> ShardedKey<K> of(K key, int shardNumber) {
-    return new ShardedKey<>(key, shardNumber);
-  }
-
-  ShardedKey(K key, int shardNumber) {
-    this.key = key;
-    this.shardNumber = shardNumber;
-  }
-
-  public K getKey() {
-    return key;
-  }
-
-  public int getShardNumber() {
-    return shardNumber;
-  }
-
-  @Override
-  public String toString() {
-    return "key: " + key + " shard: " + shardNumber;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (!(o instanceof ShardedKey)) {
-      return false;
-    }
-    ShardedKey<K> other = (ShardedKey<K>) o;
-    return Objects.equals(key, other.key) && Objects.equals(shardNumber, other.shardNumber);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(key, shardNumber);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
deleted file mode 100644
index c2b62b7..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StructuredCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-
-
-/**
- * A {@link Coder} for {@link ShardedKey}, using a wrapped key {@link Coder}.
- */
-@VisibleForTesting
-class ShardedKeyCoder<KeyT>
-    extends StructuredCoder<ShardedKey<KeyT>> {
-  public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) {
-    return new ShardedKeyCoder<>(keyCoder);
-  }
-
-  private final Coder<KeyT> keyCoder;
-  private final VarIntCoder shardNumberCoder;
-
-  protected ShardedKeyCoder(Coder<KeyT> keyCoder) {
-    this.keyCoder = keyCoder;
-    this.shardNumberCoder = VarIntCoder.of();
-  }
-
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return Arrays.asList(keyCoder);
-  }
-
-  @Override
-  public void encode(ShardedKey<KeyT> key, OutputStream outStream)
-      throws IOException {
-    keyCoder.encode(key.getKey(), outStream);
-    shardNumberCoder.encode(key.getShardNumber(), outStream);
-  }
-
-  @Override
-  public ShardedKey<KeyT> decode(InputStream inStream)
-      throws IOException {
-    return new ShardedKey<>(
-        keyCoder.decode(inStream),
-        shardNumberCoder.decode(inStream));
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    keyCoder.verifyDeterministic();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
index 63e5bc1..a210858 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
index 18b2033..fa5b3ce 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.TableRow;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -29,6 +30,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
index cd88222..51b9375 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.ShardedKey;
 
 /**
  * Fn that tags each table row with a unique id and destination table. To avoid calling

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index d68779a..e1ed746 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -19,6 +19,7 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -40,6 +41,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
index 45dc2a8..887cb93 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
@@ -22,6 +22,7 @@ import com.google.api.services.bigquery.model.TableRow;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.ShardedKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index acd1132..451d1bd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index c5494d8..9ed2916 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.ShardedKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index bfd260a..d31f3a0 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -82,6 +82,7 @@ import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -131,6 +132,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PCollectionViews;
+import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.ValueInSingleWindow;

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
index 7255a94..442fba5 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -521,7 +522,8 @@ public class XmlIO {
 
     @Override
     public PDone expand(PCollection<T> input) {
-      return input.apply(org.apache.beam.sdk.io.WriteFiles.to(createSink()));
+      return input.apply(
+          org.apache.beam.sdk.io.WriteFiles.to(createSink(), SerializableFunctions.<T>identity()));
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
index 6ae83f2..74e0bda 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
@@ -25,6 +25,7 @@ import javax.xml.bind.JAXBContext;
 import javax.xml.bind.Marshaller;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.DefaultFilenamePolicy;
+import org.apache.beam.sdk.io.DynamicFileDestinations;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.ShardNameTemplate;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -34,18 +35,18 @@ import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 
 /** Implementation of {@link XmlIO#write}. */
-class XmlSink<T> extends FileBasedSink<T> {
+class XmlSink<T> extends FileBasedSink<T, Void> {
   private static final String XML_EXTENSION = ".xml";
 
   private final XmlIO.Write<T> spec;
 
-  private static DefaultFilenamePolicy makeFilenamePolicy(XmlIO.Write<?> spec) {
-    return DefaultFilenamePolicy.constructUsingStandardParameters(
+  private static <T> DefaultFilenamePolicy makeFilenamePolicy(XmlIO.Write<T> spec) {
+    return DefaultFilenamePolicy.fromStandardParameters(
         spec.getFilenamePrefix(), ShardNameTemplate.INDEX_OF_MAX, XML_EXTENSION, false);
   }
 
   XmlSink(XmlIO.Write<T> spec) {
-    super(spec.getFilenamePrefix(), makeFilenamePolicy(spec));
+    super(spec.getFilenamePrefix(), DynamicFileDestinations.constant(makeFilenamePolicy(spec)));
     this.spec = spec;
   }
 
@@ -75,10 +76,8 @@ class XmlSink<T> extends FileBasedSink<T> {
     super.populateDisplayData(builder);
   }
 
-  /**
-   * {@link WriteOperation} for XML {@link FileBasedSink}s.
-   */
-  protected static final class XmlWriteOperation<T> extends WriteOperation<T> {
+  /** {@link WriteOperation} for XML {@link FileBasedSink}s. */
+  protected static final class XmlWriteOperation<T> extends WriteOperation<T, Void> {
     public XmlWriteOperation(XmlSink<T> sink) {
       super(sink);
     }
@@ -112,10 +111,8 @@ class XmlSink<T> extends FileBasedSink<T> {
     }
   }
 
-  /**
-   * A {@link Writer} that can write objects as XML elements.
-   */
-  protected static final class XmlWriter<T> extends Writer<T> {
+  /** A {@link Writer} that can write objects as XML elements. */
+  protected static final class XmlWriter<T> extends Writer<T, Void> {
     final Marshaller marshaller;
     private OutputStream os = null;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
index aa0c1c3..d1584dc 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
@@ -197,8 +197,8 @@ public class XmlSinkTest {
         .withRecordClass(Integer.class);
 
     DisplayData displayData = DisplayData.from(write);
-
-    assertThat(displayData, hasDisplayItem("filenamePattern", "file-SSSSS-of-NNNNN.xml"));
+    assertThat(
+        displayData, hasDisplayItem("filenamePattern", "/path/to/file-SSSSS-of-NNNNN" + ".xml"));
     assertThat(displayData, hasDisplayItem("rootElement", "bird"));
     assertThat(displayData, hasDisplayItem("recordClass", Integer.class));
   }