Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/29 00:10:14 UTC

[1/2] incubator-beam git commit: Write: add support for setting a fixed number of shards

Repository: incubator-beam
Updated Branches:
  refs/heads/master d5d303566 -> daafc86f3


Write: add support for setting a fixed number of shards

Also removes the runner-specific sharding support previously built into the Dataflow and Direct runners.

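As a quick illustration of the user-facing effect (a sketch, not part of the patch itself), a
pipeline like the one below now produces exactly the requested number of output files on any
runner, because Write.Bound honors withNumShards directly via an extra GroupByKey; the output
path and element values are placeholders.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class FixedShardsExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of("foo", "bar", "baz", "spam", "ham", "eggs"))
        // Request exactly 3 output files. After this change the request is
        // satisfied by Write itself (ApplyShardLabel -> GroupIntoShards ->
        // WriteShardedBundles) rather than by runner-specific overrides.
        .apply(TextIO.Write.to("/tmp/sharded-output").withNumShards(3));

    p.run();
  }
}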

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0070d2d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0070d2d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0070d2d4

Branch: refs/heads/master
Commit: 0070d2d486dbfa24ec515595b3e6e0ba6beecbbe
Parents: d5d3035
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jun 14 14:03:41 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jun 28 17:09:47 2016 -0700

----------------------------------------------------------------------
 .../direct/AvroIOShardedWriteFactory.java       |  76 -----
 .../beam/runners/direct/DirectRunner.java       |   4 -
 .../runners/direct/ShardControlledWrite.java    |  81 -----
 .../direct/TextIOShardedWriteFactory.java       |  78 -----
 .../direct/AvroIOShardedWriteFactoryTest.java   | 120 -------
 .../direct/TextIOShardedWriteFactoryTest.java   | 120 -------
 .../beam/runners/dataflow/DataflowRunner.java   | 258 ---------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     |  12 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     |  15 +-
 .../main/java/org/apache/beam/sdk/io/Write.java | 314 ++++++++++++++-----
 .../java/org/apache/beam/sdk/io/WriteTest.java  | 145 ++++++++-
 11 files changed, 391 insertions(+), 832 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactory.java
deleted file mode 100644
index 7422f27..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactory.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-class AvroIOShardedWriteFactory implements PTransformOverrideFactory {
-  @Override
-  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
-      PTransform<InputT, OutputT> transform) {
-    if (transform instanceof AvroIO.Write.Bound) {
-      @SuppressWarnings("unchecked")
-      AvroIO.Write.Bound<InputT> originalWrite = (AvroIO.Write.Bound<InputT>) transform;
-      if (originalWrite.getNumShards() > 1
-          || (originalWrite.getNumShards() == 1
-              && !"".equals(originalWrite.getShardNameTemplate()))) {
-        @SuppressWarnings("unchecked")
-        PTransform<InputT, OutputT> override =
-            (PTransform<InputT, OutputT>) new AvroIOShardedWrite<InputT>(originalWrite);
-        return override;
-      }
-    }
-    return transform;
-  }
-
-  private class AvroIOShardedWrite<InputT> extends ShardControlledWrite<InputT> {
-    private final AvroIO.Write.Bound<InputT> initial;
-
-    private AvroIOShardedWrite(AvroIO.Write.Bound<InputT> initial) {
-      this.initial = initial;
-    }
-
-    @Override
-    int getNumShards() {
-      return initial.getNumShards();
-    }
-
-    @Override
-    PTransform<? super PCollection<InputT>, PDone> getSingleShardTransform(int shardNum) {
-      String shardName =
-          IOChannelUtils.constructName(
-              initial.getFilenamePrefix(),
-              initial.getShardNameTemplate(),
-              initial.getFilenameSuffix(),
-              shardNum,
-              getNumShards());
-      return initial.withoutSharding().to(shardName).withSuffix("");
-    }
-
-    @Override
-    protected PTransform<PCollection<InputT>, PDone> delegate() {
-      return initial;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 2584739..7408c0b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -24,8 +24,6 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.AggregatorPipelineExtractor;
 import org.apache.beam.sdk.runners.AggregatorRetrievalException;
@@ -80,8 +78,6 @@ public class DirectRunner
           ImmutableMap.<Class<? extends PTransform>, PTransformOverrideFactory>builder()
               .put(GroupByKey.class, new DirectGroupByKeyOverrideFactory())
               .put(CreatePCollectionView.class, new ViewOverrideFactory())
-              .put(AvroIO.Write.Bound.class, new AvroIOShardedWriteFactory())
-              .put(TextIO.Write.Bound.class, new TextIOShardedWriteFactory())
               .build();
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java
deleted file mode 100644
index 4687f85..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.Partition;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PDone;
-
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- * A write that explicitly controls its number of output shards.
- */
-abstract class ShardControlledWrite<InputT>
-    extends ForwardingPTransform<PCollection<InputT>, PDone> {
-  @Override
-  public PDone apply(PCollection<InputT> input) {
-    int numShards = getNumShards();
-    checkArgument(
-        numShards >= 1,
-        "%s should only be applied if the output has a controlled number of shards (> 1); got %s",
-        getClass().getSimpleName(),
-        getNumShards());
-    PCollectionList<InputT> shards =
-        input.apply(
-            "PartitionInto" + numShards + "Shards",
-            Partition.of(getNumShards(), new RandomSeedPartitionFn<InputT>()));
-    for (int i = 0; i < shards.size(); i++) {
-      PCollection<InputT> shard = shards.get(i);
-      PTransform<? super PCollection<InputT>, PDone> writeShard = getSingleShardTransform(i);
-      shard.apply(String.format("%s(Shard:%s)", writeShard.getName(), i), writeShard);
-    }
-    return PDone.in(input.getPipeline());
-  }
-
-  /**
-   * Returns the number of shards this {@link PTransform} should write to.
-   */
-  abstract int getNumShards();
-
-  /**
-   * Returns a {@link PTransform} that performs a write to the shard with the specified shard
-   * number.
-   *
-   * <p>This method will be called n times, where n is the value of {@link #getNumShards()}, for
-   * shard numbers {@code [0...n)}.
-   */
-  abstract PTransform<? super PCollection<InputT>, PDone> getSingleShardTransform(int shardNum);
-
-  private static class RandomSeedPartitionFn<T> implements Partition.PartitionFn<T> {
-    int nextPartition = -1;
-    @Override
-    public int partitionFor(T elem, int numPartitions) {
-      if (nextPartition < 0) {
-        nextPartition = ThreadLocalRandom.current().nextInt(numPartitions);
-      }
-      nextPartition++;
-      nextPartition %= numPartitions;
-      return nextPartition;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java
deleted file mode 100644
index be1bf18..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.TextIO.Write.Bound;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-class TextIOShardedWriteFactory implements PTransformOverrideFactory {
-
-  @Override
-  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
-      PTransform<InputT, OutputT> transform) {
-    if (transform instanceof TextIO.Write.Bound) {
-      @SuppressWarnings("unchecked")
-      TextIO.Write.Bound<InputT> originalWrite = (TextIO.Write.Bound<InputT>) transform;
-      if (originalWrite.getNumShards() > 1
-          || (originalWrite.getNumShards() == 1
-              && !"".equals(originalWrite.getShardNameTemplate()))) {
-        @SuppressWarnings("unchecked")
-        PTransform<InputT, OutputT> override =
-            (PTransform<InputT, OutputT>) new TextIOShardedWrite<InputT>(originalWrite);
-        return override;
-      }
-    }
-    return transform;
-  }
-
-  private static class TextIOShardedWrite<InputT> extends ShardControlledWrite<InputT> {
-    private final TextIO.Write.Bound<InputT> initial;
-
-    private TextIOShardedWrite(Bound<InputT> initial) {
-      this.initial = initial;
-    }
-
-    @Override
-    int getNumShards() {
-      return initial.getNumShards();
-    }
-
-    @Override
-    PTransform<PCollection<InputT>, PDone> getSingleShardTransform(int shardNum) {
-      String shardName =
-          IOChannelUtils.constructName(
-              initial.getFilenamePrefix(),
-              initial.getShardTemplate(),
-              initial.getFilenameSuffix(),
-              shardNum,
-              getNumShards());
-      return TextIO.Write.withCoder(initial.getCoder()).to(shardName).withoutSharding();
-    }
-
-    @Override
-    protected PTransform<PCollection<InputT>, PDone> delegate() {
-      return initial;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
deleted file mode 100644
index d94113a..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.theInstance;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.AvroIOTest;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-import org.hamcrest.Matchers;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.File;
-
-/**
- * Tests for {@link AvroIOShardedWriteFactory}.
- */
-@RunWith(JUnit4.class)
-public class AvroIOShardedWriteFactoryTest {
-
-  @Rule public TemporaryFolder tmp = new TemporaryFolder();
-  private AvroIOShardedWriteFactory factory;
-
-  @Before
-  public void setup() {
-    factory = new AvroIOShardedWriteFactory();
-  }
-
-  @Test
-  public void originalWithoutShardingReturnsOriginal() throws Exception {
-    File file = tmp.newFile("foo");
-    PTransform<PCollection<String>, PDone> original =
-        AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withoutSharding();
-    PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
-    assertThat(overridden, theInstance(original));
-  }
-
-  @Test
-  public void originalShardingNotSpecifiedReturnsOriginal() throws Exception {
-    File file = tmp.newFile("foo");
-    PTransform<PCollection<String>, PDone> original =
-        AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath());
-    PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
-    assertThat(overridden, theInstance(original));
-  }
-
-  @Test
-  public void originalShardedToOneReturnsExplicitlySharded() throws Exception {
-    File file = tmp.newFile("foo");
-    AvroIO.Write.Bound<String> original =
-        AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withNumShards(1);
-    PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
-    assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
-
-    Pipeline p = getPipeline();
-    String[] elems = new String[] {"foo", "bar", "baz"};
-    p.apply(Create.<String>of(elems)).apply(overridden);
-
-    file.delete();
-
-    p.run();
-    AvroIOTest.assertTestOutputs(elems, 1, file.getAbsolutePath(), original.getShardNameTemplate());
-  }
-
-  @Test
-  public void originalShardedToManyReturnsExplicitlySharded() throws Exception {
-    File file = tmp.newFile("foo");
-    AvroIO.Write.Bound<String> original =
-        AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withNumShards(3);
-    PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
-    assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
-
-    Pipeline p = getPipeline();
-    String[] elems = new String[] {"foo", "bar", "baz", "spam", "ham", "eggs"};
-    p.apply(Create.<String>of(elems)).apply(overridden);
-
-    file.delete();
-    p.run();
-    AvroIOTest.assertTestOutputs(elems, 3, file.getAbsolutePath(), original.getShardNameTemplate());
-  }
-
-  private Pipeline getPipeline() {
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    options.setRunner(DirectRunner.class);
-    return TestPipeline.fromOptions(options);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
deleted file mode 100644
index 5ede931..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.theInstance;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.TextIOTest;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-import org.hamcrest.Matchers;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.File;
-
-/**
- * Tests for {@link TextIOShardedWriteFactory}.
- */
-@RunWith(JUnit4.class)
-public class TextIOShardedWriteFactoryTest {
-  @Rule public TemporaryFolder tmp = new TemporaryFolder();
-  private TextIOShardedWriteFactory factory;
-
-  @Before
-  public void setup() {
-    factory = new TextIOShardedWriteFactory();
-  }
-
-  @Test
-  public void originalWithoutShardingReturnsOriginal() throws Exception {
-    File file = tmp.newFile("foo");
-    PTransform<PCollection<String>, PDone> original =
-        TextIO.Write.to(file.getAbsolutePath()).withoutSharding();
-    PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
-    assertThat(overridden, theInstance(original));
-  }
-
-  @Test
-  public void originalShardingNotSpecifiedReturnsOriginal() throws Exception {
-    File file = tmp.newFile("foo");
-    PTransform<PCollection<String>, PDone> original = TextIO.Write.to(file.getAbsolutePath());
-    PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
-    assertThat(overridden, theInstance(original));
-  }
-
-  @Test
-  public void originalShardedToOneReturnsExplicitlySharded() throws Exception {
-    File file = tmp.newFile("foo");
-    TextIO.Write.Bound<String> original =
-        TextIO.Write.to(file.getAbsolutePath()).withNumShards(1);
-    PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
-    assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
-
-    Pipeline p = getPipeline();
-    String[] elems = new String[] {"foo", "bar", "baz"};
-    p.apply(Create.<String>of(elems)).apply(overridden);
-
-    file.delete();
-
-    p.run();
-    TextIOTest.assertOutputFiles(
-        elems, StringUtf8Coder.of(), 1, tmp, "foo", original.getShardNameTemplate());
-  }
-
-  @Test
-  public void originalShardedToManyReturnsExplicitlySharded() throws Exception {
-    File file = tmp.newFile("foo");
-    TextIO.Write.Bound<String> original = TextIO.Write.to(file.getAbsolutePath()).withNumShards(3);
-    PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
-    assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
-
-    Pipeline p = getPipeline();
-    String[] elems = new String[] {"foo", "bar", "baz", "spam", "ham", "eggs"};
-    p.apply(Create.<String>of(elems)).apply(overridden);
-
-    file.delete();
-    p.run();
-    TextIOTest.assertOutputFiles(
-        elems, StringUtf8Coder.of(), 3, tmp, "foo", original.getShardNameTemplate());
-  }
-
-  private Pipeline getPipeline() {
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    options.setRunner(DirectRunner.class);
-    return TestPipeline.fromOptions(options);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 33f97e6..70dd94f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -44,7 +44,6 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -66,7 +65,6 @@ import org.apache.beam.sdk.io.PubsubIO;
 import org.apache.beam.sdk.io.PubsubUnboundedSink;
 import org.apache.beam.sdk.io.PubsubUnboundedSource;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.ShardNameTemplate;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.Write;
@@ -91,7 +89,6 @@ import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -376,8 +373,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       builder.put(Read.Unbounded.class, UnsupportedIO.class);
       builder.put(Window.Bound.class, AssignWindows.class);
       builder.put(Write.Bound.class, BatchWrite.class);
-      builder.put(AvroIO.Write.Bound.class, BatchAvroIOWrite.class);
-      builder.put(TextIO.Write.Bound.class, BatchTextIOWrite.class);
       // In batch mode must use the custom Pubsub bounded source/sink.
       builder.put(PubsubUnboundedSource.class, UnsupportedIO.class);
       builder.put(PubsubUnboundedSink.class, UnsupportedIO.class);
@@ -2048,52 +2043,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   }
 
   /**
-   * A {@link PTransform} that uses shuffle to create a fusion break. This allows pushing
-   * parallelism limits such as sharding controls further down the pipeline.
-   */
-  private static class ReshardForWrite<T> extends PTransform<PCollection<T>, PCollection<T>> {
-    @Override
-    public PCollection<T> apply(PCollection<T> input) {
-      return input
-          // TODO: This would need to be adapted to write per-window shards.
-          .apply(
-              Window.<T>into(new GlobalWindows())
-                  .triggering(DefaultTrigger.of())
-                  .discardingFiredPanes())
-          .apply(
-              "RandomKey",
-              ParDo.of(
-                  new DoFn<T, KV<Long, T>>() {
-                    transient long counter, step;
-
-                    @Override
-                    public void startBundle(Context c) {
-                      counter = (long) (Math.random() * Long.MAX_VALUE);
-                      step = 1 + 2 * (long) (Math.random() * Long.MAX_VALUE);
-                    }
-
-                    @Override
-                    public void processElement(ProcessContext c) {
-                      counter += step;
-                      c.output(KV.of(counter, c.element()));
-                    }
-                  }))
-          .apply(GroupByKey.<Long, T>create())
-          .apply(
-              "Ungroup",
-              ParDo.of(
-                  new DoFn<KV<Long, Iterable<T>>, T>() {
-                    @Override
-                    public void processElement(ProcessContext c) {
-                      for (T item : c.element().getValue()) {
-                        c.output(item);
-                      }
-                    }
-                  }));
-    }
-  }
-
-  /**
    * Specialized implementation which overrides
    * {@link org.apache.beam.sdk.io.Write.Bound Write.Bound} to provide Google
    * Cloud Dataflow specific path validation of {@link FileBasedSink}s.
@@ -2122,213 +2071,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   }
 
   /**
-   * Specialized implementation which overrides
-   * {@link org.apache.beam.sdk.io.TextIO.Write.Bound TextIO.Write.Bound} with
-   * a native sink instead of a custom sink as workaround until custom sinks
-   * have support for sharding controls.
-   */
-  private static class BatchTextIOWrite<T> extends PTransform<PCollection<T>, PDone> {
-    private final TextIO.Write.Bound<T> transform;
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public BatchTextIOWrite(DataflowRunner runner, TextIO.Write.Bound<T> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PDone apply(PCollection<T> input) {
-      if (transform.getNumShards() > 0) {
-        return input
-            .apply(new ReshardForWrite<T>())
-            .apply(new BatchTextIONativeWrite<>(transform));
-      } else {
-        return transform.apply(input);
-      }
-    }
-  }
-
-  /**
-   * This {@link PTransform} is used by the {@link DataflowPipelineTranslator} as a way
-   * to provide the native definition of the Text sink.
-   */
-  private static class BatchTextIONativeWrite<T> extends PTransform<PCollection<T>, PDone> {
-    private final TextIO.Write.Bound<T> transform;
-    public BatchTextIONativeWrite(TextIO.Write.Bound<T> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PDone apply(PCollection<T> input) {
-      return PDone.in(input.getPipeline());
-    }
-
-    static {
-      DataflowPipelineTranslator.registerTransformTranslator(
-          BatchTextIONativeWrite.class, new BatchTextIONativeWriteTranslator());
-    }
-  }
-
-  /**
-   * TextIO.Write.Bound support code for the Dataflow backend when applying parallelism limits
-   * through user requested sharding limits.
-   */
-  private static class BatchTextIONativeWriteTranslator
-      implements TransformTranslator<BatchTextIONativeWrite<?>> {
-    @SuppressWarnings("unchecked")
-    @Override
-    public void translate(@SuppressWarnings("rawtypes") BatchTextIONativeWrite transform,
-        TranslationContext context) {
-      translateWriteHelper(transform, transform.transform, context);
-    }
-
-    private <T> void translateWriteHelper(
-        BatchTextIONativeWrite<T> transform,
-        TextIO.Write.Bound<T> originalTransform,
-        TranslationContext context) {
-      // Note that the original transform can not be used during add step/add input
-      // and is only passed in to get properties from it.
-
-      checkState(originalTransform.getNumShards() > 0,
-          "Native TextSink is expected to only be used when sharding controls are required.");
-
-      context.addStep(transform, "ParallelWrite");
-      context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
-
-      // TODO: drop this check when server supports alternative templates.
-      switch (originalTransform.getShardTemplate()) {
-        case ShardNameTemplate.INDEX_OF_MAX:
-          break;  // supported by server
-        case "":
-          // Empty shard template allowed - forces single output.
-          checkArgument(originalTransform.getNumShards() <= 1,
-              "Num shards must be <= 1 when using an empty sharding template");
-          break;
-        default:
-          throw new UnsupportedOperationException("Shard template "
-              + originalTransform.getShardTemplate()
-              + " not yet supported by Dataflow service");
-      }
-
-      // TODO: How do we want to specify format and
-      // format-specific properties?
-      context.addInput(PropertyNames.FORMAT, "text");
-      context.addInput(PropertyNames.FILENAME_PREFIX, originalTransform.getFilenamePrefix());
-      context.addInput(PropertyNames.SHARD_NAME_TEMPLATE,
-          originalTransform.getShardNameTemplate());
-      context.addInput(PropertyNames.FILENAME_SUFFIX, originalTransform.getFilenameSuffix());
-      context.addInput(PropertyNames.VALIDATE_SINK, originalTransform.needsValidation());
-      context.addInput(PropertyNames.NUM_SHARDS, (long) originalTransform.getNumShards());
-      context.addEncodingInput(
-          WindowedValue.getValueOnlyCoder(originalTransform.getCoder()));
-
-    }
-  }
-
-  /**
-   * Specialized implementation which overrides
-   * {@link org.apache.beam.sdk.io.AvroIO.Write.Bound AvroIO.Write.Bound} with
-   * a native sink instead of a custom sink as workaround until custom sinks
-   * have support for sharding controls.
-   */
-  private static class BatchAvroIOWrite<T> extends PTransform<PCollection<T>, PDone> {
-    private final AvroIO.Write.Bound<T> transform;
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public BatchAvroIOWrite(DataflowRunner runner, AvroIO.Write.Bound<T> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PDone apply(PCollection<T> input) {
-      if (transform.getNumShards() > 0) {
-        return input
-            .apply(new ReshardForWrite<T>())
-            .apply(new BatchAvroIONativeWrite<>(transform));
-      } else {
-        return transform.apply(input);
-      }
-    }
-  }
-
-  /**
-   * This {@link PTransform} is used by the {@link DataflowPipelineTranslator} as a way
-   * to provide the native definition of the Avro sink.
-   */
-  private static class BatchAvroIONativeWrite<T> extends PTransform<PCollection<T>, PDone> {
-    private final AvroIO.Write.Bound<T> transform;
-    public BatchAvroIONativeWrite(AvroIO.Write.Bound<T> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PDone apply(PCollection<T> input) {
-      return PDone.in(input.getPipeline());
-    }
-
-    static {
-      DataflowPipelineTranslator.registerTransformTranslator(
-          BatchAvroIONativeWrite.class, new BatchAvroIONativeWriteTranslator());
-    }
-  }
-
-  /**
-   * AvroIO.Write.Bound support code for the Dataflow backend when applying parallelism limits
-   * through user requested sharding limits.
-   */
-  private static class BatchAvroIONativeWriteTranslator
-      implements TransformTranslator<BatchAvroIONativeWrite<?>> {
-    @SuppressWarnings("unchecked")
-    @Override
-    public void translate(@SuppressWarnings("rawtypes") BatchAvroIONativeWrite transform,
-        TranslationContext context) {
-      translateWriteHelper(transform, transform.transform, context);
-    }
-
-    private <T> void translateWriteHelper(
-        BatchAvroIONativeWrite<T> transform,
-        AvroIO.Write.Bound<T> originalTransform,
-        TranslationContext context) {
-      // Note that the original transform can not be used during add step/add input
-      // and is only passed in to get properties from it.
-
-      checkState(originalTransform.getNumShards() > 0,
-          "Native AvroSink is expected to only be used when sharding controls are required.");
-
-      context.addStep(transform, "ParallelWrite");
-      context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
-
-      // TODO: drop this check when server supports alternative templates.
-      switch (originalTransform.getShardTemplate()) {
-        case ShardNameTemplate.INDEX_OF_MAX:
-          break;  // supported by server
-        case "":
-          // Empty shard template allowed - forces single output.
-          checkArgument(originalTransform.getNumShards() <= 1,
-              "Num shards must be <= 1 when using an empty sharding template");
-          break;
-        default:
-          throw new UnsupportedOperationException("Shard template "
-              + originalTransform.getShardTemplate()
-              + " not yet supported by Dataflow service");
-      }
-
-      context.addInput(PropertyNames.FORMAT, "avro");
-      context.addInput(PropertyNames.FILENAME_PREFIX, originalTransform.getFilenamePrefix());
-      context.addInput(PropertyNames.SHARD_NAME_TEMPLATE, originalTransform.getShardTemplate());
-      context.addInput(PropertyNames.FILENAME_SUFFIX, originalTransform.getFilenameSuffix());
-      context.addInput(PropertyNames.VALIDATE_SINK, originalTransform.needsValidation());
-      context.addInput(PropertyNames.NUM_SHARDS, (long) originalTransform.getNumShards());
-      context.addEncodingInput(
-          WindowedValue.getValueOnlyCoder(
-              AvroCoder.of(originalTransform.getType(), originalTransform.getSchema())));
-    }
-  }
-
-  /**
    * Specialized (non-)implementation for
    * {@link org.apache.beam.sdk.io.Write.Bound Write.Bound}
    * for the Dataflow runner in streaming mode.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 280cd12..718461a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -647,14 +647,14 @@ public class AvroIO {
           throw new IllegalStateException("need to set the schema of an AvroIO.Write transform");
         }
 
-        // Note that custom sinks currently do not expose sharding controls.
-        // Thus pipeline runner writers need to individually add support internally to
-        // apply user requested sharding limits.
-        return input.apply(
-            "Write",
+        org.apache.beam.sdk.io.Write.Bound<T> write =
             org.apache.beam.sdk.io.Write.to(
                 new AvroSink<>(
-                    filenamePrefix, filenameSuffix, shardTemplate, AvroCoder.of(type, schema))));
+                    filenamePrefix, filenameSuffix, shardTemplate, AvroCoder.of(type, schema)));
+        if (getNumShards() > 0) {
+          write = write.withNumShards(getNumShards());
+        }
+        return input.apply("Write", write);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 9dd3679..64db3f7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -109,7 +109,7 @@ import javax.annotation.Nullable;
  * }</pre>
  *
  * <h3>Permissions</h3>
- * <p>When run using the {@link DirectRunner}, your pipeline can read and write text files
+ * <p>When run using the {@code DirectRunner}, your pipeline can read and write text files
  * on your local drive and remote text files on Google Cloud Storage that you have access to using
  * your {@code gcloud} credentials. When running in the Dataflow service, the pipeline can only
  * read and write files from GCS. For more information about permissions, see the Cloud Dataflow
@@ -608,12 +608,13 @@ public class TextIO {
               "need to set the filename prefix of a TextIO.Write transform");
         }
 
-        // Note that custom sinks currently do not expose sharding controls.
-        // Thus pipeline runner writers need to individually add support internally to
-        // apply user requested sharding limits.
-        return input.apply("Write", org.apache.beam.sdk.io.Write.to(
-            new TextSink<>(
-                filenamePrefix, filenameSuffix, shardTemplate, coder)));
+        org.apache.beam.sdk.io.Write.Bound<T> write =
+            org.apache.beam.sdk.io.Write.to(
+                new TextSink<>(filenamePrefix, filenameSuffix, shardTemplate, coder));
+        if (getNumShards() > 0) {
+          write = write.withNumShards(getNumShards());
+        }
+        return input.apply("Write", write);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index df6e4d2..c48933b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
@@ -26,54 +28,80 @@ import org.apache.beam.sdk.io.Sink.Writer;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 
+import com.google.api.client.util.Lists;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * A {@link PTransform} that writes to a {@link Sink}. A write begins with a sequential global
  * initialization of a sink, followed by a parallel write, and ends with a sequential finalization
- * of the write. The output of a write is {@link PDone}.  In the case of an empty PCollection, only
- * the global initialization and finalization will be performed.
+ * of the write. The output of a write is {@link PDone}.
+ *
+ * <p>By default, every bundle in the input {@link PCollection} will be processed by a
+ * {@link WriteOperation}, so the number of outputs will vary based on runner behavior, though at
+ * least 1 output will always be produced. The exact parallelism of the write stage can be
+ * controlled using {@link Write.Bound#withNumShards}, typically used to control how many files are
+ * produced or to globally limit the number of workers connecting to an external service. However,
+ * this option can often hurt performance: it adds an additional {@link GroupByKey} to the pipeline.
+ *
+ * <p>{@code Write} re-windows the data into the global window, so it is typically not well suited
+ * to use in streaming pipelines.
  *
- * <p>Currently, only batch workflows can contain Write transforms.
+ * <p>Example usage with runner-controlled sharding:
  *
- * <p>Example usage:
+ * <pre>{@code p.apply(Write.to(new MySink(...)));}</pre>
+
+ * <p>Example usage with a fixed number of shards:
  *
- * <p>{@code p.apply(Write.to(new MySink(...)));}
+ * <pre>{@code p.apply(Write.to(new MySink(...)).withNumShards(3));}</pre>
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
 public class Write {
   private static final Logger LOG = LoggerFactory.getLogger(Write.class);
 
   /**
-   * Creates a Write transform that writes to the given Sink.
+   * Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner
+   * control how many different shards are produced.
    */
   public static <T> Bound<T> to(Sink<T> sink) {
-    return new Bound<>(sink);
+    checkNotNull(sink, "sink");
+    return new Bound<>(sink, 0 /* runner-controlled sharding */);
   }
 
   /**
-   * A {@link PTransform} that writes to a {@link Sink}. See {@link Write} and {@link Sink} for
-   * documentation about writing to Sinks.
+   * A {@link PTransform} that writes to a {@link Sink}. See the class-level Javadoc for more
+   * information.
+   *
+   * @see Write
+   * @see Sink
    */
   public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
     private final Sink<T> sink;
+    private int numShards;
 
-    private Bound(Sink<T> sink) {
+    private Bound(Sink<T> sink, int numShards) {
       this.sink = sink;
+      this.numShards = numShards;
     }
 
     @Override
@@ -87,9 +115,20 @@ public class Write {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder
-          .add(DisplayData.item("sink", sink.getClass())
-            .withLabel("Write Sink"))
-          .include(sink);
+          .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink"))
+          .include(sink)
+          .addIfNotDefault(
+              DisplayData.item("numShards", getNumShards()).withLabel("Fixed Number of Shards"),
+              0);
+    }
+
+    /**
+     * Returns the number of shards that will be produced in the output.
+     *
+     * @see Write for more information
+     */
+    public int getNumShards() {
+      return numShards;
     }
 
     /**
@@ -100,6 +139,153 @@ public class Write {
     }
 
     /**
+     * Returns a new {@link Write.Bound} that will write to the current {@link Sink} using the
+     * specified number of shards.
+     *
+     * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
+     * more information.
+     *
+     * <p>A value less than or equal to 0 will be equivalent to the default behavior of
+     * runner-controlled sharding.
+     */
+    public Bound<T> withNumShards(int numShards) {
+      return new Bound<>(sink, Math.max(numShards, 0));
+    }
+
+    /**
+     * Writes all the elements in a bundle using a {@link Writer} produced by the
+     * {@link WriteOperation} associated with the {@link Sink}.
+     */
+    private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
+      // Writer that will write the records in this bundle. Lazily
+      // initialized in processElement.
+      private Writer<T, WriteT> writer = null;
+      private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
+
+      WriteBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
+        this.writeOperationView = writeOperationView;
+      }
+
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        // Lazily initialize the Writer
+        if (writer == null) {
+          WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
+          LOG.info("Opening writer for write operation {}", writeOperation);
+          writer = writeOperation.createWriter(c.getPipelineOptions());
+          writer.open(UUID.randomUUID().toString());
+          LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
+        }
+        try {
+          writer.write(c.element());
+        } catch (Exception e) {
+          // Discard write result and close the write.
+          try {
+            writer.close();
+            // The writer does not need to be reset, as this DoFn cannot be reused.
+          } catch (Exception closeException) {
+            if (closeException instanceof InterruptedException) {
+              // Do not silently ignore interrupted state.
+              Thread.currentThread().interrupt();
+            }
+            // Do not mask the exception that caused the write to fail.
+            e.addSuppressed(closeException);
+          }
+          throw e;
+        }
+      }
+
+      @Override
+      public void finishBundle(Context c) throws Exception {
+        if (writer != null) {
+          WriteT result = writer.close();
+          c.output(result);
+          // Reset state in case of reuse.
+          writer = null;
+        }
+      }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        Write.Bound.this.populateDisplayData(builder);
+      }
+    }
+
+    /**
+     * Like {@link WriteBundles}, but where the elements for each shard have been collected into
+     * a single iterable.
+     *
+     * @see WriteBundles
+     */
+    private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
+      private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
+
+      WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
+        this.writeOperationView = writeOperationView;
+      }
+
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        // In a sharded write, single input element represents one shard. We can open and close
+        // the writer in each call to processElement.
+        WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
+        LOG.info("Opening writer for write operation {}", writeOperation);
+        Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
+        writer.open(UUID.randomUUID().toString());
+        LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
+
+        try {
+          for (T t : c.element().getValue()) {
+            writer.write(t);
+          }
+        } catch (Exception e) {
+          try {
+            writer.close();
+          } catch (Exception closeException) {
+            if (closeException instanceof InterruptedException) {
+              // Do not silently ignore interrupted state.
+              Thread.currentThread().interrupt();
+            }
+            // Do not mask the exception that caused the write to fail.
+            e.addSuppressed(closeException);
+          }
+          throw e;
+        }
+
+        // Close the writer; if this throws let the error propagate.
+        WriteT result = writer.close();
+        c.output(result);
+      }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        Write.Bound.this.populateDisplayData(builder);
+      }
+    }
+
+    private static class ApplyShardingKey<T> implements SerializableFunction<T, Integer> {
+      private final int numShards;
+      private int shardNumber;
+
+      ApplyShardingKey(int numShards) {
+        this.numShards = numShards;
+        shardNumber = -1;
+      }
+
+      @Override
+      public Integer apply(T input) {
+        if (shardNumber == -1) {
+          // We want to desynchronize the first record sharding key for each instance of
+          // ApplyShardingKey, so records in a small PCollection will be statistically balanced.
+          shardNumber = ThreadLocalRandom.current().nextInt(numShards);
+        } else {
+          shardNumber = (shardNumber + 1) % numShards;
+        }
+        return shardNumber;
+      }
+    }
+
+    /**
      * A write is performed as sequence of three {@link ParDo}'s.
      *
      * <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's
@@ -142,7 +328,7 @@ public class Write {
       // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
       // the sink.
       PCollection<WriteOperation<T, WriteT>> operationCollection =
-          p.apply(Create.<WriteOperation<T, WriteT>>of(writeOperation).withCoder(operationCoder));
+          p.apply(Create.of(writeOperation).withCoder(operationCoder));
 
       // Initialize the resource in a do-once ParDo on the WriteOperation.
       operationCollection = operationCollection
@@ -165,57 +351,32 @@ public class Write {
       final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
           operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
 
+      // Re-window the data into the global window and remove any existing triggers.
+      PCollection<T> inputInGlobalWindow =
+          input.apply(
+              Window.<T>into(new GlobalWindows())
+                  .triggering(DefaultTrigger.of())
+                  .discardingFiredPanes());
+
       // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
       // as a side input) and collect the results of the writes in a PCollection.
       // There is a dependency between this ParDo and the first (the WriteOperation PCollection
       // as a side input), so this will happen after the initial ParDo.
-      PCollection<WriteT> results = input
-          .apply(Window.<T>into(new GlobalWindows()))
-          .apply("WriteBundles", ParDo.of(new DoFn<T, WriteT>() {
-            // Writer that will write the records in this bundle. Lazily
-            // initialized in processElement.
-            private Writer<T, WriteT> writer = null;
-
-            @Override
-            public void processElement(ProcessContext c) throws Exception {
-              // Lazily initialize the Writer
-              if (writer == null) {
-                WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
-                LOG.info("Opening writer for write operation {}", writeOperation);
-                writer = writeOperation.createWriter(c.getPipelineOptions());
-                writer.open(UUID.randomUUID().toString());
-                LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
-              }
-              try {
-                writer.write(c.element());
-              } catch (Exception e) {
-                // Discard write result and close the write.
-                try {
-                  writer.close();
-                  // The writer does not need to be reset, as this DoFn cannot be reused
-                } catch (Exception closeException) {
-                  // Do not mask the exception that caused the write to fail.
-                }
-                throw e;
-              }
-            }
-
-            @Override
-            public void finishBundle(Context c) throws Exception {
-              if (writer != null) {
-                WriteT result = writer.close();
-                c.output(result);
-                // Reset state in case of reuse
-                writer = null;
-              }
-            }
-
-            @Override
-            public void populateDisplayData(DisplayData.Builder builder) {
-              Write.Bound.this.populateDisplayData(builder);
-            }
-          }).withSideInputs(writeOperationView))
-          .setCoder(writeOperation.getWriterResultCoder());
+      PCollection<WriteT> results;
+      if (getNumShards() <= 0) {
+        results = inputInGlobalWindow
+            .apply("WriteBundles",
+                ParDo.of(new WriteBundles<>(writeOperationView))
+                    .withSideInputs(writeOperationView));
+      } else {
+        results = inputInGlobalWindow
+            .apply("ApplyShardLabel", WithKeys.of(new ApplyShardingKey<T>(getNumShards())))
+            .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
+            .apply("WriteShardedBundles",
+                ParDo.of(new WriteShardedBundles<>(writeOperationView))
+                    .withSideInputs(writeOperationView));
+      }
+      results.setCoder(writeOperation.getWriterResultCoder());
 
       final PCollectionView<Iterable<WriteT>> resultsView =
           results.apply(View.<WriteT>asIterable());
@@ -231,17 +392,26 @@ public class Write {
             @Override
             public void processElement(ProcessContext c) throws Exception {
               WriteOperation<T, WriteT> writeOperation = c.element();
-              LOG.info("Finalizing write operation {}", writeOperation);
-              Iterable<WriteT> results = c.sideInput(resultsView);
-              LOG.debug("Side input initialized to finalize write operation {}", writeOperation);
-              if (!results.iterator().hasNext()) {
-                LOG.info("No write results, creating a single empty output.");
-                Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
-                writer.open(UUID.randomUUID().toString());
-                WriteT emptyWrite = writer.close();
-                results = Collections.singleton(emptyWrite);
-                LOG.debug("Done creating a single empty output.");
+              LOG.info("Finalizing write operation {}.", writeOperation);
+              List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
+              LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
+
+              // We must always output at least 1 shard, and honor user-specified numShards if set.
+              int minShardsNeeded = Math.max(1, getNumShards());
+              int extraShardsNeeded = minShardsNeeded - results.size();
+              if (extraShardsNeeded > 0) {
+                LOG.info(
+                    "Creating {} empty output shards in addition to {} written for a total of {}.",
+                    extraShardsNeeded, results.size(), minShardsNeeded);
+                for (int i = 0; i < extraShardsNeeded; ++i) {
+                  Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
+                  writer.open(UUID.randomUUID().toString());
+                  WriteT emptyWrite = writer.close();
+                  results.add(emptyWrite);
+                }
+                LOG.debug("Done creating extra shards.");
               }
+
               writeOperation.finalize(results, c.getPipelineOptions());
               LOG.debug("Done finalizing write operation {}", writeOperation);
             }
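
Note on the sharded path above: when a fixed number of shards is requested, the input is keyed
("ApplyShardLabel"), grouped ("GroupIntoShards"), and each group is written by WriteShardedBundles.
The body of ApplyShardingKey is not included in this excerpt, so the following is only a sketch of
what such a key-assignment function could look like; the class name ExampleShardingKey, the
round-robin strategy, and the random starting offset are illustrative assumptions, not the
committed code.

import java.util.concurrent.ThreadLocalRandom;

import org.apache.beam.sdk.transforms.SerializableFunction;

/**
 * Sketch of a sharding-key assignment function, suitable for use with
 * WithKeys.of(...) followed by GroupByKey, as in the transform chain above.
 */
class ExampleShardingKey<T> implements SerializableFunction<T, Integer> {
  private final int numShards;
  private long counter;

  ExampleShardingKey(int numShards) {
    this.numShards = numShards;
    // Offset chosen once at construction time; copies deserialized on workers all
    // start from the same offset, which is acceptable for a sketch.
    this.counter = ThreadLocalRandom.current().nextInt(numShards);
  }

  @Override
  public Integer apply(T input) {
    // Round-robin over [0, numShards) so elements spread roughly evenly across shards.
    return (int) (counter++ % numShards);
  }
}

Even assignment of keys is what allows a GroupByKey on the shard number to produce balanced
shards, which testShardedWriteBalanced in WriteTest (below) asserts by comparing the smallest
and largest per-shard record counts.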

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index abda3a5..56643f2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -19,9 +19,11 @@ package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -52,7 +54,9 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Optional;
 
+import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -62,12 +66,14 @@ import org.junit.runners.JUnit4;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Tests for the Write PTransform.
@@ -76,6 +82,10 @@ import java.util.concurrent.ThreadLocalRandom;
 public class WriteTest {
   // Static store that can be accessed within the writer
   private static List<String> sinkContents = new ArrayList<>();
+  // Static count of output shards
+  private static AtomicInteger numShards = new AtomicInteger(0);
+  // Static counts of the number of records per shard.
+  private static List<Integer> recordsPerShard = new ArrayList<>();
 
   private static final MapElements<String, String> IDENTITY_MAP =
       MapElements.via(new SimpleFunction<String, String>() {
@@ -129,6 +139,71 @@ public class WriteTest {
   }
 
   /**
+   * Test that Write with an empty input still produces one shard.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testEmptyWrite() {
+    runWrite(Collections.<String>emptyList(), IDENTITY_MAP);
+    // Note we did not request a sharded write, so runWrite will not validate the number of shards.
+    assertEquals(1, numShards.intValue());
+  }
+
+  /**
+   * Test that Write with a configured number of shards produces the desired number of shards even
+   * when there are many elements.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testShardedWrite() {
+    runShardedWrite(
+        Arrays.asList("one", "two", "three", "four", "five", "six"),
+        IDENTITY_MAP,
+        Optional.of(1));
+  }
+
+  /**
+   * Test that Write with a configured number of shards produces the desired number of shards even
+   * when there are too few elements.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testExpandShardedWrite() {
+    runShardedWrite(
+        Arrays.asList("one", "two", "three", "four", "five", "six"),
+        IDENTITY_MAP,
+        Optional.of(20));
+  }
+
+  /**
+   * Tests that a Write can balance many elements.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testShardedWriteBalanced() {
+    int numElements = 1000;
+    List<String> inputs = new ArrayList<>(numElements);
+    for (int i = 0; i < numElements; ++i) {
+      inputs.add(String.format("elt%04d", i));
+    }
+
+    runShardedWrite(
+        inputs,
+        new WindowAndReshuffle<>(
+            Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))),
+        Optional.of(10));
+
+    // Check that the min and max number of results per shard are within 10% of each other.
+    int min = Integer.MAX_VALUE;
+    int max = Integer.MIN_VALUE;
+    for (Integer i : recordsPerShard) {
+      min = Math.min(min, i);
+      max = Math.max(max, i);
+    }
+    assertThat((double) min, Matchers.greaterThanOrEqualTo(max * 0.9));
+  }
+
+  /**
    * Test a Write transform with an empty PCollection.
    */
   @Test
@@ -147,7 +222,7 @@ public class WriteTest {
     List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
         "Intimidating pigeon", "Pedantic gull", "Frisky finch");
     runWrite(
-        inputs, new WindowAndReshuffle(Window.<String>into(FixedWindows.of(Duration.millis(2)))));
+        inputs, new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))));
   }
 
   /**
@@ -161,7 +236,22 @@ public class WriteTest {
 
     runWrite(
         inputs,
-        new WindowAndReshuffle(Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))));
+        new WindowAndReshuffle<>(
+            Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))));
+  }
+
+  @Test
+  public void testBuildWrite() {
+    Sink<String> sink = new TestSink() {};
+    Write.Bound<String> write = Write.to(sink).withNumShards(3);
+    assertEquals(3, write.getNumShards());
+    assertThat(write.getSink(), is(sink));
+
+    Write.Bound<String> write2 = write.withNumShards(7);
+    assertEquals(7, write2.getNumShards());
+    assertThat(write2.getSink(), is(sink));
+    // original unchanged
+    assertEquals(3, write.getNumShards());
   }
 
   @Test
@@ -179,7 +269,20 @@ public class WriteTest {
     assertThat(displayData, includesDisplayDataFrom(sink));
   }
 
-
+  @Test
+  public void testShardedDisplayData() {
+    TestSink sink = new TestSink() {
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder.add(DisplayData.item("foo", "bar"));
+      }
+    };
+    Write.Bound<String> write = Write.to(sink).withNumShards(1);
+    DisplayData displayData = DisplayData.from(write);
+    assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
+    assertThat(displayData, includesDisplayDataFrom(sink));
+    assertThat(displayData, hasDisplayItem("numShards", 1));
+  }
 
   /**
    * Performs a Write transform and verifies the Write transform calls the appropriate methods on
@@ -188,6 +291,18 @@ public class WriteTest {
    */
   private static void runWrite(
       List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform) {
+    runShardedWrite(inputs, transform, Optional.<Integer>absent());
+  }
+
+  /**
+   * Performs a Write transform with the desired number of shards. Verifies the Write transform
+   * calls the appropriate methods on a test sink in the correct order, as well as verifies that
+   * the elements of a PCollection are written to the sink. If numConfiguredShards is present, also
+   * verifies that the output number of shards is correct.
+   */
+  private static void runShardedWrite(
+      List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform,
+      Optional<Integer> numConfiguredShards) {
     // Flag to validate that the pipeline options are passed to the Sink
     WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class);
     options.setTestFlag("test_value");
@@ -195,6 +310,10 @@ public class WriteTest {
 
     // Clear the sink's contents.
     sinkContents.clear();
+    // Reset the number of shards produced.
+    numShards.set(0);
+    // Reset the number of records in each shard.
+    recordsPerShard.clear();
 
     // Prepare timestamps for the elements.
     List<Long> timestamps = new ArrayList<>();
@@ -203,13 +322,21 @@ public class WriteTest {
     }
 
     TestSink sink = new TestSink();
+    Write.Bound<String> write = Write.to(sink);
+    if (numConfiguredShards.isPresent()) {
+      write = write.withNumShards(numConfiguredShards.get());
+    }
     p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
      .apply(transform)
-     .apply(Write.to(sink));
+     .apply(write);
 
     p.run();
     assertThat(sinkContents, containsInAnyOrder(inputs.toArray()));
     assertTrue(sink.hasCorrectState());
+    if (numConfiguredShards.isPresent()) {
+      assertEquals(numConfiguredShards.get().intValue(), numShards.intValue());
+      assertEquals(numConfiguredShards.get().intValue(), recordsPerShard.size());
+    }
   }
 
   // Test sink and associated write operation and writer. TestSink, TestWriteOperation, and
@@ -246,10 +373,7 @@ public class WriteTest {
      */
     @Override
     public boolean equals(Object other) {
-      if (!(other instanceof TestSink)) {
-        return false;
-      }
-      return true;
+      return (other instanceof TestSink);
     }
 
     @Override
@@ -314,6 +438,7 @@ public class WriteTest {
         idSet.add(result.uId);
         // Add the elements that were written to the sink's contents.
         sinkContents.addAll(result.elementsWritten);
+        recordsPerShard.add(result.elementsWritten.size());
       }
       // Each result came from a unique id.
       assertEquals(resultCount, idSet.size());
@@ -398,6 +523,7 @@ public class WriteTest {
 
     @Override
     public void open(String uId) throws Exception {
+      numShards.incrementAndGet();
       this.uId = uId;
       assertEquals(State.INITIAL, state);
       state = State.OPENED;
@@ -421,10 +547,9 @@ public class WriteTest {
   /**
    * Options for test, exposed for PipelineOptionsFactory.
    */
-  public static interface WriteOptions extends TestPipelineOptions {
+  public interface WriteOptions extends TestPipelineOptions {
     @Description("Test flag and value")
     String getTestFlag();
-
     void setTestFlag(String value);
   }
 }
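
For reference, the new withNumShards knob tested by runShardedWrite above is, from a pipeline
author's point of view, just a builder call on Write.Bound. A minimal usage sketch follows; MySink
is a placeholder for any concrete Sink<String> implementation and is not part of this commit.

import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class FixedShardsExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(Create.of(Arrays.asList("a", "b", "c")).withCoder(StringUtf8Coder.of()))
     // Request exactly 3 output shards. Without withNumShards(...) the runner chooses
     // the sharding, and an empty input still produces a single (empty) shard.
     .apply(Write.to(new MySink()).withNumShards(3));

    p.run();
  }
}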


[2/2] incubator-beam git commit: Closes #534

Posted by dh...@apache.org.
Closes #534


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/daafc86f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/daafc86f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/daafc86f

Branch: refs/heads/master
Commit: daafc86f3827db289589087e550ed9827e6f8f4b
Parents: d5d3035 0070d2d
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jun 28 17:09:48 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jun 28 17:09:48 2016 -0700

----------------------------------------------------------------------
 .../direct/AvroIOShardedWriteFactory.java       |  76 -----
 .../beam/runners/direct/DirectRunner.java       |   4 -
 .../runners/direct/ShardControlledWrite.java    |  81 -----
 .../direct/TextIOShardedWriteFactory.java       |  78 -----
 .../direct/AvroIOShardedWriteFactoryTest.java   | 120 -------
 .../direct/TextIOShardedWriteFactoryTest.java   | 120 -------
 .../beam/runners/dataflow/DataflowRunner.java   | 258 ---------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     |  12 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     |  15 +-
 .../main/java/org/apache/beam/sdk/io/Write.java | 314 ++++++++++++++-----
 .../java/org/apache/beam/sdk/io/WriteTest.java  | 145 ++++++++-
 11 files changed, 391 insertions(+), 832 deletions(-)
----------------------------------------------------------------------