Posted to commits@beam.apache.org by dh...@apache.org on 2017/03/01 04:10:48 UTC

[1/9] beam git commit: BEAM-1421 Latest should comply with PTransform style guide

Repository: beam
Updated Branches:
  refs/heads/master 63b63f0b5 -> d66029caf


BEAM-1421 Latest should comply with PTransform style guide


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b9afe8b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b9afe8b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b9afe8b

Branch: refs/heads/master
Commit: 0b9afe8bf59245d437a33ee11fdcaa8e57ad4ffe
Parents: a7c60cc
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Feb 7 16:18:26 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:36 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/Latest.java  | 80 ++++++++++++--------
 1 file changed, 48 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0b9afe8b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
index 9c2d715..e6892c5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Iterator;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -58,12 +59,43 @@ import org.apache.beam.sdk.values.TimestampedValue;
  * }
  * }</pre>
  *
+ * <p>{@link #combineFn} can also be used manually, in combination with state and with the
+ * {@link Combine} transform.
+ *
  * <p>For elements with the same timestamp, the element chosen for output is arbitrary.
  */
 public class Latest {
   // Do not instantiate
   private Latest() {}
 
+  /** Returns a {@link Combine.CombineFn} that selects the latest element among its inputs. */
+  public static <T> Combine.CombineFn<TimestampedValue<T>, ?, T> combineFn() {
+    return new LatestFn<>();
+  }
+
+  /**
+   * Returns a {@link PTransform} that takes as input a {@code PCollection<T>} and returns a
+   * {@code PCollection<T>} whose contents is the latest element according to its event time, or
+   * {@literal null} if there are no elements.
+   *
+   * @param <T> The type of the elements being combined.
+   */
+  public static <T> PTransform<PCollection<T>, PCollection<T>> globally() {
+    return new Globally<>();
+  }
+
+  /**
+   * Returns a {@link PTransform} that takes as input a {@code PCollection<KV<K, V>>} and returns a
+   * {@code PCollection<KV<K, V>>} whose contents is the latest element per-key according to its
+   * event time.
+   *
+   * @param <K> The key type of the elements being combined.
+   * @param <V> The value type of the elements being combined.
+   */
+  public static <K, V> PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> perKey() {
+    return new PerKey<>();
+  }
+
   /**
    * A {@link Combine.CombineFn} that computes the latest element from a set of inputs. This is
    * particularly useful as an {@link Aggregator}.
@@ -71,7 +103,8 @@ public class Latest {
    * @param <T> Type of input element.
    * @see Latest
    */
-  public static class LatestFn<T>
+  @VisibleForTesting
+  static class LatestFn<T>
       extends Combine.CombineFn<TimestampedValue<T>, TimestampedValue<T>, T> {
     /** Construct a new {@link LatestFn} instance. */
     public LatestFn() {}
@@ -82,8 +115,8 @@ public class Latest {
     }
 
     @Override
-    public TimestampedValue<T> addInput(TimestampedValue<T> accumulator,
-        TimestampedValue<T> input) {
+    public TimestampedValue<T> addInput(
+        TimestampedValue<T> accumulator, TimestampedValue<T> input) {
       checkNotNull(accumulator, "accumulator must be non-null");
       checkNotNull(input, "input must be non-null");
 
@@ -95,16 +128,20 @@ public class Latest {
     }
 
     @Override
-    public Coder<TimestampedValue<T>> getAccumulatorCoder(CoderRegistry registry,
-        Coder<TimestampedValue<T>> inputCoder) throws CannotProvideCoderException {
+    public Coder<TimestampedValue<T>> getAccumulatorCoder(
+        CoderRegistry registry, Coder<TimestampedValue<T>> inputCoder)
+        throws CannotProvideCoderException {
       return NullableCoder.of(inputCoder);
     }
 
     @Override
-    public Coder<T> getDefaultOutputCoder(CoderRegistry registry,
-        Coder<TimestampedValue<T>> inputCoder) throws CannotProvideCoderException {
-      checkState(inputCoder instanceof TimestampedValue.TimestampedValueCoder,
-          "inputCoder must be a TimestampedValueCoder, but was %s", inputCoder);
+    public Coder<T> getDefaultOutputCoder(
+        CoderRegistry registry, Coder<TimestampedValue<T>> inputCoder)
+        throws CannotProvideCoderException {
+      checkState(
+          inputCoder instanceof TimestampedValue.TimestampedValueCoder,
+          "inputCoder must be a TimestampedValueCoder, but was %s",
+          inputCoder);
 
       TimestampedValue.TimestampedValueCoder<T> inputTVCoder =
           (TimestampedValue.TimestampedValueCoder<T>) inputCoder;
@@ -134,29 +171,7 @@ public class Latest {
     }
   }
 
-  /**
-   * Returns a {@link PTransform} that takes as input a {@code PCollection<T>} and returns a
-   * {@code PCollection<T>} whose contents is the latest element according to its event time, or
-   * {@literal null} if there are no elements.
-   *
-   * @param <T> The type of the elements being combined.
-   */
-  public static <T> PTransform<PCollection<T>, PCollection<T>> globally() {
-    return new Globally<>();
-  }
-
-  /**
-   * Returns a {@link PTransform} that takes as input a {@code PCollection<KV<K, V>>} and returns a
-   * {@code PCollection<KV<K, V>>} whose contents is the latest element per-key according to its
-   * event time.
-   *
-   * @param <K> The key type of the elements being combined.
-   * @param <V> The value type of the elements being combined.
-   */
-  public static <K, V> PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> perKey() {
-    return new PerKey<>();
-  }
-
+  /** Implementation of {@link #globally()}. */
   private static class Globally<T> extends PTransform<PCollection<T>, PCollection<T>> {
     @Override
     public PCollection<T> expand(PCollection<T> input) {
@@ -175,6 +190,7 @@ public class Latest {
     }
   }
 
+  /** Implementation of {@link #perKey()}. */
   private static class PerKey<K, V>
       extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
     @Override
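
For reference, a minimal usage sketch of the API after this change. The
pipeline p, the input values, and the surrounding context (a fragment from a
main() method against the Beam Java SDK of this era) are illustrative
assumptions, not part of the commit:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Combine;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Latest;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.joda.time.Instant;

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // Elements carry explicit event-time timestamps.
    PCollection<Long> readings =
        p.apply(
            Create.timestamped(
                TimestampedValue.of(5L, new Instant(100L)),
                TimestampedValue.of(9L, new Instant(400L)),
                TimestampedValue.of(7L, new Instant(250L))));

    // Latest element by event time across the collection; yields 9L here.
    PCollection<Long> latest = readings.apply(Latest.<Long>globally());

    // Per-key variant over KV inputs.
    PCollection<KV<String, Long>> keyed =
        p.apply(
            "Keyed",
            Create.timestamped(
                    TimestampedValue.of(KV.of("a", 1L), new Instant(10L)),
                    TimestampedValue.of(KV.of("a", 2L), new Instant(20L)))
                .withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())));
    PCollection<KV<String, Long>> latestPerKey =
        keyed.apply(Latest.<String, Long>perKey());

    // The raw CombineFn consumes TimestampedValue<T> and can be composed
    // manually with Combine, as the new javadoc notes.
    PCollection<TimestampedValue<Long>> stamped =
        p.apply(
            "Stamped",
            Create.of(
                    TimestampedValue.of(5L, new Instant(100L)),
                    TimestampedValue.of(9L, new Instant(400L)))
                .withCoder(TimestampedValue.TimestampedValueCoder.of(VarLongCoder.of())));
    PCollection<Long> viaCombine =
        stamped.apply(Combine.globally(Latest.<Long>combineFn()));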


[7/9] beam git commit: BEAM-1417 Count should comply with PTransform style guide

Posted by dh...@apache.org.
BEAM-1417 Count should comply with PTransform style guide


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9957c895
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9957c895
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9957c895

Branch: refs/heads/master
Commit: 9957c895b1b1c3e491f288d17e70445c9864742a
Parents: f5056ef
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Feb 8 16:27:06 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:37 2017 -0800

----------------------------------------------------------------------
 .../beam/examples/complete/AutoComplete.java    |  2 +-
 .../flink/examples/streaming/AutoComplete.java  |  2 +-
 .../translation/streaming/CreateStreamTest.java |  4 +++-
 .../org/apache/beam/sdk/transforms/Count.java   | 24 +++++++++++++-------
 .../apache/beam/sdk/testing/TestStreamTest.java |  4 +++-
 5 files changed, 24 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9957c895/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index c815f27..861a292 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -116,7 +116,7 @@ public class AutoComplete {
     public PCollection<KV<String, List<CompletionCandidate>>> expand(PCollection<String> input) {
       PCollection<CompletionCandidate> candidates = input
         // First count how often each token appears.
-        .apply(new Count.PerElement<String>())
+        .apply(Count.<String>perElement())
 
         // Map the KV outputs of Count into our own CompletionCandiate class.
         .apply("CreateCompletionCandidates", ParDo.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/9957c895/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index f33e616..d07df29 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -85,7 +85,7 @@ public class AutoComplete {
     public PCollection<KV<String, List<CompletionCandidate>>> expand(PCollection<String> input) {
       PCollection<CompletionCandidate> candidates = input
         // First count how often each token appears.
-        .apply(new Count.PerElement<String>())
+        .apply(Count.<String>perElement())
 
         // Map the KV outputs of Count into our own CompletionCandiate class.
         .apply("CreateCompletionCandidates", ParDo.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/9957c895/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
index ff77535..b32f5f3 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -114,7 +115,8 @@ public class CreateStreamTest implements Serializable {
         .apply(GroupByKey.<Integer, Integer>create())
         .apply(Values.<Iterable<Integer>>create())
         .apply(Flatten.<Integer>iterables());
-    PCollection<Long> count = windowed.apply(Count.<Integer>globally().withoutDefaults());
+    PCollection<Long> count =
+        windowed.apply(Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
     PCollection<Integer> sum = windowed.apply(Sum.integersGlobally().withoutDefaults());
 
     IntervalWindow window = new IntervalWindow(instant, instant.plus(Duration.standardMinutes(5L)));

http://git-wip-us.apache.org/repos/asf/beam/blob/9957c895/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index d164978..fd91430 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -34,31 +34,39 @@ import org.apache.beam.sdk.values.PCollection;
 
 
 /**
- * {@code PTransorm}s to count the elements in a {@link PCollection}.
+ * {@link PTransform PTransforms} to count the elements in a {@link PCollection}.
  *
  * <p>{@link Count#perElement()} can be used to count the number of occurrences of each
  * distinct element in the PCollection, {@link Count#perKey()} can be used to count the
  * number of values per key, and {@link Count#globally()} can be used to count the total
  * number of elements in a PCollection.
+ *
+ * <p>{@link #combineFn} can also be used manually, in combination with state and with the
+ * {@link Combine} transform.
  */
 public class Count {
   private Count() {
     // do not instantiate
   }
 
+  /** Returns a {@link CombineFn} that counts the number of its inputs. */
+  public static <T> CombineFn<T, ?, Long> combineFn() {
+    return new CountFn<T>();
+  }
+
   /**
-   * Returns a {@link Combine.Globally} {@link PTransform} that counts the number of elements in
+   * Returns a {@link PTransform} that counts the number of elements in
    * its input {@link PCollection}.
    */
-  public static <T> Combine.Globally<T, Long> globally() {
+  public static <T> PTransform<PCollection<T>, PCollection<Long>> globally() {
     return Combine.globally(new CountFn<T>());
   }
 
   /**
-   * Returns a {@link Combine.PerKey} {@link PTransform} that counts the number of elements
+   * Returns a {@link PTransform} that counts the number of elements
    * associated with each key of its input {@link PCollection}.
    */
-  public static <K, V> Combine.PerKey<K, V, Long> perKey() {
+  public static <K, V> PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Long>>> perKey() {
     return Combine.<K, V, Long>perKey(new CountFn<V>());
   }
 
@@ -68,7 +76,7 @@ public class Count {
    *
    * <p>See {@link PerElement Count.PerElement} for more details.
    */
-  public static <T> PerElement<T> perElement() {
+  public static <T> PTransform<PCollection<T>, PCollection<KV<T, Long>>> perElement() {
     return new PerElement<>();
   }
 
@@ -97,10 +105,10 @@ public class Count {
    * @param <T> the type of the elements of the input {@code PCollection}, and the type of the keys
    * of the output {@code PCollection}
    */
-  public static class PerElement<T>
+  private static class PerElement<T>
       extends PTransform<PCollection<T>, PCollection<KV<T, Long>>> {
 
-    public PerElement() { }
+    private PerElement() { }
 
     @Override
     public PCollection<KV<T, Long>> expand(PCollection<T> input) {

http://git-wip-us.apache.org/repos/asf/beam/blob/9957c895/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index 1514601..614831d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.testing.TestStream.Builder;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -96,7 +97,8 @@ public class TestStreamTest implements Serializable {
         .apply(GroupByKey.<Integer, Integer>create())
         .apply(Values.<Iterable<Integer>>create())
         .apply(Flatten.<Integer>iterables());
-    PCollection<Long> count = windowed.apply(Count.<Integer>globally().withoutDefaults());
+    PCollection<Long> count =
+        windowed.apply(Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
     PCollection<Integer> sum = windowed.apply(Sum.integersGlobally().withoutDefaults());
 
     IntervalWindow window = new IntervalWindow(instant, instant.plus(Duration.standardMinutes(5L)));
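
A short sketch of the narrowed surface after this change; the pipeline p and
the inputs are illustrative assumptions rather than code from the commit:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Combine;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<String> words = p.apply(Create.of("a", "b", "a"));

    // Total number of elements.
    PCollection<Long> total = words.apply(Count.<String>globally());

    // Occurrences of each distinct element.
    PCollection<KV<String, Long>> perToken =
        words.apply(Count.<String>perElement());

    // Callers that previously configured the returned Combine.Globally, e.g.
    // withoutDefaults() under non-global windowing, now compose the exposed
    // CombineFn by hand, as the updated tests in this commit do:
    PCollection<String> windowedWords =
        words.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
    PCollection<Long> windowedTotal =
        windowedWords.apply(Combine.globally(Count.<String>combineFn()).withoutDefaults());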


[6/9] beam git commit: BEAM-1426 SortValues should comply with PTransform style guide

Posted by dh...@apache.org.
BEAM-1426 SortValues should comply with PTransform style guide


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fffd2c55
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fffd2c55
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fffd2c55

Branch: refs/heads/master
Commit: fffd2c55895e4eb03ae0796d3ce9c4f75b9b34c3
Parents: 4f34caa
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Feb 7 16:39:33 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:37 2017 -0800

----------------------------------------------------------------------
 .../sorter/BufferedExternalSorter.java          | 23 ++++++----
 .../sorter/BufferedExternalSorterTest.java      | 46 ++++++++++----------
 .../sdk/extensions/sorter/SortValuesTest.java   |  2 +-
 3 files changed, 39 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fffd2c55/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
index 1a16511..efa251a 100644
--- a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
@@ -29,19 +29,27 @@ import org.apache.beam.sdk.values.KV;
  * then fall back to external sorting.
  */
 public class BufferedExternalSorter implements Sorter {
+  public static Options options() {
+    return new Options("/tmp", 100);
+  }
+
   /** Contains configuration for the sorter. */
   public static class Options implements Serializable {
-    private String tempLocation = "/tmp";
-    private int memoryMB = 100;
+    private final String tempLocation;
+    private final int memoryMB;
+
+    private Options(String tempLocation, int memoryMB) {
+      this.tempLocation = tempLocation;
+      this.memoryMB = memoryMB;
+    }
 
     /** Sets the path to a temporary location where the sorter writes intermediate files. */
-    public Options setTempLocation(String tempLocation) {
+    public Options withTempLocation(String tempLocation) {
       checkArgument(
           !tempLocation.startsWith("gs://"),
           "BufferedExternalSorter does not support GCS temporary location");
 
-      this.tempLocation = tempLocation;
-      return this;
+      return new Options(tempLocation, memoryMB);
     }
 
     /** Returns the configured temporary location. */
@@ -54,13 +62,12 @@ public class BufferedExternalSorter implements Sorter {
      * memory sorting and the buffer used when external sorting. Must be greater than zero and less
      * than 2048.
      */
-    public Options setMemoryMB(int memoryMB) {
+    public Options withMemoryMB(int memoryMB) {
       checkArgument(memoryMB > 0, "memoryMB must be greater than zero");
       // Hadoop's external sort stores the number of available memory bytes in an int, this prevents
       // overflow
       checkArgument(memoryMB < 2048, "memoryMB must be less than 2048");
-      this.memoryMB = memoryMB;
-      return this;
+      return new Options(tempLocation, memoryMB);
     }
 
     /** Returns the configured size of the memory buffer. */
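
A small sketch of the resulting construction path. Each with- method returns a
new immutable Options, so the result must be captured; a discarded call such
as opts.withMemoryMB(0) now only runs the argument checks, which is exactly
what the updated tests below rely on. The temp path here is an arbitrary
example, and grouped is an assumed input collection, not code from the commit:

    import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
    import org.apache.beam.sdk.extensions.sorter.SortValues;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    BufferedExternalSorter.Options opts =
        BufferedExternalSorter.options()
            .withTempLocation("/tmp/sorter-scratch")  // arbitrary example path
            .withMemoryMB(64);

    // Typical use through SortValues, mirroring SortValuesTest below;
    // grouped is an assumed PCollection<KV<String, Iterable<KV<String, Integer>>>>.
    PCollection<KV<String, Iterable<KV<String, Integer>>>> groupedAndSorted =
        grouped.apply(SortValues.<String, String, Integer>create(opts));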

http://git-wip-us.apache.org/repos/asf/beam/blob/fffd2c55/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorterTest.java b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorterTest.java
index 3d63b1a..f2be1f5 100644
--- a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorterTest.java
+++ b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorterTest.java
@@ -137,29 +137,29 @@ public class BufferedExternalSorterTest {
 
   @Test
   public void testEmpty() throws Exception {
-    SorterTestUtils.testEmpty(BufferedExternalSorter.create(new BufferedExternalSorter.Options()
-        .setTempLocation(tmpLocation.toString())));
+    SorterTestUtils.testEmpty(BufferedExternalSorter.create(BufferedExternalSorter.options()
+        .withTempLocation(tmpLocation.toString())));
   }
 
   @Test
   public void testSingleElement() throws Exception {
     SorterTestUtils.testSingleElement(
-        BufferedExternalSorter.create(new BufferedExternalSorter.Options()
-            .setTempLocation(tmpLocation.toString())));
+        BufferedExternalSorter.create(BufferedExternalSorter.options()
+            .withTempLocation(tmpLocation.toString())));
   }
 
   @Test
   public void testEmptyKeyValueElement() throws Exception {
     SorterTestUtils.testEmptyKeyValueElement(
-        BufferedExternalSorter.create(new BufferedExternalSorter.Options()
-            .setTempLocation(tmpLocation.toString())));
+        BufferedExternalSorter.create(BufferedExternalSorter.options()
+            .withTempLocation(tmpLocation.toString())));
   }
 
   @Test
   public void testMultipleIterations() throws Exception {
     SorterTestUtils.testMultipleIterations(
-        BufferedExternalSorter.create(new BufferedExternalSorter.Options()
-            .setTempLocation(tmpLocation.toString())));
+        BufferedExternalSorter.create(BufferedExternalSorter.options()
+            .withTempLocation(tmpLocation.toString())));
   }
 
   @Test
@@ -168,8 +168,8 @@ public class BufferedExternalSorterTest {
         new SorterGenerator() {
           @Override
           public Sorter generateSorter() throws Exception {
-            return BufferedExternalSorter.create(new BufferedExternalSorter.Options()
-                .setTempLocation(tmpLocation.toString()));
+            return BufferedExternalSorter.create(BufferedExternalSorter.options()
+                .withTempLocation(tmpLocation.toString()));
           }
         },
         1000000,
@@ -182,8 +182,8 @@ public class BufferedExternalSorterTest {
         new SorterGenerator() {
           @Override
           public Sorter generateSorter() throws Exception {
-            return BufferedExternalSorter.create(new BufferedExternalSorter.Options()
-                .setTempLocation(tmpLocation.toString()));
+            return BufferedExternalSorter.create(BufferedExternalSorter.options()
+                .withTempLocation(tmpLocation.toString()));
           }
         },
         1,
@@ -193,16 +193,16 @@ public class BufferedExternalSorterTest {
   @Test
   public void testAddAfterSort() throws Exception {
     SorterTestUtils.testAddAfterSort(
-        BufferedExternalSorter.create(new BufferedExternalSorter.Options()
-              .setTempLocation(tmpLocation.toString())), thrown);
+        BufferedExternalSorter.create(BufferedExternalSorter.options()
+              .withTempLocation(tmpLocation.toString())), thrown);
     fail();
   }
 
   @Test
   public void testSortTwice() throws Exception {
     SorterTestUtils.testSortTwice(
-        BufferedExternalSorter.create(new BufferedExternalSorter.Options()
-            .setTempLocation(tmpLocation.toString())), thrown);
+        BufferedExternalSorter.create(BufferedExternalSorter.options()
+            .withTempLocation(tmpLocation.toString())), thrown);
     fail();
   }
 
@@ -210,24 +210,24 @@ public class BufferedExternalSorterTest {
   public void testNegativeMemory() throws Exception {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("memoryMB must be greater than zero");
-    BufferedExternalSorter.Options options = new BufferedExternalSorter.Options()
-        .setTempLocation(tmpLocation.toString());
-    options.setMemoryMB(-1);
+    BufferedExternalSorter.Options options = BufferedExternalSorter.options()
+        .withTempLocation(tmpLocation.toString());
+    options.withMemoryMB(-1);
   }
 
   @Test
   public void testZeroMemory() throws Exception {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("memoryMB must be greater than zero");
-    BufferedExternalSorter.Options options = new BufferedExternalSorter.Options();
-    options.setMemoryMB(0);
+    BufferedExternalSorter.Options options = BufferedExternalSorter.options();
+    options.withMemoryMB(0);
   }
 
   @Test
   public void testMemoryTooLarge() throws Exception {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("memoryMB must be less than 2048");
-    BufferedExternalSorter.Options options = new BufferedExternalSorter.Options();
-    options.setMemoryMB(2048);
+    BufferedExternalSorter.Options options = BufferedExternalSorter.options();
+    options.withMemoryMB(2048);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/fffd2c55/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
index 4f77100..96d6b6a 100644
--- a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
+++ b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
@@ -66,7 +66,7 @@ public class SortValuesTest {
     // For every Key, sort the iterable of <SecondaryKey, Value> pairs by SecondaryKey.
     PCollection<KV<String, Iterable<KV<String, Integer>>>> groupedAndSorted =
         grouped.apply(
-            SortValues.<String, String, Integer>create(new BufferedExternalSorter.Options()));
+            SortValues.<String, String, Integer>create(BufferedExternalSorter.options()));
 
     PAssert.that(groupedAndSorted)
         .satisfies(new AssertThatHasExpectedContentsForTestSecondaryKeySorting());


[9/9] beam git commit: This closes #1943

Posted by dh...@apache.org.
This closes #1943


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d66029ca
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d66029ca
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d66029ca

Branch: refs/heads/master
Commit: d66029cafde152c0a46ebd276ddfa4c3e7fd3433
Parents: 63b63f0 9957c89
Author: Dan Halperin <dh...@google.com>
Authored: Tue Feb 28 20:10:39 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:39 2017 -0800

----------------------------------------------------------------------
 .../beam/examples/complete/AutoComplete.java    |   2 +-
 .../translation/ApexPipelineTranslator.java     |   2 +-
 .../FlattenPCollectionTranslator.java           |   6 +-
 .../operators/ApexFlattenOperator.java          |   4 +-
 .../EmptyFlattenAsCreateFactory.java            |   7 +-
 .../core/construction/PTransformMatchers.java   |   8 +-
 .../construction/PTransformMatchersTest.java    |  16 +-
 .../beam/runners/direct/EmptyInputProvider.java |   4 +-
 .../runners/direct/FlattenEvaluatorFactory.java |   4 +-
 .../runners/direct/RootProviderRegistry.java    |   4 +-
 .../direct/TransformEvaluatorRegistry.java      |   4 +-
 .../direct/WriteWithShardingFactory.java        |   9 +-
 .../runners/direct/DirectGraphVisitorTest.java  |   4 +-
 .../direct/WriteWithShardingFactoryTest.java    |   2 +-
 .../flink/examples/streaming/AutoComplete.java  |   2 +-
 .../flink/FlinkBatchTransformTranslators.java   |   6 +-
 .../FlinkStreamingTransformTranslators.java     |  12 +-
 .../beam/runners/flink/WriteSinkITCase.java     |   2 +-
 .../dataflow/DataflowPipelineTranslator.java    |   8 +-
 .../beam/runners/dataflow/DataflowRunner.java   |  13 +-
 .../spark/translation/TransformTranslator.java  |   8 +-
 .../streaming/StreamingTransformTranslator.java |   8 +-
 .../translation/streaming/CreateStreamTest.java |   4 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     |   2 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     |   2 +-
 .../main/java/org/apache/beam/sdk/io/Write.java | 727 +++++++++----------
 .../org/apache/beam/sdk/transforms/Combine.java |   9 +-
 .../org/apache/beam/sdk/transforms/Count.java   |  24 +-
 .../org/apache/beam/sdk/transforms/Flatten.java |  15 +-
 .../apache/beam/sdk/transforms/GroupByKey.java  |   8 +-
 .../org/apache/beam/sdk/transforms/Latest.java  |  80 +-
 .../org/apache/beam/sdk/transforms/Sample.java  | 121 ++-
 .../apache/beam/sdk/transforms/ToString.java    | 109 ++-
 .../java/org/apache/beam/sdk/io/WriteTest.java  |  16 +-
 .../beam/sdk/runners/TransformTreeTest.java     |  23 +-
 .../apache/beam/sdk/testing/TestStreamTest.java |   4 +-
 .../apache/beam/sdk/transforms/FlattenTest.java |  12 +-
 .../beam/sdk/transforms/GroupByKeyTest.java     |   2 +-
 .../apache/beam/sdk/transforms/SampleTest.java  |   2 +-
 .../beam/sdk/transforms/ToStringTest.java       |   8 +-
 .../sorter/BufferedExternalSorter.java          |  23 +-
 .../sorter/BufferedExternalSorterTest.java      |  46 +-
 .../sdk/extensions/sorter/SortValuesTest.java   |   2 +-
 43 files changed, 716 insertions(+), 658 deletions(-)
----------------------------------------------------------------------



[3/9] beam git commit: BEAM-1419 Flatten should comply with PTransform style guide

Posted by dh...@apache.org.
BEAM-1419 Flatten should comply with PTransform style guide


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f5056efc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f5056efc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f5056efc

Branch: refs/heads/master
Commit: f5056efc80bf4f240ec1eeea4e2b50bf567a2d6c
Parents: b87621e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Feb 7 16:55:19 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:37 2017 -0800

----------------------------------------------------------------------
 .../apex/translation/ApexPipelineTranslator.java     |  2 +-
 .../translation/FlattenPCollectionTranslator.java    |  6 +++---
 .../translation/operators/ApexFlattenOperator.java   |  4 ++--
 .../construction/EmptyFlattenAsCreateFactory.java    |  7 +++----
 .../core/construction/PTransformMatchers.java        |  4 ++--
 .../core/construction/PTransformMatchersTest.java    |  6 +++---
 .../beam/runners/direct/EmptyInputProvider.java      |  4 ++--
 .../beam/runners/direct/FlattenEvaluatorFactory.java |  4 ++--
 .../beam/runners/direct/RootProviderRegistry.java    |  4 ++--
 .../runners/direct/TransformEvaluatorRegistry.java   |  4 ++--
 .../beam/runners/direct/DirectGraphVisitorTest.java  |  4 ++--
 .../flink/FlinkBatchTransformTranslators.java        |  6 +++---
 .../flink/FlinkStreamingTransformTranslators.java    |  6 +++---
 .../runners/dataflow/DataflowPipelineTranslator.java |  8 ++++----
 .../spark/translation/TransformTranslator.java       |  8 ++++----
 .../streaming/StreamingTransformTranslator.java      |  8 ++++----
 .../java/org/apache/beam/sdk/transforms/Flatten.java | 15 ++++++++-------
 .../org/apache/beam/sdk/transforms/FlattenTest.java  | 12 ++++++------
 18 files changed, 56 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
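
The user-visible effect of the rename (Flatten.FlattenPCollectionList becomes
Flatten.PCollections, Flatten.FlattenIterables becomes Flatten.Iterables; the
factory methods are unchanged), sketched with an illustrative pipeline:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<String> first = p.apply("First", Create.of("a"));
    PCollection<String> second = p.apply("Second", Create.of("b"));

    // Still applied via the factory method; only the transform's class name
    // changed (e.g. in runner translators keyed on the class, as below).
    PCollection<String> merged =
        PCollectionList.of(first).and(second).apply(Flatten.<String>pCollections());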


http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
index 36d679a..e9d6571 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -64,7 +64,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
     registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
     registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
     registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
-    registerTransformTranslator(Flatten.FlattenPCollectionList.class,
+    registerTransformTranslator(Flatten.PCollections.class,
         new FlattenPCollectionTranslator());
     registerTransformTranslator(PrimitiveCreate.class, new CreateValuesTranslator());
     registerTransformTranslator(CreateApexPCollectionView.class,

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
index 2e31dfc..080c5e9 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
@@ -35,14 +35,14 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TaggedPValue;
 
 /**
- * {@link Flatten.FlattenPCollectionList} translation to Apex operator.
+ * {@link Flatten.PCollections} translation to Apex operator.
  */
 class FlattenPCollectionTranslator<T> implements
-    TransformTranslator<Flatten.FlattenPCollectionList<T>> {
+    TransformTranslator<Flatten.PCollections<T>> {
   private static final long serialVersionUID = 1L;
 
   @Override
-  public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
+  public void translate(Flatten.PCollections<T> transform, TranslationContext context) {
     List<PCollection<T>> inputCollections = extractPCollections(context.getInputs());
 
     if (inputCollections.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
index 3d9db51..4594765 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
@@ -21,15 +21,15 @@ import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.common.util.BaseOperator;
-
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple.WatermarkTuple;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Apex operator for Beam {@link org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList}.
+ * Apex operator for Beam {@link PCollections}.
  */
 public class ApexFlattenOperator<InputT> extends BaseOperator {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
index 3b29c0a..0168039 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -36,11 +35,11 @@ import org.apache.beam.sdk.values.TaggedPValue;
 
 /**
  * A {@link PTransformOverrideFactory} that provides an empty {@link Create} to replace a {@link
- * Flatten.FlattenPCollectionList} that takes no input {@link PCollection PCollections}.
+ * Flatten.PCollections} that takes no input {@link PCollection PCollections}.
  */
 public class EmptyFlattenAsCreateFactory<T>
     implements PTransformOverrideFactory<
-        PCollectionList<T>, PCollection<T>, Flatten.FlattenPCollectionList<T>> {
+        PCollectionList<T>, PCollection<T>, Flatten.PCollections<T>> {
   private static final EmptyFlattenAsCreateFactory<Object> INSTANCE =
       new EmptyFlattenAsCreateFactory<>();
 
@@ -52,7 +51,7 @@ public class EmptyFlattenAsCreateFactory<T>
 
   @Override
   public PTransform<PCollectionList<T>, PCollection<T>> getReplacementTransform(
-      FlattenPCollectionList<T> transform) {
+      Flatten.PCollections<T> transform) {
     return (PTransform) Create.empty(VoidCoder.of());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 7f8d467..efcc455 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -165,14 +165,14 @@ public class PTransformMatchers {
   }
 
   /**
-   * A {@link PTransformMatcher} which matches a {@link Flatten.FlattenPCollectionList} which
+   * A {@link PTransformMatcher} which matches a {@link Flatten.PCollections} which
    * consumes no input {@link PCollection PCollections}.
    */
   public static PTransformMatcher emptyFlatten() {
     return new PTransformMatcher() {
       @Override
       public boolean matches(AppliedPTransform<?, ?, ?> application) {
-        return (application.getTransform() instanceof Flatten.FlattenPCollectionList)
+        return (application.getTransform() instanceof Flatten.PCollections)
             && application.getInputs().isEmpty();
       }
     };

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index be3ed6b..491c14f 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -327,7 +327,7 @@ public class PTransformMatchersTest implements Serializable {
   public void emptyFlattenWithEmptyFlatten() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollectionList<Object>, PCollection<Object>, Flatten.FlattenPCollectionList<Object>>
+            .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>
                 of(
                     "EmptyFlatten",
                     Collections.<TaggedPValue>emptyList(),
@@ -346,7 +346,7 @@ public class PTransformMatchersTest implements Serializable {
   public void emptyFlattenWithNonEmptyFlatten() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollectionList<Object>, PCollection<Object>, Flatten.FlattenPCollectionList<Object>>
+            .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>
                 of(
                     "Flatten",
                     Collections.singletonList(
@@ -369,7 +369,7 @@ public class PTransformMatchersTest implements Serializable {
   public void emptyFlattenWithNonFlatten() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.FlattenIterables<Object>>
+            .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>
                 of(
                     "EmptyFlatten",
                     Collections.<TaggedPValue>emptyList(),

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
index 1185130..98d4a64 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.values.PCollectionList;
 
 /** A {@link RootInputProvider} that provides a singleton empty bundle. */
 class EmptyInputProvider<T>
-    implements RootInputProvider<T, Void, PCollectionList<T>, Flatten.FlattenPCollectionList<T>> {
+    implements RootInputProvider<T, Void, PCollectionList<T>, Flatten.PCollections<T>> {
   EmptyInputProvider() {}
 
   /**
@@ -37,7 +37,7 @@ class EmptyInputProvider<T>
    */
   @Override
   public Collection<CommittedBundle<Void>> getInitialInputs(
-      AppliedPTransform<PCollectionList<T>, PCollection<T>, Flatten.FlattenPCollectionList<T>>
+      AppliedPTransform<PCollectionList<T>, PCollection<T>, Flatten.PCollections<T>>
           transform,
       int targetParallelism) {
     return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index 66862ea..8528905 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -22,7 +22,7 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
@@ -53,7 +53,7 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
 
   private <InputT> TransformEvaluator<InputT> createInMemoryEvaluator(
       final AppliedPTransform<
-              PCollectionList<InputT>, PCollection<InputT>, FlattenPCollectionList<InputT>>
+              PCollectionList<InputT>, PCollection<InputT>, PCollections<InputT>>
           application) {
     final UncommittedBundle<InputT> outputBundle =
         evaluationContext.createBundle(

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
index e8a7665..eb9492c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
 import org.apache.beam.sdk.transforms.PTransform;
 
 /**
@@ -42,7 +42,7 @@ class RootProviderRegistry {
         .put(
             TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream.class,
             new TestStreamEvaluatorFactory.InputProvider(context))
-        .put(FlattenPCollectionList.class, new EmptyInputProvider());
+        .put(PCollections.class, new EmptyInputProvider());
     return new RootProviderRegistry(defaultProviders.build());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 1ddf9f4..9fdefc3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -32,7 +32,7 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -53,7 +53,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
             .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt))
             .put(ParDo.BoundMulti.class, new ParDoEvaluatorFactory<>(ctxt))
             .put(StatefulParDo.class, new StatefulParDoEvaluatorFactory<>(ctxt))
-            .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory(ctxt))
+            .put(PCollections.class, new FlattenEvaluatorFactory(ctxt))
             .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory(ctxt))
             .put(Window.Bound.class, new WindowEvaluatorFactory(ctxt))
             // Runner-specific primitives used in expansion of GroupByKey

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index df49796..8b4573f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
@@ -110,7 +110,7 @@ public class DirectGraphVisitorTest implements Serializable {
 
   @Test
   public void getRootTransformsContainsEmptyFlatten() {
-    FlattenPCollectionList<String> flatten = Flatten.pCollections();
+    PCollections<String> flatten = Flatten.pCollections();
     PCollectionList<String> emptyList = PCollectionList.empty(p);
     PCollection<String> empty = emptyList.apply(flatten);
     empty.setCoder(StringUtf8Coder.of());

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index 30e9d68..acc204d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -108,7 +108,7 @@ class FlinkBatchTransformTranslators {
     TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
     TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorBatch());
 
-    TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch());
+    TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslatorBatch());
 
     TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslatorBatch());
 
@@ -706,12 +706,12 @@ class FlinkBatchTransformTranslators {
 
   private static class FlattenPCollectionTranslatorBatch<T>
       implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
-          Flatten.FlattenPCollectionList<T>> {
+      Flatten.PCollections<T>> {
 
     @Override
     @SuppressWarnings("unchecked")
     public void translateNode(
-        Flatten.FlattenPCollectionList<T> transform,
+        Flatten.PCollections<T> transform,
         FlinkBatchTranslationContext context) {
 
       List<TaggedPValue> allInputs = context.getInputs(transform);

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index a3cceb2..03f567d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -125,7 +125,7 @@ class FlinkStreamingTransformTranslators {
     TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator());
 
     TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator());
-    TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator());
+    TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator());
     TRANSLATORS.put(
         FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class,
         new CreateViewStreamingTranslator());
@@ -999,11 +999,11 @@ class FlinkStreamingTransformTranslators {
 
   private static class FlattenPCollectionTranslator<T>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
-        Flatten.FlattenPCollectionList<T>> {
+      Flatten.PCollections<T>> {
 
     @Override
     public void translateNode(
-        Flatten.FlattenPCollectionList<T> transform,
+        Flatten.PCollections<T> transform,
         FlinkStreamingTranslationContext context) {
       List<TaggedPValue> allInputs = context.getInputs(transform);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index c672e99..fe5db5a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -744,16 +744,16 @@ public class DataflowPipelineTranslator {
         });
 
     registerTransformTranslator(
-        Flatten.FlattenPCollectionList.class,
-        new TransformTranslator<Flatten.FlattenPCollectionList>() {
+        Flatten.PCollections.class,
+        new TransformTranslator<Flatten.PCollections>() {
           @Override
           public void translate(
-              Flatten.FlattenPCollectionList transform, TranslationContext context) {
+              Flatten.PCollections transform, TranslationContext context) {
             flattenHelper(transform, context);
           }
 
           private <T> void flattenHelper(
-              Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
+              Flatten.PCollections<T> transform, TranslationContext context) {
             StepTranslationContext stepContext = context.addStep(transform, "Flatten");
 
             List<OutputReference> inputs = new LinkedList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index a643651..7fc09ad 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -97,11 +97,11 @@ public final class TransformTranslator {
   private TransformTranslator() {
   }
 
-  private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
-    return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
+  private static <T> TransformEvaluator<Flatten.PCollections<T>> flattenPColl() {
+    return new TransformEvaluator<Flatten.PCollections<T>>() {
       @SuppressWarnings("unchecked")
       @Override
-      public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
+      public void evaluate(Flatten.PCollections<T> transform, EvaluationContext context) {
         List<TaggedPValue> pcs = context.getInputs(transform);
         JavaRDD<WindowedValue<T>> unionRDD;
         if (pcs.size() == 0) {
@@ -729,7 +729,7 @@ public final class TransformTranslator {
     EVALUATORS.put(Combine.GroupedValues.class, combineGrouped());
     EVALUATORS.put(Combine.Globally.class, combineGlobally());
     EVALUATORS.put(Combine.PerKey.class, combinePerKey());
-    EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
+    EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
     EVALUATORS.put(Create.Values.class, create());
     EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
     EVALUATORS.put(View.AsIterable.class, viewAsIter());

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 4a07741..a856897 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -170,11 +170,11 @@ final class StreamingTransformTranslator {
     };
   }
 
-  private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
-    return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
+  private static <T> TransformEvaluator<Flatten.PCollections<T>> flattenPColl() {
+    return new TransformEvaluator<Flatten.PCollections<T>>() {
       @SuppressWarnings("unchecked")
       @Override
-      public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
+      public void evaluate(Flatten.PCollections<T> transform, EvaluationContext context) {
         List<TaggedPValue> pcs = context.getInputs(transform);
        // since this is a streaming pipeline, at least one of the PCollections to "flatten" is
        // unbounded, meaning it represents a DStream.
@@ -445,7 +445,7 @@ final class StreamingTransformTranslator {
     EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
     EVALUATORS.put(CreateStream.class, createFromQueue());
     EVALUATORS.put(Window.Bound.class, window());
-    EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
+    EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index 3ef2e55..7b282b5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -64,8 +64,8 @@ public class Flatten {
    * @param <T> the type of the elements in the input and output
    * {@code PCollection}s.
    */
-  public static <T> FlattenPCollectionList<T> pCollections() {
-    return new FlattenPCollectionList<>();
+  public static <T> PCollections<T> pCollections() {
+    return new PCollections<>();
   }
 
   /**
@@ -86,8 +86,8 @@ public class Flatten {
    * @param <T> the type of the elements of the input {@code Iterable} and
    * the output {@code PCollection}
    */
-  public static <T> FlattenIterables<T> iterables() {
-    return new FlattenIterables<>();
+  public static <T> Iterables<T> iterables() {
+    return new Iterables<>();
   }
 
   /**
@@ -99,10 +99,10 @@ public class Flatten {
    * @param <T> the type of the elements in the input and output
    * {@code PCollection}s.
    */
-  public static class FlattenPCollectionList<T>
+  public static class PCollections<T>
       extends PTransform<PCollectionList<T>, PCollection<T>> {
 
-    private FlattenPCollectionList() { }
+    private PCollections() { }
 
     @Override
     public PCollection<T> expand(PCollectionList<T> inputs) {
@@ -159,8 +159,9 @@ public class Flatten {
    * @param <T> the type of the elements of the input {@code Iterable}s and
    * the output {@code PCollection}
    */
-  public static class FlattenIterables<T>
+  public static class Iterables<T>
       extends PTransform<PCollection<? extends Iterable<T>>, PCollection<T>> {
+    private Iterables() {}
 
     @Override
     public PCollection<T> expand(PCollection<? extends Iterable<T>> in) {

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index 3b5011b..bc3e322 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -81,7 +81,7 @@ public class FlattenTest implements Serializable {
 
   @Test
   @Category(RunnableOnService.class)
-  public void testFlattenPCollectionList() {
+  public void testFlattenPCollections() {
     List<List<String>> inputs = Arrays.asList(
       LINES, NO_LINES, LINES2, NO_LINES, LINES, NO_LINES);
 
@@ -95,7 +95,7 @@ public class FlattenTest implements Serializable {
 
   @Test
   @Category(RunnableOnService.class)
-  public void testFlattenPCollectionListThenParDo() {
+  public void testFlattenPCollectionsThenParDo() {
     List<List<String>> inputs = Arrays.asList(
       LINES, NO_LINES, LINES2, NO_LINES, LINES, NO_LINES);
 
@@ -110,7 +110,7 @@ public class FlattenTest implements Serializable {
 
   @Test
   @Category(RunnableOnService.class)
-  public void testFlattenPCollectionListEmpty() {
+  public void testFlattenPCollectionsEmpty() {
     PCollection<String> output =
         PCollectionList.<String>empty(p)
         .apply(Flatten.<String>pCollections()).setCoder(StringUtf8Coder.of());
@@ -198,7 +198,7 @@ public class FlattenTest implements Serializable {
 
   @Test
   @Category(RunnableOnService.class)
-  public void testFlattenPCollectionListEmptyThenParDo() {
+  public void testFlattenPCollectionsEmptyThenParDo() {
     PCollection<String> output =
         PCollectionList.<String>empty(p)
         .apply(Flatten.<String>pCollections()).setCoder(StringUtf8Coder.of())
@@ -366,8 +366,8 @@ public class FlattenTest implements Serializable {
 
   @Test
   public void testFlattenGetName() {
-    Assert.assertEquals("Flatten.FlattenIterables", Flatten.<String>iterables().getName());
-    Assert.assertEquals("Flatten.FlattenPCollectionList", Flatten.<String>pCollections().getName());
+    Assert.assertEquals("Flatten.Iterables", Flatten.<String>iterables().getName());
+    Assert.assertEquals("Flatten.PCollections", Flatten.<String>pCollections().getName());
   }
 
   /////////////////////////////////////////////////////////////////////////////

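For readers tracking the rename in this commit, here is a minimal end-to-end sketch of the transform after the change; the pipeline setup and sample inputs below are illustrative assumptions, not part of the commit:

    import java.util.Arrays;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;

    public class FlattenRenameSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        PCollection<String> first = p.apply("First", Create.of(Arrays.asList("a", "b")));
        PCollection<String> second = p.apply("Second", Create.of(Arrays.asList("c")));

        // The factory method is unchanged; only the nested class it returns was
        // renamed from Flatten.FlattenPCollectionList to Flatten.PCollections,
        // so getName() now reports "Flatten.PCollections".
        PCollection<String> merged =
            PCollectionList.of(first).and(second).apply(Flatten.<String>pCollections());

        p.run();
      }
    }

User pipelines are unaffected unless they referenced the nested class by name, as the runner translators updated above do when registering Flatten.PCollections.class.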

[2/9] beam git commit: BEAM-1420 GroupByKey should comply with PTransform style guide

Posted by dh...@apache.org.
BEAM-1420 GroupByKey should comply with PTransform style guide


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a7c60cc0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a7c60cc0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a7c60cc0

Branch: refs/heads/master
Commit: a7c60cc0b9d59d19398f788b84a17d4686ea3f82
Parents: 63b63f0
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Feb 7 16:17:21 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:36 2017 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/transforms/Combine.java   | 9 +++++----
 .../java/org/apache/beam/sdk/transforms/GroupByKey.java     | 8 ++++----
 .../java/org/apache/beam/sdk/transforms/GroupByKeyTest.java | 2 +-
 3 files changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a7c60cc0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 51c5e71..b4626e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -43,7 +43,6 @@ import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.AbstractPerKeyCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
@@ -1877,9 +1876,11 @@ public class Combine {
     @Override
     public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
       return input
-          .apply(GroupByKey.<K, InputT>create(fewKeys))
-          .apply(Combine.<K, InputT, OutputT>groupedValues(fn, fnDisplayData)
-              .withSideInputs(sideInputs));
+          .apply(
+              fewKeys ? GroupByKey.<K, InputT>createWithFewKeys() : GroupByKey.<K, InputT>create())
+          .apply(
+              Combine.<K, InputT, OutputT>groupedValues(fn, fnDisplayData)
+                  .withSideInputs(sideInputs));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/a7c60cc0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index 1541059..adf189b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -142,17 +142,17 @@ public class GroupByKey<K, V>
   }
 
   /**
-   * Returns a {@code GroupByKey<K, V>} {@code PTransform}.
+   * Returns a {@code GroupByKey<K, V>} {@code PTransform} that assumes it will be grouping
+   * a small number of keys.
    *
    * @param <K> the type of the keys of the input and output
    * {@code PCollection}s
    * @param <V> the type of the values of the input {@code PCollection}
    * and the elements of the {@code Iterable}s in the output
    * {@code PCollection}
-   * @param fewKeys whether it groups just few keys.
    */
-  static <K, V> GroupByKey<K, V> create(boolean fewKeys) {
-    return new GroupByKey<>(fewKeys);
+  static <K, V> GroupByKey<K, V> createWithFewKeys() {
+    return new GroupByKey<>(true);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/a7c60cc0/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index f4bec3a..73cedfd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -374,7 +374,7 @@ public class GroupByKeyTest {
   @Test
   public void testDisplayData() {
     GroupByKey<String, String> groupByKey = GroupByKey.create();
-    GroupByKey<String, String> groupByFewKeys = GroupByKey.create(true);
+    GroupByKey<String, String> groupByFewKeys = GroupByKey.createWithFewKeys();
 
     DisplayData gbkDisplayData = DisplayData.from(groupByKey);
     DisplayData fewKeysDisplayData = DisplayData.from(groupByFewKeys);

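The renamed createWithFewKeys() is package-private, so the public surface of GroupByKey is untouched; a short sketch of the call shapes involved, with the input collection assumed:

    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    PCollection<KV<String, Integer>> scores = ...;

    // Public entry point, unchanged by this commit.
    PCollection<KV<String, Iterable<Integer>>> grouped =
        scores.apply(GroupByKey.<String, Integer>create());

    // The few-keys variant is reached only indirectly: per the Combine.java hunk
    // above, Combine's expansion now calls GroupByKey.createWithFewKeys()
    // instead of the removed boolean overload create(true).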

[4/9] beam git commit: BEAM-1423 Sample should comply with PTransform style guide

Posted by dh...@apache.org.
BEAM-1423 Sample should comply with PTransform style guide


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a8e2387d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a8e2387d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a8e2387d

Branch: refs/heads/master
Commit: a8e2387d37ac2df5c4627ba8f1a9b4ce1ef417b5
Parents: 0b9afe8
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Feb 7 16:24:08 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:37 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/Sample.java  | 121 +++++++++++++------
 .../beam/sdk/runners/TransformTreeTest.java     |  19 +--
 .../apache/beam/sdk/transforms/SampleTest.java  |   2 +-
 3 files changed, 95 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a8e2387d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index 7d4e630..3734f7b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -38,9 +38,17 @@ import org.apache.beam.sdk.values.PCollectionView;
  * {@code PTransform}s for taking samples of the elements in a
  * {@code PCollection}, or samples of the values associated with each
  * key in a {@code PCollection} of {@code KV}s.
+ *
+ * <p>{@link #combineFn} can also be used manually, in combination with state and with the
+ * {@link Combine} transform.
  */
 public class Sample {
 
+  /** Returns a {@link CombineFn} that computes a fixed-sized sample of its inputs. */
+  public static <T> CombineFn<T, ?, Iterable<T>> combineFn(int sampleSize) {
+    return new FixedSizedSampleFn<>(sampleSize);
+  }
+
   /**
    * {@code Sample#any(long)} takes a {@code PCollection<T>} and a limit, and
    * produces a new {@code PCollection<T>} containing up to limit
@@ -64,72 +72,65 @@ public class Sample {
    * @param limit the number of elements to take from the input
    */
   public static <T> PTransform<PCollection<T>, PCollection<T>> any(long limit) {
-    return new SampleAny<>(limit);
+    return new Any<>(limit);
   }
 
   /**
-   * Returns a {@code PTransform} that takes a {@code PCollection<T>},
-   * selects {@code sampleSize} elements, uniformly at random, and returns a
-   * {@code PCollection<Iterable<T>>} containing the selected elements.
-   * If the input {@code PCollection} has fewer than
-   * {@code sampleSize} elements, then the output {@code Iterable<T>}
-   * will be all the input's elements.
+   * Returns a {@code PTransform} that takes a {@code PCollection<T>}, selects {@code sampleSize}
+   * elements, uniformly at random, and returns a {@code PCollection<Iterable<T>>} containing the
+   * selected elements. If the input {@code PCollection} has fewer than {@code sampleSize} elements,
+   * then the output {@code Iterable<T>} will be all the input's elements.
    *
    * <p>Example of use:
-   * <pre> {@code
+   *
+   * <pre>{@code
    * PCollection<String> pc = ...;
    * PCollection<Iterable<String>> sampleOfSize10 =
    *     pc.apply(Sample.fixedSizeGlobally(10));
-   * } </pre>
+   * }
+   * </pre>
    *
    * @param sampleSize the number of elements to select; must be {@code >= 0}
    * @param <T> the type of the elements
    */
-  public static <T> PTransform<PCollection<T>, PCollection<Iterable<T>>>
-      fixedSizeGlobally(int sampleSize) {
-    return Combine.globally(new FixedSizedSampleFn<T>(sampleSize));
+  public static <T> PTransform<PCollection<T>, PCollection<Iterable<T>>> fixedSizeGlobally(
+      int sampleSize) {
+    return new FixedSizeGlobally<>(sampleSize);
   }
 
   /**
-   * Returns a {@code PTransform} that takes an input
-   * {@code PCollection<KV<K, V>>} and returns a
-   * {@code PCollection<KV<K, Iterable<V>>>} that contains an output
-   * element mapping each distinct key in the input
-   * {@code PCollection} to a sample of {@code sampleSize} values
-   * associated with that key in the input {@code PCollection}, taken
-   * uniformly at random.  If a key in the input {@code PCollection}
-   * has fewer than {@code sampleSize} values associated with it, then
-   * the output {@code Iterable<V>} associated with that key will be
-   * all the values associated with that key in the input
-   * {@code PCollection}.
+   * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, V>>} and returns a
+   * {@code PCollection<KV<K, Iterable<V>>>} that contains an output element mapping each distinct
+   * key in the input {@code PCollection} to a sample of {@code sampleSize} values associated with
+   * that key in the input {@code PCollection}, taken uniformly at random. If a key in the input
+   * {@code PCollection} has fewer than {@code sampleSize} values associated with it, then the
+   * output {@code Iterable<V>} associated with that key will be all the values associated with that
+   * key in the input {@code PCollection}.
    *
    * <p>Example of use:
-   * <pre> {@code
+   *
+   * <pre>{@code
    * PCollection<KV<String, Integer>> pc = ...;
    * PCollection<KV<String, Iterable<Integer>>> sampleOfSize10PerKey =
    *     pc.apply(Sample.<String, Integer>fixedSizePerKey());
-   * } </pre>
+   * }
+   * </pre>
    *
-   * @param sampleSize the number of values to select for each
-   * distinct key; must be {@code >= 0}
+   * @param sampleSize the number of values to select for each distinct key; must be {@code >= 0}
    * @param <K> the type of the keys
    * @param <V> the type of the values
    */
-  public static <K, V> PTransform<PCollection<KV<K, V>>,
-                                  PCollection<KV<K, Iterable<V>>>>
-      fixedSizePerKey(int sampleSize) {
-    return Combine.perKey(new FixedSizedSampleFn<V>(sampleSize));
+  public static <K, V>
+      PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> fixedSizePerKey(
+          int sampleSize) {
+    return new FixedSizePerKey<>(sampleSize);
   }
 
 
   /////////////////////////////////////////////////////////////////////////////
 
-  /**
-   * A {@link PTransform} that takes a {@code PCollection<T>} and a limit, and
-   * produces a new {@code PCollection<T>} containing up to limit
-   * elements of the input {@code PCollection}.
-   */
-  public static class SampleAny<T> extends PTransform<PCollection<T>, PCollection<T>> {
+  /** Implementation of {@link #any(long)}. */
+  private static class Any<T> extends PTransform<PCollection<T>, PCollection<T>> {
     private final long limit;
 
     /**
@@ -137,7 +138,7 @@ public class Sample {
      * produces a new PCollection containing up to {@code limit}
      * elements of its input {@code PCollection}.
      */
-    private SampleAny(long limit) {
+    private Any(long limit) {
       checkArgument(limit >= 0, "Expected non-negative limit, received %s.", limit);
       this.limit = limit;
     }
@@ -162,6 +163,50 @@ public class Sample {
     }
   }
 
+  /** Implementation of {@link #fixedSizeGlobally(int)}. */
+  private static class FixedSizeGlobally<T>
+      extends PTransform<PCollection<T>, PCollection<Iterable<T>>> {
+    private final int sampleSize;
+
+    private FixedSizeGlobally(int sampleSize) {
+      this.sampleSize = sampleSize;
+    }
+
+    @Override
+    public PCollection<Iterable<T>> expand(PCollection<T> input) {
+      return input.apply(Combine.globally(new FixedSizedSampleFn<T>(sampleSize)));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("sampleSize", sampleSize)
+          .withLabel("Sample Size"));
+    }
+  }
+
+  /** Implementation of {@link #fixedSizePerKey(int)}. */
+  private static class FixedSizePerKey<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+    private final int sampleSize;
+
+    private FixedSizePerKey(int sampleSize) {
+      this.sampleSize = sampleSize;
+    }
+
+    @Override
+    public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+      return input.apply(Combine.<K, V, Iterable<V>>perKey(new FixedSizedSampleFn<V>(sampleSize)));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("sampleSize", sampleSize)
+          .withLabel("Sample Size"));
+    }
+  }
+
   /**
    * A {@link DoFn} that returns up to limit elements from the side input PCollection.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/a8e2387d/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 6a6e0fc..1f9beb3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -34,8 +34,10 @@ import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.Sample;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -62,7 +64,7 @@ public class TransformTreeTest {
   enum TransformsSeen {
     READ,
     WRITE,
-    SAMPLE_ANY
+    COMBINE_GLOBALLY
   }
 
   /**
@@ -120,7 +122,8 @@ public class TransformTreeTest {
     File outputFile = tmpFolder.newFile();
 
     p.apply("ReadMyFile", TextIO.Read.from(inputFile.getPath()))
-        .apply(Sample.<String>any(10))
+        .apply(Combine.globally(Sample.<String>combineFn(10)))
+        .apply(Flatten.<String>iterables())
         .apply("WriteMyFile", TextIO.Write.to(outputFile.getPath()));
 
     final EnumSet<TransformsSeen> visited =
@@ -132,8 +135,8 @@ public class TransformTreeTest {
       @Override
       public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
         PTransform<?, ?> transform = node.getTransform();
-        if (transform instanceof Sample.SampleAny) {
-          assertTrue(visited.add(TransformsSeen.SAMPLE_ANY));
+        if (transform instanceof Combine.Globally) {
+          assertTrue(visited.add(TransformsSeen.COMBINE_GLOBALLY));
           assertNotNull(node.getEnclosingNode());
           assertTrue(node.isCompositeNode());
         } else if (transform instanceof Write.Bound) {
@@ -148,8 +151,8 @@ public class TransformTreeTest {
       @Override
       public void leaveCompositeTransform(TransformHierarchy.Node node) {
         PTransform<?, ?> transform = node.getTransform();
-        if (transform instanceof Sample.SampleAny) {
-          assertTrue(left.add(TransformsSeen.SAMPLE_ANY));
+        if (transform instanceof Combine.Globally) {
+          assertTrue(left.add(TransformsSeen.COMBINE_GLOBALLY));
         }
       }
 
@@ -157,7 +160,7 @@ public class TransformTreeTest {
       public void visitPrimitiveTransform(TransformHierarchy.Node node) {
         PTransform<?, ?> transform = node.getTransform();
        // Combine.Globally is a composite; it should not be visited here.
-        assertThat(transform, not(instanceOf(Sample.SampleAny.class)));
+        assertThat(transform, not(instanceOf(Combine.Globally.class)));
         assertThat(transform, not(instanceOf(Write.Bound.class)));
         if (transform instanceof Read.Bounded
             && node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) {
@@ -167,7 +170,7 @@ public class TransformTreeTest {
     });
 
     assertTrue(visited.equals(EnumSet.allOf(TransformsSeen.class)));
-    assertTrue(left.equals(EnumSet.of(TransformsSeen.SAMPLE_ANY)));
+    assertTrue(left.equals(EnumSet.of(TransformsSeen.COMBINE_GLOBALLY)));
   }
 
   @Test(expected = IllegalArgumentException.class)

http://git-wip-us.apache.org/repos/asf/beam/blob/a8e2387d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index 88b0145..8e426c6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
@@ -318,7 +318,7 @@ public class SampleTest {
 
     @Test
     public void testSampleGetName() {
-      assertEquals("Sample.SampleAny", Sample.<String>any(1).getName());
+      assertEquals("Sample.Any", Sample.<String>any(1).getName());
     }
 
     @Test

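The TransformTreeTest hunk above already demonstrates the newly exposed combineFn in context; isolated, the raw-CombineFn route and the packaged transform look like this (input collection assumed):

    import org.apache.beam.sdk.transforms.Combine;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.transforms.Sample;
    import org.apache.beam.sdk.values.PCollection;

    PCollection<String> lines = ...;

    // Raw CombineFn route: Sample.combineFn(int) plugged into Combine.globally
    // yields a single Iterable<String> of at most 10 elements, flattened back
    // to individual elements as the updated test does.
    PCollection<String> sampledLines =
        lines
            .apply(Combine.globally(Sample.<String>combineFn(10)))
            .apply(Flatten.<String>iterables());

    // Packaged route: the convenience transform wrapping the same CombineFn.
    PCollection<Iterable<String>> sampleOfSize10 =
        lines.apply(Sample.<String>fixedSizeGlobally(10));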

[8/9] beam git commit: BEAM-1416 Write transform should comply with PTransform style guide

Posted by dh...@apache.org.
BEAM-1416 Write transform should comply with PTransform style guide


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b87621e9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b87621e9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b87621e9

Branch: refs/heads/master
Commit: b87621e9533e772f92a3cf4325299350eb615b62
Parents: fffd2c5
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Feb 7 16:44:40 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:37 2017 -0800

----------------------------------------------------------------------
 .../core/construction/PTransformMatchers.java   |   4 +-
 .../construction/PTransformMatchersTest.java    |  10 +-
 .../direct/WriteWithShardingFactory.java        |   9 +-
 .../direct/WriteWithShardingFactoryTest.java    |   2 +-
 .../FlinkStreamingTransformTranslators.java     |   6 +-
 .../beam/runners/flink/WriteSinkITCase.java     |   2 +-
 .../beam/runners/dataflow/DataflowRunner.java   |  13 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     |   2 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     |   2 +-
 .../main/java/org/apache/beam/sdk/io/Write.java | 727 +++++++++----------
 .../java/org/apache/beam/sdk/io/WriteTest.java  |  16 +-
 .../beam/sdk/runners/TransformTreeTest.java     |   4 +-
 12 files changed, 394 insertions(+), 403 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 05b632b..7f8d467 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -182,8 +182,8 @@ public class PTransformMatchers {
     return new PTransformMatcher() {
       @Override
       public boolean matches(AppliedPTransform<?, ?, ?> application) {
-        if (application.getTransform() instanceof Write.Bound) {
-          return ((Write.Bound) application.getTransform()).getSharding() == null;
+        if (application.getTransform() instanceof Write) {
+          return ((Write) application.getTransform()).getSharding() == null;
         }
         return false;
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index cace033..be3ed6b 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -387,7 +387,7 @@ public class PTransformMatchersTest implements Serializable {
 
   @Test
   public void writeWithRunnerDeterminedSharding() {
-    Write.Bound<Integer> write =
+    Write<Integer> write =
         Write.to(
             new FileBasedSink<Integer>("foo", "bar") {
               @Override
@@ -400,13 +400,13 @@ public class PTransformMatchersTest implements Serializable {
         PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)),
         is(true));
 
-    Write.Bound<Integer> withStaticSharding = write.withNumShards(3);
+    Write<Integer> withStaticSharding = write.withNumShards(3);
     assertThat(
         PTransformMatchers.writeWithRunnerDeterminedSharding()
             .matches(appliedWrite(withStaticSharding)),
         is(false));
 
-    Write.Bound<Integer> withCustomSharding =
+    Write<Integer> withCustomSharding =
         write.withSharding(Sum.integersGlobally().asSingletonView());
     assertThat(
         PTransformMatchers.writeWithRunnerDeterminedSharding()
@@ -414,8 +414,8 @@ public class PTransformMatchersTest implements Serializable {
         is(false));
   }
 
-  private AppliedPTransform<?, ?, ?> appliedWrite(Write.Bound<Integer> write) {
-    return AppliedPTransform.<PCollection<Integer>, PDone, Write.Bound<Integer>>of(
+  private AppliedPTransform<?, ?, ?> appliedWrite(Write<Integer> write) {
+    return AppliedPTransform.<PCollection<Integer>, PDone, Write<Integer>>of(
         "Write",
         Collections.<TaggedPValue>emptyList(),
         Collections.<TaggedPValue>emptyList(),

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index f206fb0..63122fe 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Write;
-import org.apache.beam.sdk.io.Write.Bound;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -48,13 +47,15 @@ import org.apache.beam.sdk.values.TaggedPValue;
  * of shards is the log base 10 of the number of input records, with up to 2 additional shards.
  */
 class WriteWithShardingFactory<InputT>
-    implements PTransformOverrideFactory<PCollection<InputT>, PDone, Bound<InputT>> {
+    implements PTransformOverrideFactory<PCollection<InputT>, PDone, Write<InputT>> {
   static final int MAX_RANDOM_EXTRA_SHARDS = 3;
   @VisibleForTesting static final int MIN_SHARDS_FOR_LOG = 3;
 
   @Override
-  public PTransform<PCollection<InputT>, PDone> getReplacementTransform(Bound<InputT> transform) {
-    return transform.withSharding(new LogElementShardsWithDrift<InputT>());
+  public PTransform<PCollection<InputT>, PDone> getReplacementTransform(
+      Write<InputT> transform) {
+    return transform.withSharding(new LogElementShardsWithDrift<InputT>());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 51f3a87..16b6312 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -117,7 +117,7 @@ public class WriteWithShardingFactoryTest {
 
   @Test
   public void withNoShardingSpecifiedReturnsNewTransform() {
-    Write.Bound<Object> original = Write.to(new TestSink());
+    Write<Object> original = Write.to(new TestSink());
     assertThat(factory.getReplacementTransform(original), not(equalTo((Object) original)));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index eaab3d1..a3cceb2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -118,7 +118,7 @@ class FlinkStreamingTransformTranslators {
   static {
     TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
     TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
-    TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
+    TRANSLATORS.put(Write.class, new WriteSinkStreamingTranslator());
     TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
 
     TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
@@ -194,10 +194,10 @@ class FlinkStreamingTransformTranslators {
   }
 
   private static class WriteSinkStreamingTranslator<T>
-      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write<T>> {
 
     @Override
-    public void translateNode(Write.Bound<T> transform, FlinkStreamingTranslationContext context) {
+    public void translateNode(Write<T> transform, FlinkStreamingTranslationContext context) {
       String name = transform.getName();
       PValue input = context.getInput(transform);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
index 6986663..572c291 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
@@ -38,7 +38,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
 /**
- * Tests the translation of custom Write.Bound sinks.
+ * Tests the translation of custom Write sinks.
  */
 public class WriteSinkITCase extends JavaProgramTestBase {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 0fe3a89..dbf1958 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -86,7 +86,6 @@ import org.apache.beam.sdk.io.PubsubUnboundedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.Write;
-import org.apache.beam.sdk.io.Write.Bound;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -367,7 +366,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
               "The DataflowRunner in batch mode does not support Read.Unbounded"));
       ptoverrides
           // Write uses views internally
-          .put(PTransformMatchers.classEqualTo(Write.Bound.class), new BatchWriteFactory(this))
+          .put(PTransformMatchers.classEqualTo(Write.class), new BatchWriteFactory(this))
           .put(
               PTransformMatchers.classEqualTo(View.AsMap.class),
               new ReflectiveOneToOneOverrideFactory(BatchViewOverrides.BatchViewAsMap.class, this))
@@ -772,14 +771,14 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   }
 
   private class BatchWriteFactory<T>
-      implements PTransformOverrideFactory<PCollection<T>, PDone, Write.Bound<T>> {
+      implements PTransformOverrideFactory<PCollection<T>, PDone, Write<T>> {
     private final DataflowRunner runner;
     private BatchWriteFactory(DataflowRunner dataflowRunner) {
       this.runner = dataflowRunner;
     }
 
     @Override
-    public PTransform<PCollection<T>, PDone> getReplacementTransform(Bound<T> transform) {
+    public PTransform<PCollection<T>, PDone> getReplacementTransform(Write<T> transform) {
       return new BatchWrite<>(runner, transform);
     }
 
@@ -797,17 +796,17 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
   /**
    * Specialized implementation which overrides
-   * {@link org.apache.beam.sdk.io.Write.Bound Write.Bound} to provide Google
+   * {@link org.apache.beam.sdk.io.Write Write} to provide Google
    * Cloud Dataflow specific path validation of {@link FileBasedSink}s.
    */
   private static class BatchWrite<T> extends PTransform<PCollection<T>, PDone> {
     private final DataflowRunner runner;
-    private final Write.Bound<T> transform;
+    private final Write<T> transform;
     /**
      * Builds an instance of this class from the overridden transform.
      */
     @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public BatchWrite(DataflowRunner runner, Write.Bound<T> transform) {
+    public BatchWrite(DataflowRunner runner, Write<T> transform) {
       this.runner = runner;
       this.transform = transform;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 388d9f0..67a4381 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -804,7 +804,7 @@ public class AvroIO {
           throw new IllegalStateException("need to set the schema of an AvroIO.Write transform");
         }
 
-        org.apache.beam.sdk.io.Write.Bound<T> write =
+        org.apache.beam.sdk.io.Write<T> write =
             org.apache.beam.sdk.io.Write.to(
                 new AvroSink<>(
                     filenamePrefix,

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 6e23a28..fe8d0fd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -662,7 +662,7 @@ public class TextIO {
           throw new IllegalStateException(
               "need to set the filename prefix of a TextIO.Write transform");
         }
-        org.apache.beam.sdk.io.Write.Bound<String> write =
+        org.apache.beam.sdk.io.Write<String> write =
             org.apache.beam.sdk.io.Write.to(
                 new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
                     writableByteChannelFactory));

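Before the Write.java diff itself, a sketch of the flattened API from a caller's perspective; MySink is a placeholder for any concrete Sink<String>, echoing the class-level Javadoc example, and the input collection is assumed:

    import org.apache.beam.sdk.io.Write;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PDone;

    PCollection<String> records = ...;

    // Write.to() now returns Write<T> directly; the nested Write.Bound<T> is gone.
    Write<String> write = Write.to(new MySink(...)).withNumShards(3);
    PDone done = records.apply(write);

    // Runner-determined sharding remains the default:
    records.apply(Write.to(new MySink(...)).withRunnerDeterminedSharding());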
http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index acbbb97..948a65b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -62,7 +62,7 @@ import org.slf4j.LoggerFactory;
  * <p>By default, every bundle in the input {@link PCollection} will be processed by a
  * {@link WriteOperation}, so the number of outputs will vary based on runner behavior, though at
  * least 1 output will always be produced. The exact parallelism of the write stage can be
- * controlled using {@link Write.Bound#withNumShards}, typically used to control how many files are
+ * controlled using {@link Write#withNumShards}, typically used to control how many files are
  * produced or to globally limit the number of workers connecting to an external service. However,
  * this option can often hurt performance: it adds an additional {@link GroupByKey} to the pipeline.
  *
@@ -78,421 +78,412 @@ import org.slf4j.LoggerFactory;
  * <pre>{@code p.apply(Write.to(new MySink(...)).withNumShards(3));}</pre>
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
-public class Write {
+public class Write<T> extends PTransform<PCollection<T>, PDone> {
   private static final Logger LOG = LoggerFactory.getLogger(Write.class);
 
+  private final Sink<T> sink;
+  @Nullable
+  private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
+
   /**
    * Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner
    * control how many different shards are produced.
    */
-  public static <T> Bound<T> to(Sink<T> sink) {
+  public static <T> Write<T> to(Sink<T> sink) {
     checkNotNull(sink, "sink");
-    return new Bound<>(sink, null /* runner-determined sharding */);
+    return new Write<>(sink, null /* runner-determined sharding */);
   }
 
-  /**
-   * A {@link PTransform} that writes to a {@link Sink}. See the class-level Javadoc for more
-   * information.
-   *
-   * @see Write
-   * @see Sink
-   */
-  public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
-    private final Sink<T> sink;
-    @Nullable
-    private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
-
-    private Bound(
-        Sink<T> sink,
-        @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards) {
-      this.sink = sink;
-      this.computeNumShards = computeNumShards;
-    }
+  private Write(
+      Sink<T> sink,
+      @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards) {
+    this.sink = sink;
+    this.computeNumShards = computeNumShards;
+  }
 
-    @Override
-    public PDone expand(PCollection<T> input) {
-      checkArgument(
-          IsBounded.BOUNDED == input.isBounded(),
-          "%s can only be applied to a Bounded PCollection",
-          Write.class.getSimpleName());
-      PipelineOptions options = input.getPipeline().getOptions();
-      sink.validate(options);
-      return createWrite(input, sink.createWriteOperation(options));
-    }
+  @Override
+  public PDone expand(PCollection<T> input) {
+    checkArgument(
+        IsBounded.BOUNDED == input.isBounded(),
+        "%s can only be applied to a Bounded PCollection",
+        Write.class.getSimpleName());
+    PipelineOptions options = input.getPipeline().getOptions();
+    sink.validate(options);
+    return createWrite(input, sink.createWriteOperation(options));
+  }
 
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder
-          .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink"))
-          .include("sink", sink);
-      if (getSharding() != null) {
-        builder.include("sharding", getSharding());
-      }
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder
+        .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink"))
+        .include("sink", sink);
+    if (getSharding() != null) {
+      builder.include("sharding", getSharding());
     }
+  }
 
-    /**
-     * Returns the {@link Sink} associated with this PTransform.
-     */
-    public Sink<T> getSink() {
-      return sink;
-    }
+  /**
+   * Returns the {@link Sink} associated with this PTransform.
+   */
+  public Sink<T> getSink() {
+    return sink;
+  }
 
-    /**
-     * Gets the {@link PTransform} that will be used to determine sharding. This can be either a
-     * static number of shards (as following a call to {@link #withNumShards(int)}), dynamic (by
-     * {@link #withSharding(PTransform)}), or runner-determined (by {@link
-     * #withRunnerDeterminedSharding()}.
-     */
-    @Nullable
-    public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() {
-      return computeNumShards;
-    }
+  /**
+   * Gets the {@link PTransform} that will be used to determine sharding. This can be either a
+   * static number of shards (following a call to {@link #withNumShards(int)}), dynamic (by
+   * {@link #withSharding(PTransform)}), or runner-determined (by
+   * {@link #withRunnerDeterminedSharding()}).
+   */
+  @Nullable
+  public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() {
+    return computeNumShards;
+  }
 
-    /**
-     * Returns a new {@link Write.Bound} that will write to the current {@link Sink} using the
-     * specified number of shards.
-     *
-     * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
-     * more information.
-     *
-     * <p>A value less than or equal to 0 will be equivalent to the default behavior of
-     * runner-determined sharding.
-     */
-    public Bound<T> withNumShards(int numShards) {
-      if (numShards > 0) {
-        return withNumShards(StaticValueProvider.of(numShards));
-      }
-      return withRunnerDeterminedSharding();
+  /**
+   * Returns a new {@link Write} that will write to the current {@link Sink} using the
+   * specified number of shards.
+   *
+   * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
+   * more information.
+   *
+   * <p>A value less than or equal to 0 will be equivalent to the default behavior of
+   * runner-determined sharding.
+   */
+  public Write<T> withNumShards(int numShards) {
+    if (numShards > 0) {
+      return withNumShards(StaticValueProvider.of(numShards));
     }
+    return withRunnerDeterminedSharding();
+  }
 
-    /**
-     * Returns a new {@link Write.Bound} that will write to the current {@link Sink} using the
-     * {@link ValueProvider} specified number of shards.
-     *
-     * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
-     * more information.
-     */
-    public Bound<T> withNumShards(ValueProvider<Integer> numShards) {
-      return new Bound<>(sink, new ConstantShards<T>(numShards));
-    }
+  /**
+   * Returns a new {@link Write} that will write to the current {@link Sink} using the
+   * {@link ValueProvider} specified number of shards.
+   *
+   * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
+   * more information.
+   */
+  public Write<T> withNumShards(ValueProvider<Integer> numShards) {
+    return new Write<>(sink, new ConstantShards<T>(numShards));
+  }
 
-    /**
-     * Returns a new {@link Write.Bound} that will write to the current {@link Sink} using the
-     * specified {@link PTransform} to compute the number of shards.
-     *
-     * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
-     * more information.
-     */
-    public Bound<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
-      checkNotNull(
-          sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
-      return new Bound<>(sink, sharding);
-    }
+  /**
+   * Returns a new {@link Write} that will write to the current {@link Sink} using the
+   * specified {@link PTransform} to compute the number of shards.
+   *
+   * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
+   * more information.
+   */
+  public Write<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
+    checkNotNull(
+        sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
+    return new Write<>(sink, sharding);
+  }
+
+  /**
+   * Returns a new {@link Write} that will write to the current {@link Sink} with
+   * runner-determined sharding.
+   */
+  public Write<T> withRunnerDeterminedSharding() {
+    return new Write<>(sink, null);
+  }
 
-    /**
-     * Returns a new {@link Write.Bound} that will write to the current {@link Sink} with
-     * runner-determined sharding.
-     */
-    public Bound<T> withRunnerDeterminedSharding() {
-      return new Bound<>(sink, null);
+  /**
+   * Writes all the elements in a bundle using a {@link Writer} produced by the
+   * {@link WriteOperation} associated with the {@link Sink}.
+   */
+  private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
+    // Writer that will write the records in this bundle. Lazily
+    // initialized in processElement.
+    private Writer<T, WriteT> writer = null;
+    private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
+
+    WriteBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
+      this.writeOperationView = writeOperationView;
     }
 
-    /**
-     * Writes all the elements in a bundle using a {@link Writer} produced by the
-     * {@link WriteOperation} associated with the {@link Sink}.
-     */
-    private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
-      // Writer that will write the records in this bundle. Lazily
-      // initialized in processElement.
-      private Writer<T, WriteT> writer = null;
-      private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
-
-      WriteBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
-        this.writeOperationView = writeOperationView;
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      // Lazily initialize the Writer
+      if (writer == null) {
+        WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
+        LOG.info("Opening writer for write operation {}", writeOperation);
+        writer = writeOperation.createWriter(c.getPipelineOptions());
+        writer.open(UUID.randomUUID().toString());
+        LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
       }
-
-      @ProcessElement
-      public void processElement(ProcessContext c) throws Exception {
-        // Lazily initialize the Writer
-        if (writer == null) {
-          WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
-          LOG.info("Opening writer for write operation {}", writeOperation);
-          writer = writeOperation.createWriter(c.getPipelineOptions());
-          writer.open(UUID.randomUUID().toString());
-          LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
-        }
+      try {
+        writer.write(c.element());
+      } catch (Exception e) {
+        // Discard write result and close the write.
         try {
-          writer.write(c.element());
-        } catch (Exception e) {
-          // Discard write result and close the write.
-          try {
-            writer.close();
-            // The writer does not need to be reset, as this DoFn cannot be reused.
-          } catch (Exception closeException) {
-            if (closeException instanceof InterruptedException) {
-              // Do not silently ignore interrupted state.
-              Thread.currentThread().interrupt();
-            }
-            // Do not mask the exception that caused the write to fail.
-            e.addSuppressed(closeException);
+          writer.close();
+          // The writer does not need to be reset, as this DoFn cannot be reused.
+        } catch (Exception closeException) {
+          if (closeException instanceof InterruptedException) {
+            // Do not silently ignore interrupted state.
+            Thread.currentThread().interrupt();
           }
-          throw e;
+          // Do not mask the exception that caused the write to fail.
+          e.addSuppressed(closeException);
         }
+        throw e;
       }
+    }
 
-      @FinishBundle
-      public void finishBundle(Context c) throws Exception {
-        if (writer != null) {
-          WriteT result = writer.close();
-          c.output(result);
-          // Reset state in case of reuse.
-          writer = null;
-        }
+    @FinishBundle
+    public void finishBundle(Context c) throws Exception {
+      if (writer != null) {
+        WriteT result = writer.close();
+        c.output(result);
+        // Reset state in case of reuse.
+        writer = null;
       }
+    }
 
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        builder.delegate(Write.Bound.this);
-      }
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.delegate(Write.this);
     }
+  }
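
The try/catch around writer.write() above preserves the original failure while still closing the
writer. For reference, here is a minimal self-contained sketch of that pattern; the names are
hypothetical and this is not Beam API code:

    import java.io.Closeable;
    import java.io.IOException;

    // Minimal sketch of the pattern used in WriteBundles: close the writer on
    // failure, but keep the original write exception primary and attach the
    // close failure as a suppressed exception.
    final class SuppressedCloseDemo {
      public static void main(String[] args) {
        Closeable writer = () -> { throw new IOException("close failed"); };
        try {
          throw new IOException("write failed"); // stands in for writer.write(...)
        } catch (Exception e) {
          try {
            writer.close();
          } catch (Exception closeException) {
            // Do not mask the exception that caused the write to fail.
            e.addSuppressed(closeException);
          }
          System.out.println("primary: " + e.getMessage());
          for (Throwable t : e.getSuppressed()) {
            System.out.println("suppressed: " + t.getMessage());
          }
        }
      }
    }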
 
-    /**
-     * Like {@link WriteBundles}, but where the elements for each shard have been collected into
-     * a single iterable.
-     *
-     * @see WriteBundles
-     */
-    private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
-      private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
-
-      WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
-        this.writeOperationView = writeOperationView;
-      }
+  /**
+   * Like {@link WriteBundles}, but where the elements for each shard have been collected into
+   * a single iterable.
+   *
+   * @see WriteBundles
+   */
+  private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
+    private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
 
-      @ProcessElement
-      public void processElement(ProcessContext c) throws Exception {
-        // In a sharded write, single input element represents one shard. We can open and close
-        // the writer in each call to processElement.
-        WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
-        LOG.info("Opening writer for write operation {}", writeOperation);
-        Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
-        writer.open(UUID.randomUUID().toString());
-        LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
+    WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
+      this.writeOperationView = writeOperationView;
+    }
 
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      // In a sharded write, a single input element represents one shard. We can open and close
+      // the writer in each call to processElement.
+      WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
+      LOG.info("Opening writer for write operation {}", writeOperation);
+      Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
+      writer.open(UUID.randomUUID().toString());
+      LOG.debug("Done opening writer {} for operation {}", writer, writeOperation);
+
+      try {
+        for (T t : c.element().getValue()) {
+          writer.write(t);
+        }
+      } catch (Exception e) {
         try {
-          for (T t : c.element().getValue()) {
-            writer.write(t);
+          writer.close();
+        } catch (Exception closeException) {
+          if (closeException instanceof InterruptedException) {
+            // Do not silently ignore interrupted state.
+            Thread.currentThread().interrupt();
           }
-        } catch (Exception e) {
-          try {
-            writer.close();
-          } catch (Exception closeException) {
-            if (closeException instanceof InterruptedException) {
-              // Do not silently ignore interrupted state.
-              Thread.currentThread().interrupt();
-            }
-            // Do not mask the exception that caused the write to fail.
-            e.addSuppressed(closeException);
-          }
-          throw e;
+          // Do not mask the exception that caused the write to fail.
+          e.addSuppressed(closeException);
         }
-
-        // Close the writer; if this throws let the error propagate.
-        WriteT result = writer.close();
-        c.output(result);
+        throw e;
       }
 
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        builder.delegate(Write.Bound.this);
-      }
+      // Close the writer; if this throws let the error propagate.
+      WriteT result = writer.close();
+      c.output(result);
     }
 
-    private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
-      private final PCollectionView<Integer> numShards;
-      private int shardNumber;
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.delegate(Write.this);
+    }
+  }
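
As the comment in processElement notes, each input element here is one complete shard. A hedged
illustration of that shape using plain Java collections (not a pipeline):

    import java.util.Arrays;
    import java.util.List;

    // Sketch: after ApplyShardingKey + GroupByKey, WriteShardedBundles sees one
    // element per shard: the shard number paired with all records assigned to it.
    final class ShardShapeDemo {
      public static void main(String[] args) {
        int shardNumber = 0;
        List<String> shardRecords = Arrays.asList("a", "b", "c");
        // processElement opens one writer, writes "a", "b", "c", closes it, and
        // emits exactly one writer result for this shard.
        System.out.println("shard " + shardNumber + " -> " + shardRecords);
      }
    }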
 
-      ApplyShardingKey(PCollectionView<Integer> numShards) {
-        this.numShards = numShards;
-        shardNumber = -1;
-      }
+  private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
+    private final PCollectionView<Integer> numShards;
+    private int shardNumber;
 
-      @ProcessElement
-      public void processElement(ProcessContext context) {
-        Integer shardCount = context.sideInput(numShards);
-        checkArgument(
-            shardCount > 0,
-            "Must have a positive number of shards specified for non-runner-determined sharding."
-                + " Got %s",
-            shardCount);
-        if (shardNumber == -1) {
-          // We want to desynchronize the first record sharding key for each instance of
-          // ApplyShardingKey, so records in a small PCollection will be statistically balanced.
-          shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
-        } else {
-          shardNumber = (shardNumber + 1) % shardCount;
-        }
-        context.output(KV.of(shardNumber, context.element()));
-      }
+    ApplyShardingKey(PCollectionView<Integer> numShards) {
+      this.numShards = numShards;
+      shardNumber = -1;
     }
 
-    /**
-     * A write is performed as sequence of three {@link ParDo}'s.
-     *
-     * <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's
-     * {@link WriteOperation}. In this initialization ParDo, {@link WriteOperation#initialize} is
-     * called. The output of this ParDo is a singleton PCollection
-     * containing the WriteOperation.
-     *
-     * <p>This singleton collection containing the WriteOperation is then used as a side input to a
-     * ParDo over the PCollection of elements to write. In this bundle-writing phase,
-     * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
-     * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and
-     * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} method is called for
-     * every element in the bundle. The output of this ParDo is a PCollection of
-     * <i>writer result</i> objects (see {@link Sink} for a description of writer results)-one for
-     * each bundle.
-     *
-     * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and
-     * the collection of writer results as a side-input. In this ParDo,
-     * {@link WriteOperation#finalize} is called to finalize the write.
-     *
-     * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
-     * before the exception that caused the write to fail is propagated and the write result will be
-     * discarded.
-     *
-     * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
-     * deserialized in the bundle-writing and finalization phases, any state change to the
-     * WriteOperation object that occurs during initialization is visible in the latter phases.
-     * However, the WriteOperation is not serialized after the bundle-writing phase.  This is why
-     * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
-     * WriteOperation).
-     */
-    private <WriteT> PDone createWrite(
-        PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
-      Pipeline p = input.getPipeline();
-
-      // A coder to use for the WriteOperation.
-      @SuppressWarnings("unchecked")
-      Coder<WriteOperation<T, WriteT>> operationCoder =
-          (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass());
-
-      // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
-      // the sink.
-      PCollection<WriteOperation<T, WriteT>> operationCollection =
-          p.apply(Create.of(writeOperation).withCoder(operationCoder));
-
-      // Initialize the resource in a do-once ParDo on the WriteOperation.
-      operationCollection = operationCollection
-          .apply("Initialize", ParDo.of(
-              new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) throws Exception {
-              WriteOperation<T, WriteT> writeOperation = c.element();
-              LOG.info("Initializing write operation {}", writeOperation);
-              writeOperation.initialize(c.getPipelineOptions());
-              LOG.debug("Done initializing write operation {}", writeOperation);
-              // The WriteOperation is also the output of this ParDo, so it can have mutable
-              // state.
-              c.output(writeOperation);
-            }
-          }))
-          .setCoder(operationCoder);
-
-      // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase.
-      final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
-          operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
-
-      // Re-window the data into the global window and remove any existing triggers.
-      PCollection<T> inputInGlobalWindow =
-          input.apply(
-              Window.<T>into(new GlobalWindows())
-                  .triggering(DefaultTrigger.of())
-                  .discardingFiredPanes());
-
-      // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
-      // as a side input) and collect the results of the writes in a PCollection.
-      // There is a dependency between this ParDo and the first (the WriteOperation PCollection
-      // as a side input), so this will happen after the initial ParDo.
-      PCollection<WriteT> results;
-      final PCollectionView<Integer> numShards;
-      if (computeNumShards == null) {
-        numShards = null;
-        results =
-            inputInGlobalWindow.apply(
-                "WriteBundles",
-                ParDo.of(new WriteBundles<>(writeOperationView))
-                    .withSideInputs(writeOperationView));
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      Integer shardCount = context.sideInput(numShards);
+      checkArgument(
+          shardCount > 0,
+          "Must have a positive number of shards specified for non-runner-determined sharding."
+              + " Got %s",
+          shardCount);
+      if (shardNumber == -1) {
+        // We want to desynchronize the first record sharding key for each instance of
+        // ApplyShardingKey, so records in a small PCollection will be statistically balanced.
+        shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
       } else {
-        numShards = inputInGlobalWindow.apply(computeNumShards);
-        results =
-            inputInGlobalWindow
-                .apply(
-                    "ApplyShardLabel",
-                    ParDo.of(new ApplyShardingKey<T>(numShards)).withSideInputs(numShards))
-                .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
-                .apply(
-                    "WriteShardedBundles",
-                    ParDo.of(new WriteShardedBundles<>(writeOperationView))
-                        .withSideInputs(writeOperationView));
-      }
-      results.setCoder(writeOperation.getWriterResultCoder());
-
-      final PCollectionView<Iterable<WriteT>> resultsView =
-          results.apply(View.<WriteT>asIterable());
-
-      // Finalize the write in another do-once ParDo on the singleton collection containing the
-      // Writer. The results from the per-bundle writes are given as an Iterable side input.
-      // The WriteOperation's state is the same as after its initialization in the first do-once
-      // ParDo. There is a dependency between this ParDo and the parallel write (the writer results
-      // collection as a side input), so it will happen after the parallel write.
-      ImmutableList.Builder<PCollectionView<?>> sideInputs =
-          ImmutableList.<PCollectionView<?>>builder().add(resultsView);
-      if (numShards != null) {
-        sideInputs.add(numShards);
+        shardNumber = (shardNumber + 1) % shardCount;
       }
-      operationCollection
-          .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) throws Exception {
-              WriteOperation<T, WriteT> writeOperation = c.element();
-              LOG.info("Finalizing write operation {}.", writeOperation);
-              List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
-              LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
-
-              // We must always output at least 1 shard, and honor user-specified numShards if set.
-              int minShardsNeeded;
-              if (numShards == null) {
-                minShardsNeeded = 1;
-              } else {
-                minShardsNeeded = c.sideInput(numShards);
-                checkArgument(
-                    minShardsNeeded > 0,
-                    "Must have a positive number of shards for non-runner-determined sharding."
-                        + " Got %s",
-                    minShardsNeeded);
-              }
-              int extraShardsNeeded = minShardsNeeded - results.size();
-              if (extraShardsNeeded > 0) {
-                LOG.info(
-                    "Creating {} empty output shards in addition to {} written for a total of {}.",
-                    extraShardsNeeded, results.size(), minShardsNeeded);
-                for (int i = 0; i < extraShardsNeeded; ++i) {
-                  Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
-                  writer.open(UUID.randomUUID().toString());
-                  WriteT emptyWrite = writer.close();
-                  results.add(emptyWrite);
-                }
-                LOG.debug("Done creating extra shards.");
-              }
+      context.output(KV.of(shardNumber, context.element()));
+    }
+  }
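
The random-start round-robin above is worth seeing in isolation: without the random first shard,
every ApplyShardingKey instance would start at shard 0, and small PCollections would pile up
there. A standalone sketch of the same policy (not Beam code):

    import java.util.concurrent.ThreadLocalRandom;

    // Sketch of ApplyShardingKey's policy: random starting shard per instance,
    // then round-robin, so many instances writing few records still balance.
    final class ShardAssignDemo {
      private int shardNumber = -1;

      int assign(int shardCount) {
        if (shardNumber == -1) {
          shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
        } else {
          shardNumber = (shardNumber + 1) % shardCount;
        }
        return shardNumber;
      }

      public static void main(String[] args) {
        ShardAssignDemo demo = new ShardAssignDemo();
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < 6; i++) {
          out.append(demo.assign(3)).append(' '); // e.g. "2 0 1 2 0 1 "
        }
        System.out.println(out);
      }
    }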
 
-              writeOperation.finalize(results, c.getPipelineOptions());
-              LOG.debug("Done finalizing write operation {}", writeOperation);
-            }
-          }).withSideInputs(sideInputs.build()));
-      return PDone.in(input.getPipeline());
+  /**
+   * A write is performed as a sequence of three {@link ParDo}s.
+   *
+   * <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's
+   * {@link WriteOperation}. In this initialization ParDo, {@link WriteOperation#initialize} is
+   * called. The output of this ParDo is a singleton PCollection
+   * containing the WriteOperation.
+   *
+   * <p>This singleton collection containing the WriteOperation is then used as a side input to a
+   * ParDo over the PCollection of elements to write. In this bundle-writing phase,
+   * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
+   * {@link Writer#open} is called lazily when the first element of a bundle is processed,
+   * {@link Writer#close} is called in {@link DoFn.FinishBundle}, and the {@link Writer#write}
+   * method is called for every element in the bundle. The output of this ParDo is a PCollection
+   * of <i>writer result</i> objects (see {@link Sink} for a description of writer results), one
+   * for each bundle.
+   *
+   * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and
+   * the collection of writer results as a side-input. In this ParDo,
+   * {@link WriteOperation#finalize} is called to finalize the write.
+   *
+   * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
+   * before the exception that caused the write to fail is propagated and the write result will be
+   * discarded.
+   *
+   * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
+   * deserialized in the bundle-writing and finalization phases, any state change to the
+   * WriteOperation object that occurs during initialization is visible in the latter phases.
+   * However, the WriteOperation is not serialized again after the bundle-writing phase, so any
+   * state change made during that phase would be lost. This is why implementations should
+   * guarantee that {@link WriteOperation#createWriter} does not mutate the WriteOperation.
+   */
+  private <WriteT> PDone createWrite(
+      PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
+    Pipeline p = input.getPipeline();
+
+    // A coder to use for the WriteOperation.
+    @SuppressWarnings("unchecked")
+    Coder<WriteOperation<T, WriteT>> operationCoder =
+        (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass());
+
+    // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
+    // the sink.
+    PCollection<WriteOperation<T, WriteT>> operationCollection =
+        p.apply(Create.of(writeOperation).withCoder(operationCoder));
+
+    // Initialize the resource in a do-once ParDo on the WriteOperation.
+    operationCollection = operationCollection
+        .apply("Initialize", ParDo.of(
+            new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) throws Exception {
+            WriteOperation<T, WriteT> writeOperation = c.element();
+            LOG.info("Initializing write operation {}", writeOperation);
+            writeOperation.initialize(c.getPipelineOptions());
+            LOG.debug("Done initializing write operation {}", writeOperation);
+            // The WriteOperation is also the output of this ParDo, so it can have mutable
+            // state.
+            c.output(writeOperation);
+          }
+        }))
+        .setCoder(operationCoder);
+
+    // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase.
+    final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
+        operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
+
+    // Re-window the data into the global window and remove any existing triggers.
+    PCollection<T> inputInGlobalWindow =
+        input.apply(
+            Window.<T>into(new GlobalWindows())
+                .triggering(DefaultTrigger.of())
+                .discardingFiredPanes());
+
+    // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
+    // as a side input) and collect the results of the writes in a PCollection.
+    // There is a dependency between this ParDo and the first (the WriteOperation PCollection
+    // as a side input), so this will happen after the initial ParDo.
+    PCollection<WriteT> results;
+    final PCollectionView<Integer> numShards;
+    if (computeNumShards == null) {
+      numShards = null;
+      results =
+          inputInGlobalWindow.apply(
+              "WriteBundles",
+              ParDo.of(new WriteBundles<>(writeOperationView))
+                  .withSideInputs(writeOperationView));
+    } else {
+      numShards = inputInGlobalWindow.apply(computeNumShards);
+      results =
+          inputInGlobalWindow
+              .apply(
+                  "ApplyShardLabel",
+                  ParDo.of(new ApplyShardingKey<T>(numShards)).withSideInputs(numShards))
+              .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
+              .apply(
+                  "WriteShardedBundles",
+                  ParDo.of(new WriteShardedBundles<>(writeOperationView))
+                      .withSideInputs(writeOperationView));
     }
+    results.setCoder(writeOperation.getWriterResultCoder());
+
+    final PCollectionView<Iterable<WriteT>> resultsView =
+        results.apply(View.<WriteT>asIterable());
+
+    // Finalize the write in another do-once ParDo on the singleton collection containing the
+    // Writer. The results from the per-bundle writes are given as an Iterable side input.
+    // The WriteOperation's state is the same as after its initialization in the first do-once
+    // ParDo. There is a dependency between this ParDo and the parallel write (the writer results
+    // collection as a side input), so it will happen after the parallel write.
+    ImmutableList.Builder<PCollectionView<?>> sideInputs =
+        ImmutableList.<PCollectionView<?>>builder().add(resultsView);
+    if (numShards != null) {
+      sideInputs.add(numShards);
+    }
+    operationCollection
+        .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) throws Exception {
+            WriteOperation<T, WriteT> writeOperation = c.element();
+            LOG.info("Finalizing write operation {}.", writeOperation);
+            List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
+            LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
+
+            // We must always output at least 1 shard, and honor user-specified numShards if set.
+            int minShardsNeeded;
+            if (numShards == null) {
+              minShardsNeeded = 1;
+            } else {
+              minShardsNeeded = c.sideInput(numShards);
+              checkArgument(
+                  minShardsNeeded > 0,
+                  "Must have a positive number of shards for non-runner-determined sharding."
+                      + " Got %s",
+                  minShardsNeeded);
+            }
+            int extraShardsNeeded = minShardsNeeded - results.size();
+            if (extraShardsNeeded > 0) {
+              LOG.info(
+                  "Creating {} empty output shards in addition to {} written for a total of {}.",
+                  extraShardsNeeded, results.size(), minShardsNeeded);
+              for (int i = 0; i < extraShardsNeeded; ++i) {
+                Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
+                writer.open(UUID.randomUUID().toString());
+                WriteT emptyWrite = writer.close();
+                results.add(emptyWrite);
+              }
+              LOG.debug("Done creating extra shards.");
+            }
+
+            writeOperation.finalize(results, c.getPipelineOptions());
+            LOG.debug("Done finalizing write operation {}", writeOperation);
+          }
+        }).withSideInputs(sideInputs.build()));
+    return PDone.in(input.getPipeline());
   }
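
Taken together, createWrite builds the initialize/write/finalize pipeline behind a single
transform application. A hedged usage sketch in the style of the Javadoc examples elsewhere in
this file (MySink is a hypothetical Sink<String> implementation, not part of the SDK):

    // Runner-determined sharding by default:
    PCollection<String> lines = ...;
    lines.apply("WriteRunnerSharded", Write.to(new MySink()));
    // Or pin the shard count, which routes through ApplyShardingKey/GroupByKey:
    lines.apply("WriteThreeShards", Write.to(new MySink()).withNumShards(3));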
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index fd349e2..80f6f66 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -210,7 +210,7 @@ public class WriteTest {
     }
 
     TestSink sink = new TestSink();
-    Write.Bound<String> write = Write.to(sink).withSharding(new LargestInt());
+    Write<String> write = Write.to(sink).withSharding(new LargestInt());
     p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
         .apply(IDENTITY_MAP)
         .apply(write);
@@ -307,7 +307,7 @@ public class WriteTest {
   @Test
   public void testBuildWrite() {
     Sink<String> sink = new TestSink() {};
-    Write.Bound<String> write = Write.to(sink).withNumShards(3);
+    Write<String> write = Write.to(sink).withNumShards(3);
     assertThat(write.getSink(), is(sink));
     PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding =
         write.getSharding();
@@ -315,12 +315,12 @@ public class WriteTest {
     assertThat(((ConstantShards<String>) write.getSharding()).getNumShards().get(), equalTo(3));
     assertThat(write.getSharding(), equalTo(originalSharding));
 
-    Write.Bound<String> write2 = write.withSharding(SHARDING_TRANSFORM);
+    Write<String> write2 = write.withSharding(SHARDING_TRANSFORM);
     assertThat(write2.getSink(), is(sink));
     assertThat(write2.getSharding(), equalTo(SHARDING_TRANSFORM));
     // original unchanged
 
-    Write.Bound<String> writeUnsharded = write2.withRunnerDeterminedSharding();
+    Write<String> writeUnsharded = write2.withRunnerDeterminedSharding();
     assertThat(writeUnsharded.getSharding(), nullValue());
     assertThat(write.getSharding(), equalTo(originalSharding));
   }
@@ -333,7 +333,7 @@ public class WriteTest {
         builder.add(DisplayData.item("foo", "bar"));
       }
     };
-    Write.Bound<String> write = Write.to(sink);
+    Write<String> write = Write.to(sink);
     DisplayData displayData = DisplayData.from(write);
 
     assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
@@ -348,7 +348,7 @@ public class WriteTest {
         builder.add(DisplayData.item("foo", "bar"));
       }
     };
-    Write.Bound<String> write = Write.to(sink).withNumShards(1);
+    Write<String> write = Write.to(sink).withNumShards(1);
     DisplayData displayData = DisplayData.from(write);
     assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
     assertThat(displayData, includesDisplayDataFor("sink", sink));
@@ -363,7 +363,7 @@ public class WriteTest {
         builder.add(DisplayData.item("foo", "bar"));
       }
     };
-    Write.Bound<String> write =
+    Write<String> write =
         Write.to(sink)
             .withSharding(
                 new PTransform<PCollection<String>, PCollectionView<Integer>>() {
@@ -433,7 +433,7 @@ public class WriteTest {
     }
 
     TestSink sink = new TestSink();
-    Write.Bound<String> write = Write.to(sink);
+    Write<String> write = Write.to(sink);
     if (numConfiguredShards.isPresent()) {
       write = write.withNumShards(numConfiguredShards.get());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 1f9beb3..53bc114 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -139,7 +139,7 @@ public class TransformTreeTest {
           assertTrue(visited.add(TransformsSeen.COMBINE_GLOBALLY));
           assertNotNull(node.getEnclosingNode());
           assertTrue(node.isCompositeNode());
-        } else if (transform instanceof Write.Bound) {
+        } else if (transform instanceof Write) {
           assertTrue(visited.add(TransformsSeen.WRITE));
           assertNotNull(node.getEnclosingNode());
           assertTrue(node.isCompositeNode());
@@ -161,7 +161,7 @@ public class TransformTreeTest {
         PTransform<?, ?> transform = node.getTransform();
         // Pick is a composite, should not be visited here.
         assertThat(transform, not(instanceOf(Combine.Globally.class)));
-        assertThat(transform, not(instanceOf(Write.Bound.class)));
+        assertThat(transform, not(instanceOf(Write.class)));
         if (transform instanceof Read.Bounded
             && node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) {
           assertTrue(visited.add(TransformsSeen.READ));


[5/9] beam git commit: BEAM-1424 ToString should comply with PTransform style guide

Posted by dh...@apache.org.
BEAM-1424 ToString should comply with PTransform style guide


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4f34caa7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4f34caa7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4f34caa7

Branch: refs/heads/master
Commit: 4f34caa72182a0ae10df7b43c820195280c22c0c
Parents: a8e2387
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Feb 7 16:34:31 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:37 2017 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/ToString.java    | 109 ++++++++-----------
 .../beam/sdk/transforms/ToStringTest.java       |   8 +-
 2 files changed, 50 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4f34caa7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
index 5069a3c..e5a8d38 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
@@ -18,8 +18,7 @@
 
 package org.apache.beam.sdk.transforms;
 
-import java.util.Iterator;
-
+import com.google.common.base.Joiner;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -38,74 +37,70 @@ public final class ToString {
   }
 
   /**
-   * Returns a {@code PTransform<PCollection, PCollection<String>>} which transforms each
-   * element of the input {@link PCollection} to a {@link String} using the
-   * {@link Object#toString} method.
+   * Transforms each element of the input {@link PCollection} to a {@link String} using the {@link
+   * Object#toString} method.
    */
   public static PTransform<PCollection<?>, PCollection<String>> elements() {
-    return new SimpleToString();
+    return new Elements();
   }
 
   /**
-   * Returns a {@code PTransform<PCollection<KV<?,?>, PCollection<String>>} which transforms each
-   * element of the input {@link PCollection} to a {@link String} by using the
+   * Transforms each element of the input {@link PCollection} to a {@link String} by using the
    * {@link Object#toString} on the key followed by a "," followed by the {@link Object#toString}
    * of the value.
    */
-  public static PTransform<PCollection<? extends KV<?, ?>>, PCollection<String>> kv() {
-    return kv(",");
+  public static PTransform<PCollection<? extends KV<?, ?>>, PCollection<String>> kvs() {
+    return kvs(",");
   }
 
   /**
-   * Returns a {@code PTransform<PCollection<KV<?,?>, PCollection<String>>} which transforms each
-   * element of the input {@link PCollection} to a {@link String} by using the
-   * {@link Object#toString} on the key followed by the specified delimeter followed by the
-   * {@link Object#toString} of the value.
+   * Transforms each element of the input {@link PCollection} to a {@link String} by using the
+   * {@link Object#toString} on the key followed by the specified delimiter followed by the {@link
+   * Object#toString} of the value.
+   *
    * @param delimiter The delimiter to put between the key and value
    */
-  public static PTransform<PCollection<? extends KV<?, ?>>,
-          PCollection<String>> kv(String delimiter) {
-    return new KVToString(delimiter);
+  public static PTransform<PCollection<? extends KV<?, ?>>, PCollection<String>> kvs(
+      String delimiter) {
+    return new KVs(delimiter);
   }
 
   /**
-   * Returns a {@code PTransform<PCollection<Iterable<?>, PCollection<String>>} which
-   * transforms each item in the iterable of the input {@link PCollection} to a {@link String}
+   * Transforms each item in the iterable of the input {@link PCollection} to a {@link String}
    * using the {@link Object#toString} method followed by a "," until
    * the last element in the iterable. There is no trailing delimiter.
    */
-  public static PTransform<PCollection<? extends Iterable<?>>, PCollection<String>> iterable() {
-    return iterable(",");
+  public static PTransform<PCollection<? extends Iterable<?>>, PCollection<String>> iterables() {
+    return iterables(",");
   }
 
   /**
-   * Returns a {@code PTransform<PCollection<Iterable<?>, PCollection<String>>} which
-   * transforms each item in the iterable of the input {@link PCollection} to a {@link String}
-   * using the {@link Object#toString} method followed by the specified delimiter until
-   * the last element in the iterable. There is no trailing delimiter.
+   * Transforms each item in the iterable of the input {@link PCollection} to a {@link String} using
+   * the {@link Object#toString} method followed by the specified delimiter until the last element
+   * in the iterable. There is no trailing delimiter.
+   *
    * @param delimiter The delimiter to put between the items in the iterable.
    */
-  public static PTransform<PCollection<? extends Iterable<?>>,
-          PCollection<String>> iterable(String delimiter) {
-    return new IterablesToString(delimiter);
+  public static PTransform<PCollection<? extends Iterable<?>>, PCollection<String>> iterables(
+      String delimiter) {
+    return new Iterables(delimiter);
   }
 
   /**
-   * A {@link PTransform} that converts a {@code PCollection} to a {@code PCollection<String>}
-   * using the {@link  Object#toString} method.
+   * A {@link PTransform} that converts a {@code PCollection} to a {@code PCollection<String>} using
+   * the {@link Object#toString} method.
    *
    * <p>Example of use:
+   *
    * <pre>{@code
    * PCollection<Long> longs = ...;
    * PCollection<String> strings = longs.apply(ToString.elements());
    * }</pre>
    *
-   *
    * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own
    * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
    */
-  private static final class SimpleToString extends
-          PTransform<PCollection<?>, PCollection<String>> {
+  private static final class Elements extends PTransform<PCollection<?>, PCollection<String>> {
     @Override
     public PCollection<String> expand(PCollection<?> input) {
       return input.apply(MapElements.via(new SimpleFunction<Object, String>() {
@@ -118,25 +113,25 @@ public final class ToString {
   }
 
   /**
-   * A {@link PTransform} that converts a {@code PCollection} of {@code KV} to a
-   * {@code PCollection<String>} using the {@link  Object#toString} method for
-   * the key and value and an optional delimiter.
+   * A {@link PTransform} that converts a {@code PCollection} of {@code KV} to a {@code
+   * PCollection<String>} using the {@link Object#toString} method for the key and value and an
+   * optional delimiter.
    *
    * <p>Example of use:
+   *
    * <pre>{@code
    * PCollection<KV<String, Long>> nameToLong = ...;
   * PCollection<String> strings = nameToLong.apply(ToString.kvs());
    * }</pre>
    *
-   *
-   * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your
-   * own {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
+   * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own
+   * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
    */
-  private static final class KVToString extends
-          PTransform<PCollection<? extends KV<?, ?>>, PCollection<String>> {
+  private static final class KVs
+      extends PTransform<PCollection<? extends KV<?, ?>>, PCollection<String>> {
     private final String delimiter;
 
-    public KVToString(String delimiter) {
+    public KVs(String delimiter) {
       this.delimiter = delimiter;
     }
 
@@ -152,25 +147,24 @@ public final class ToString {
   }
 
   /**
-   * A {@link PTransform} that converts a {@code PCollection} of {@link Iterable} to a
-   * {@code PCollection<String>} using the {@link  Object#toString} method and
-   * an optional delimiter.
+   * A {@link PTransform} that converts a {@code PCollection} of {@link Iterable} to a {@code
+   * PCollection<String>} using the {@link Object#toString} method and an optional delimiter.
    *
    * <p>Example of use:
+   *
    * <pre>{@code
    * PCollection<Iterable<Long>> longs = ...;
   * PCollection<String> strings = longs.apply(ToString.iterables());
    * }</pre>
    *
-   *
-   * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your
-   * own {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
+   * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own
+   * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
    */
-  private static final class IterablesToString extends
-          PTransform<PCollection<? extends Iterable<?>>, PCollection<String>> {
+  private static final class Iterables
+      extends PTransform<PCollection<? extends Iterable<?>>, PCollection<String>> {
     private final String delimiter;
 
-    public IterablesToString(String delimiter) {
+    public Iterables(String delimiter) {
       this.delimiter = delimiter;
     }
 
@@ -179,18 +173,7 @@ public final class ToString {
       return input.apply(MapElements.via(new SimpleFunction<Iterable<?>, String>() {
         @Override
         public String apply(Iterable<?> input) {
-          StringBuilder builder = new StringBuilder();
-          Iterator iterator = input.iterator();
-
-          while (iterator.hasNext()) {
-            builder.append(iterator.next().toString());
-
-            if (iterator.hasNext()) {
-              builder.append(delimiter);
-            }
-          }
-
-          return builder.toString();
+          return Joiner.on(delimiter).join(input);
         }
       }));
     }
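
The Joiner rewrite above is behavior-preserving: like the removed StringBuilder loop, it calls
toString() on each element, emits no trailing delimiter, and throws NullPointerException on null
elements. A quick standalone check using Guava's Joiner:

    import com.google.common.base.Joiner;
    import java.util.Arrays;

    // Sketch: Joiner.on(delimiter).join(iterable) yields the same
    // delimiter-separated string the removed loop built by hand.
    final class JoinerDemo {
      public static void main(String[] args) {
        System.out.println(Joiner.on(",").join(Arrays.asList(1, 2, 3)));   // 1,2,3
        System.out.println(Joiner.on("\t").join(Arrays.asList("a", "b"))); // a<TAB>b
      }
    }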

http://git-wip-us.apache.org/repos/asf/beam/blob/4f34caa7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
index d2116da..ab446a4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
@@ -65,7 +65,7 @@ public class ToStringTest {
     expected.add("two,2");
 
     PCollection<KV<String, Integer>> input = p.apply(Create.of(kvs));
-    PCollection<String> output = input.apply(ToString.kv());
+    PCollection<String> output = input.apply(ToString.kvs());
     PAssert.that(output).containsInAnyOrder(expected);
     p.run();
   }
@@ -82,7 +82,7 @@ public class ToStringTest {
     expected.add("two\t2");
 
     PCollection<KV<String, Integer>> input = p.apply(Create.of(kvs));
-    PCollection<String> output = input.apply(ToString.kv("\t"));
+    PCollection<String> output = input.apply(ToString.kvs("\t"));
     PAssert.that(output).containsInAnyOrder(expected);
     p.run();
   }
@@ -100,7 +100,7 @@ public class ToStringTest {
 
     PCollection<Iterable<String>> input = p.apply(Create.of(iterables)
             .withCoder(IterableCoder.of(StringUtf8Coder.of())));
-    PCollection<String> output = input.apply(ToString.iterable());
+    PCollection<String> output = input.apply(ToString.iterables());
     PAssert.that(output).containsInAnyOrder(expected);
     p.run();
   }
@@ -118,7 +118,7 @@ public class ToStringTest {
 
     PCollection<Iterable<String>> input = p.apply(Create.of(iterables)
             .withCoder(IterableCoder.of(StringUtf8Coder.of())));
-    PCollection<String> output = input.apply(ToString.iterable("\t"));
+    PCollection<String> output = input.apply(ToString.iterables("\t"));
     PAssert.that(output).containsInAnyOrder(expected);
     p.run();
   }