You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by je...@apache.org on 2016/11/02 02:41:52 UTC

[1/2] incubator-beam git commit: BEAM-239: Refactored the project to replace RemoveDuplicates with Distinct

Repository: incubator-beam
Updated Branches:
  refs/heads/master facf096e5 -> 8883877ae


BEAM-239: Refactored the project to replace RemoveDuplicates with Distinct


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8183ac82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8183ac82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8183ac82

Branch: refs/heads/master
Commit: 8183ac825d506a56421f41adc2e25b544f1bb80f
Parents: facf096
Author: Neelesh Srinivas Salian <ns...@cloudera.com>
Authored: Sat Oct 22 13:05:48 2016 -0700
Committer: Jesse Anderson <je...@smokinghand.com>
Committed: Tue Nov 1 19:15:12 2016 -0600

----------------------------------------------------------------------
 .../apache/beam/examples/complete/TfIdf.java    |   6 +-
 .../beam/examples/cookbook/DeDupExample.java    |   8 +-
 .../org/apache/beam/examples/cookbook/README.md |   2 +-
 .../beam/examples/complete/TfIdfTest.java       |   4 +-
 .../examples/cookbook/DeDupExampleTest.java     |  82 ----------
 .../examples/cookbook/DistinctExampleTest.java  |  82 ++++++++++
 .../UnboundedReadFromBoundedSourceTest.java     |   4 +-
 .../beam/runners/flink/examples/TFIDF.java      |   6 +-
 .../streaming/KafkaStreamingTest.java           |   4 +-
 .../sdk/io/BoundedReadFromUnboundedSource.java  |   4 +-
 .../apache/beam/sdk/transforms/Distinct.java    | 158 ++++++++++++++++++
 .../beam/sdk/transforms/RemoveDuplicates.java   | 159 -------------------
 .../apache/beam/sdk/io/CountingInputTest.java   |   6 +-
 .../apache/beam/sdk/io/CountingSourceTest.java  |   8 +-
 .../sdk/transforms/ApproximateUniqueTest.java   |   2 +-
 .../beam/sdk/transforms/DistinctTest.java       | 130 +++++++++++++++
 .../sdk/transforms/RemoveDuplicatesTest.java    | 130 ---------------
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   |   6 +-
 .../beam/sdk/transforms/DistinctJava8Test.java  | 104 ++++++++++++
 .../transforms/RemoveDuplicatesJava8Test.java   | 104 ------------
 20 files changed, 504 insertions(+), 505 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index c0ba1e9..edf48e7 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -36,12 +36,12 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Keys;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
@@ -209,7 +209,7 @@ public class TfIdf {
       final PCollectionView<Long> totalDocuments =
           uriToContent
           .apply("GetURIs", Keys.<URI>create())
-          .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
+          .apply("DistinctDocs", Distinct.<URI>create())
           .apply(Count.<URI>globally())
           .apply(View.<Long>asSingleton());
 
@@ -238,7 +238,7 @@ public class TfIdf {
       // Compute a mapping from each word to the total
       // number of documents in which it appears.
       PCollection<KV<String, Long>> wordToDocCount = uriToWords
-          .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create())
+          .apply("DistinctWords", Distinct.<KV<URI, String>>create())
           .apply(Values.<String>create())
           .apply("CountDocs", Count.<String>perElement());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
index 0883815..34fb901 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
@@ -24,16 +24,16 @@ import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
+import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 
 /**
  * This example uses as input Shakespeare's plays as plaintext files, and will remove any
  * duplicate lines across all the files. (The output does not preserve any input order).
  *
- * <p>Concepts: the RemoveDuplicates transform, and how to wire transforms together.
+ * <p>Concepts: the Distinct transform, and how to wire transforms together.
  * Demonstrates {@link org.apache.beam.sdk.io.TextIO.Read}/
- * {@link RemoveDuplicates}/{@link org.apache.beam.sdk.io.TextIO.Write}.
+ * {@link Distinct}/{@link org.apache.beam.sdk.io.TextIO.Write}.
  *
  * <p>To execute this pipeline locally, specify a local output file or output prefix on GCS:
  *   --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
@@ -88,7 +88,7 @@ public class DeDupExample {
     Pipeline p = Pipeline.create(options);
 
     p.apply("ReadLines", TextIO.Read.from(options.getInput()))
-     .apply(RemoveDuplicates.<String>create())
+     .apply(Distinct.<String>create())
      .apply("DedupedShakespeare", TextIO.Write.to(options.getOutput()));
 
     p.run().waitUntilFinish();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/examples/java/src/main/java/org/apache/beam/examples/cookbook/README.md
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/README.md b/examples/java/src/main/java/org/apache/beam/examples/cookbook/README.md
index 77b72f3..2edd26b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/README.md
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/README.md
@@ -42,7 +42,7 @@ larger Dataflow pipeline. They include:
   <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DeDupExample.java">DeDupExample</a>
   &mdash; An example that uses Shakespeare's plays as plain text files, and
   removes duplicate lines across all the files. Demonstrates the
-  <code>RemoveDuplicates</code>, <code>TextIO.Read</code>,
+  <code>Distinct</code>, <code>TextIO.Read</code>,
   and <code>TextIO.Write</code> transforms, and how to wire transforms together.
   </li>
   <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/FilterExamples.java">FilterExamples</a>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
index 1aee8f9..fded4c0 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
@@ -25,8 +25,8 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.Keys;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Test;
@@ -57,7 +57,7 @@ public class TfIdfTest {
 
     PCollection<String> words = wordToUriAndTfIdf
         .apply(Keys.<String>create())
-        .apply(RemoveDuplicates.<String>create());
+        .apply(Distinct.<String>create());
 
     PAssert.that(words).containsInAnyOrder(Arrays.asList("a", "m", "n", "b", "c", "d"));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/examples/java/src/test/java/org/apache/beam/examples/cookbook/DeDupExampleTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/DeDupExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/DeDupExampleTest.java
deleted file mode 100644
index d29fc42..0000000
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/DeDupExampleTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.examples.cookbook;
-
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Unit tests for {@link DeDupExample}. */
-@RunWith(JUnit4.class)
-public class DeDupExampleTest {
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testRemoveDuplicates() {
-    List<String> strings = Arrays.asList(
-        "k1",
-        "k5",
-        "k5",
-        "k2",
-        "k1",
-        "k2",
-        "k3");
-
-    Pipeline p = TestPipeline.create();
-
-    PCollection<String> input =
-        p.apply(Create.of(strings)
-            .withCoder(StringUtf8Coder.of()));
-
-    PCollection<String> output =
-        input.apply(RemoveDuplicates.<String>create());
-
-    PAssert.that(output)
-        .containsInAnyOrder("k1", "k5", "k2", "k3");
-    p.run().waitUntilFinish();
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testRemoveDuplicatesEmpty() {
-    List<String> strings = Arrays.asList();
-
-    Pipeline p = TestPipeline.create();
-
-    PCollection<String> input =
-        p.apply(Create.of(strings)
-            .withCoder(StringUtf8Coder.of()));
-
-    PCollection<String> output =
-        input.apply(RemoveDuplicates.<String>create());
-
-    PAssert.that(output).empty();
-    p.run().waitUntilFinish();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/examples/java/src/test/java/org/apache/beam/examples/cookbook/DistinctExampleTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/DistinctExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/DistinctExampleTest.java
new file mode 100644
index 0000000..6fa66f1
--- /dev/null
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/DistinctExampleTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.cookbook;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Distinct;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link Distinct}. */
+@RunWith(JUnit4.class)
+public class DistinctExampleTest {
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testDistinct() {
+    List<String> strings = Arrays.asList(
+        "k1",
+        "k5",
+        "k5",
+        "k2",
+        "k1",
+        "k2",
+        "k3");
+
+    Pipeline p = TestPipeline.create();
+
+    PCollection<String> input =
+        p.apply(Create.of(strings)
+            .withCoder(StringUtf8Coder.of()));
+
+    PCollection<String> output =
+        input.apply(Distinct.<String>create());
+
+    PAssert.that(output)
+        .containsInAnyOrder("k1", "k5", "k2", "k3");
+    p.run().waitUntilFinish();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testDistinctEmpty() {
+    List<String> strings = Arrays.asList();
+
+    Pipeline p = TestPipeline.create();
+
+    PCollection<String> input =
+        p.apply(Create.of(strings)
+            .withCoder(StringUtf8Coder.of()));
+
+    PCollection<String> output =
+        input.apply(Distinct.<String>create());
+
+    PAssert.that(output).empty();
+    p.run().waitUntilFinish();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
index efc446e..7fd8807 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
@@ -51,9 +51,9 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -108,7 +108,7 @@ public class UnboundedReadFromBoundedSourceTest {
       .isEqualTo(numElements);
     // Unique count == numElements
     PAssert
-      .thatSingleton(output.apply(RemoveDuplicates.<Long>create())
+      .thatSingleton(output.apply(Distinct.<Long>create())
                           .apply("UniqueCount", Count.<Long>globally()))
       .isEqualTo(numElements);
     // Min == 0

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 6d04e0b..cf5c8f5 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -38,12 +38,12 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Keys;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
@@ -221,7 +221,7 @@ public class TFIDF {
       final PCollectionView<Long> totalDocuments =
           uriToContent
               .apply("GetURIs", Keys.<URI>create())
-              .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
+              .apply("DistinctDocs", Distinct.<URI>create())
               .apply(Count.<URI>globally())
               .apply(View.<Long>asSingleton());
 
@@ -251,7 +251,7 @@ public class TFIDF {
       // Compute a mapping from each word to the total
       // number of documents in which it appears.
       PCollection<KV<String, Long>> wordToDocCount = uriToWords
-          .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create())
+          .apply("DistinctWords", Distinct.<KV<URI, String>>create())
           .apply(Values.<String>create())
           .apply("CountDocs", Count.<String>perElement());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index 8728948..fe2d04e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -32,9 +32,9 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
@@ -115,7 +115,7 @@ public class KafkaStreamingTest {
             KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
         .apply(Window.<KV<String, String>>into(FixedWindows.of(batchAndWindowDuration)))
         .apply(ParDo.of(new FormatKVFn()))
-        .apply(RemoveDuplicates.<String>create());
+        .apply(Distinct.<String>create());
 
     PAssertStreaming.runAndAssertContents(p, deduped, expected);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index 40c52a2..123dca8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -29,9 +29,9 @@ import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.FluentBackoff;
@@ -88,7 +88,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
     PCollection<ValueWithRecordId<T>> read = Pipeline.applyTransform(input,
         Read.from(new UnboundedToBoundedSourceAdapter<>(source, maxNumRecords, maxReadTime)));
     if (source.requiresDeduping()) {
-      read = read.apply(RemoveDuplicates.withRepresentativeValueFn(
+      read = read.apply(Distinct.withRepresentativeValueFn(
           new SerializableFunction<ValueWithRecordId<T>, byte[]>() {
             @Override
             public byte[] apply(ValueWithRecordId<T> input) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
new file mode 100644
index 0000000..fba428b
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * {@code Distinct<T>} takes a {@code PCollection<T>} and
+ * returns a {@code PCollection<T>} that has all distinct elements of the
+ * input. Thus, each element is unique within each window.
+ *
+ * <p>Two values of type {@code T} are compared for equality <b>not</b> by
+ * regular Java {@link Object#equals}, but instead by first encoding
+ * each of the elements using the {@code PCollection}'s {@code Coder}, and then
+ * comparing the encoded bytes.  This admits efficient parallel
+ * evaluation.
+ *
+ * <p>Optionally, a function may be provided that maps each element to a representative
+ * value.  In this case, two elements will be considered duplicates if they have equal
+ * representative values, with equality being determined as above.
+ *
+ * <p>By default, the {@code Coder} of the output {@code PCollection}
+ * is the same as the {@code Coder} of the input {@code PCollection}.
+ *
+ * <p>Each output element is in the same window as its corresponding input
+ * element, and has the timestamp of the end of that window.  The output
+ * {@code PCollection} has the same
+ * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+ * as the input.
+ *
+ * <p>Does not preserve any order the input PCollection might have had.
+ *
+ * <p>Example of use:
+ * <pre> {@code
+ * PCollection<String> words = ...;
+ * PCollection<String> uniqueWords =
+ *     words.apply(Distinct.<String>create());
+ * } </pre>
+ *
+ * @param <T> the type of the elements of the input and output
+ * {@code PCollection}s
+ */
+public class Distinct<T> extends PTransform<PCollection<T>,
+                                                    PCollection<T>> {
+  /**
+   * Returns a {@code Distinct<T>} {@code PTransform}.
+   *
+   * @param <T> the type of the elements of the input and output
+   * {@code PCollection}s
+   */
+  public static <T> Distinct<T> create() {
+    return new Distinct<T>();
+  }
+
+  /**
+   * Returns a {@code WithRepresentativeValues<T, IdT>} {@code PTransform}.
+   *
+   * @param <T> the type of the elements of the input and output
+   * {@code PCollection}s
+   * @param <IdT> the type of the representative value used to dedup
+   */
+  public static <T, IdT> WithRepresentativeValues<T, IdT> withRepresentativeValueFn(
+      SerializableFunction<T, IdT> fn) {
+    return new WithRepresentativeValues<T, IdT>(fn, null);
+  }
+
+  @Override
+  public PCollection<T> apply(PCollection<T> in) {
+    return in
+        .apply("CreateIndex", MapElements.via(new SimpleFunction<T, KV<T, Void>>() {
+          @Override
+          public KV<T, Void> apply(T element) {
+            return KV.of(element, (Void) null);
+          }
+        }))
+        .apply(Combine.<T, Void>perKey(
+            new SerializableFunction<Iterable<Void>, Void>() {
+              @Override
+              public Void apply(Iterable<Void> iter) {
+                return null; // ignore input
+                }
+            }))
+        .apply(Keys.<T>create());
+  }
+
+  /**
+   * A {@link Distinct} {@link PTransform} that uses a {@link SerializableFunction} to
+   * obtain a representative value for each input element.
+   *
+   * <p>Construct via {@link Distinct#withRepresentativeValueFn(SerializableFunction)}.
+   *
+   * @param <T> the type of input and output element
+   * @param <IdT> the type of representative values used to dedup
+   */
+  public static class WithRepresentativeValues<T, IdT>
+      extends PTransform<PCollection<T>, PCollection<T>> {
+    private final SerializableFunction<T, IdT> fn;
+    private final TypeDescriptor<IdT> representativeType;
+
+    private WithRepresentativeValues(
+        SerializableFunction<T, IdT> fn, TypeDescriptor<IdT> representativeType) {
+      this.fn = fn;
+      this.representativeType = representativeType;
+    }
+
+    @Override
+    public PCollection<T> apply(PCollection<T> in) {
+      WithKeys<IdT, T> withKeys = WithKeys.of(fn);
+      if (representativeType != null) {
+        withKeys = withKeys.withKeyType(representativeType);
+      }
+      return in
+          .apply(withKeys)
+          .apply(Combine.<IdT, T, T>perKey(
+              new Combine.BinaryCombineFn<T>() {
+                @Override
+                public T apply(T left, T right) {
+                  return left;
+                }
+              }))
+          .apply(Values.<T>create());
+    }
+
+    /**
+     * Return a {@code WithRepresentativeValues} {@link PTransform} that is like this one, but with
+     * the specified output type descriptor.
+     *
+     * <p>Required for use of
+     * {@link Distinct#withRepresentativeValueFn(SerializableFunction)}
+     * in Java 8 with a lambda as the fn.
+     *
+     * @param type a {@link TypeDescriptor} describing the representative type of this
+     *             {@code WithRepresentativeValues}
+     * @return A {@code WithRepresentativeValues} {@link PTransform} that is like this one, but with
+     *         the specified output type descriptor.
+     */
+    public WithRepresentativeValues<T, IdT> withRepresentativeType(TypeDescriptor<IdT> type) {
+      return new WithRepresentativeValues<>(fn, type);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
deleted file mode 100644
index 709aa4a..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/**
- * {@code RemoveDuplicates<T>} takes a {@code PCollection<T>} and
- * returns a {@code PCollection<T>} that has all the elements of the
- * input but with duplicate elements removed such that each element is
- * unique within each window.
- *
- * <p>Two values of type {@code T} are compared for equality <b>not</b> by
- * regular Java {@link Object#equals}, but instead by first encoding
- * each of the elements using the {@code PCollection}'s {@code Coder}, and then
- * comparing the encoded bytes.  This admits efficient parallel
- * evaluation.
- *
- * <p>Optionally, a function may be provided that maps each element to a representative
- * value.  In this case, two elements will be considered duplicates if they have equal
- * representative values, with equality being determined as above.
- *
- * <p>By default, the {@code Coder} of the output {@code PCollection}
- * is the same as the {@code Coder} of the input {@code PCollection}.
- *
- * <p>Each output element is in the same window as its corresponding input
- * element, and has the timestamp of the end of that window.  The output
- * {@code PCollection} has the same
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * as the input.
- *
- * <p>Does not preserve any order the input PCollection might have had.
- *
- * <p>Example of use:
- * <pre> {@code
- * PCollection<String> words = ...;
- * PCollection<String> uniqueWords =
- *     words.apply(RemoveDuplicates.<String>create());
- * } </pre>
- *
- * @param <T> the type of the elements of the input and output
- * {@code PCollection}s
- */
-public class RemoveDuplicates<T> extends PTransform<PCollection<T>,
-                                                    PCollection<T>> {
-  /**
-   * Returns a {@code RemoveDuplicates<T>} {@code PTransform}.
-   *
-   * @param <T> the type of the elements of the input and output
-   * {@code PCollection}s
-   */
-  public static <T> RemoveDuplicates<T> create() {
-    return new RemoveDuplicates<T>();
-  }
-
-  /**
-   * Returns a {@code RemoveDuplicates<T, IdT>} {@code PTransform}.
-   *
-   * @param <T> the type of the elements of the input and output
-   * {@code PCollection}s
-   * @param <IdT> the type of the representative value used to dedup
-   */
-  public static <T, IdT> WithRepresentativeValues<T, IdT> withRepresentativeValueFn(
-      SerializableFunction<T, IdT> fn) {
-    return new WithRepresentativeValues<T, IdT>(fn, null);
-  }
-
-  @Override
-  public PCollection<T> apply(PCollection<T> in) {
-    return in
-        .apply("CreateIndex", MapElements.via(new SimpleFunction<T, KV<T, Void>>() {
-          @Override
-          public KV<T, Void> apply(T element) {
-            return KV.of(element, (Void) null);
-          }
-        }))
-        .apply(Combine.<T, Void>perKey(
-            new SerializableFunction<Iterable<Void>, Void>() {
-              @Override
-              public Void apply(Iterable<Void> iter) {
-                return null; // ignore input
-                }
-            }))
-        .apply(Keys.<T>create());
-  }
-
-  /**
-   * A {@link RemoveDuplicates} {@link PTransform} that uses a {@link SerializableFunction} to
-   * obtain a representative value for each input element.
-   *
-   * <p>Construct via {@link RemoveDuplicates#withRepresentativeValueFn(SerializableFunction)}.
-   *
-   * @param <T> the type of input and output element
-   * @param <IdT> the type of representative values used to dedup
-   */
-  public static class WithRepresentativeValues<T, IdT>
-      extends PTransform<PCollection<T>, PCollection<T>> {
-    private final SerializableFunction<T, IdT> fn;
-    private final TypeDescriptor<IdT> representativeType;
-
-    private WithRepresentativeValues(
-        SerializableFunction<T, IdT> fn, TypeDescriptor<IdT> representativeType) {
-      this.fn = fn;
-      this.representativeType = representativeType;
-    }
-
-    @Override
-    public PCollection<T> apply(PCollection<T> in) {
-      WithKeys<IdT, T> withKeys = WithKeys.of(fn);
-      if (representativeType != null) {
-        withKeys = withKeys.withKeyType(representativeType);
-      }
-      return in
-          .apply(withKeys)
-          .apply(Combine.<IdT, T, T>perKey(
-              new Combine.BinaryCombineFn<T>() {
-                @Override
-                public T apply(T left, T right) {
-                  return left;
-                }
-              }))
-          .apply(Values.<T>create());
-    }
-
-    /**
-     * Return a {@code WithRepresentativeValues} {@link PTransform} that is like this one, but with
-     * the specified output type descriptor.
-     *
-     * <p>Required for use of
-     * {@link RemoveDuplicates#withRepresentativeValueFn(SerializableFunction)}
-     * in Java 8 with a lambda as the fn.
-     *
-     * @param type a {@link TypeDescriptor} describing the representative type of this
-     *             {@code WithRepresentativeValues}
-     * @return A {@code WithRepresentativeValues} {@link PTransform} that is like this one, but with
-     *         the specified output type descriptor.
-     */
-    public WithRepresentativeValues<T, IdT> withRepresentativeType(TypeDescriptor<IdT> type) {
-      return new WithRepresentativeValues<>(fn, type);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
index 0b92b26..2397d10 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
@@ -28,12 +28,12 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
@@ -56,7 +56,7 @@ public class CountingInputTest {
     // Unique count == numElements
     PAssert.thatSingleton(
             input
-                .apply(RemoveDuplicates.<Long>create())
+                .apply(Distinct.<Long>create())
                 .apply("UniqueCount", Count.<Long>globally()))
         .isEqualTo(numElements);
     // Min == 0
@@ -141,7 +141,7 @@ public class CountingInputTest {
     PCollection<Long> diffs =
         input
             .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
-            .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
+            .apply("DistinctTimestamps", Distinct.<Long>create());
     // This assert also confirms that diffs only has one unique value.
     PAssert.thatSingleton(diffs).isEqualTo(0L);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
index 7c5fa13..88c68d3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
@@ -36,12 +36,12 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.PCollection;
@@ -66,7 +66,7 @@ public class CountingSourceTest {
       .isEqualTo(numElements);
     // Unique count == numElements
     PAssert
-      .thatSingleton(input.apply(RemoveDuplicates.<Long>create())
+      .thatSingleton(input.apply(Distinct.<Long>create())
                           .apply("UniqueCount", Count.<Long>globally()))
       .isEqualTo(numElements);
     // Min == 0
@@ -177,7 +177,7 @@ public class CountingSourceTest {
 
     PCollection<Long> diffs = input
         .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
-        .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
+        .apply("DistinctTimestamps", Distinct.<Long>create());
     // This assert also confirms that diffs only has one unique value.
     PAssert.thatSingleton(diffs).isEqualTo(0L);
 
@@ -204,7 +204,7 @@ public class CountingSourceTest {
     PCollection<Long> diffs =
         input
             .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
-            .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
+            .apply("DistinctTimestamps", Distinct.<Long>create());
     // This assert also confirms that diffs only has one unique value.
     PAssert.thatSingleton(diffs).isEqualTo(0L);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
index ba1ddfe..b63c73d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
@@ -214,7 +214,7 @@ public class ApproximateUniqueTest implements Serializable {
     PCollection<Long> approximate = input.apply(ApproximateUnique.<String>globally(sampleSize));
     final PCollectionView<Long> exact =
         input
-            .apply(RemoveDuplicates.<String>create())
+            .apply(Distinct.<String>create())
             .apply(Count.<String>globally())
             .apply(View.<Long>asSingleton());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
new file mode 100644
index 0000000..257b364
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link Distinct}, covering plain de-duplication, empty input, and de-duplication by
+ * a representative value.
+ */
+@RunWith(JUnit4.class)
+public class DistinctTest {
+
+  /** Repeated strings in the input appear exactly once in the output. */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testDistinct() {
+    Pipeline p = TestPipeline.create();
+
+    List<String> strings = Arrays.asList("k1", "k5", "k5", "k2", "k1", "k2", "k3");
+    PCollection<String> input = p.apply(Create.of(strings).withCoder(StringUtf8Coder.of()));
+    PCollection<String> output = input.apply(Distinct.<String>create());
+
+    PAssert.that(output).containsInAnyOrder("k1", "k5", "k2", "k3");
+    p.run();
+  }
+
+  /** An empty input produces an empty output. */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testDistinctEmpty() {
+    Pipeline p = TestPipeline.create();
+
+    List<String> strings = Arrays.asList();
+    PCollection<String> input = p.apply(Create.of(strings).withCoder(StringUtf8Coder.of()));
+    PCollection<String> output = input.apply(Distinct.<String>create());
+
+    PAssert.that(output).empty();
+    p.run();
+  }
+
+  /** Representative-value function: the key of a {@link KV}. */
+  private static class Keys implements SerializableFunction<KV<String, String>, String> {
+    @Override
+    public String apply(KV<String, String> input) {
+      return input.getKey();
+    }
+  }
+
+  /**
+   * Collects the output into a key-to-value map and asserts that it holds two keys, that "k1"
+   * maps to either "v1" or "v2", and that "k2" maps to "v1".
+   */
+  private static class Checker implements SerializableFunction<Iterable<KV<String, String>>, Void> {
+    @Override
+    public Void apply(Iterable<KV<String, String>> input) {
+      Map<String, String> valueByKey = new HashMap<>();
+      for (KV<String, String> element : input) {
+        valueByKey.put(element.getKey(), element.getValue());
+      }
+
+      assertEquals(2, valueByKey.size());
+      // Either of the two "k1" inputs is accepted as the surviving element.
+      String k1Value = valueByKey.get("k1");
+      assertTrue(k1Value.equals("v1") || k1Value.equals("v2"));
+      assertEquals("v1", valueByKey.get("k2"));
+      return null;
+    }
+  }
+
+  /** Elements sharing a key are reduced to one element when the key is the representative. */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testDistinctWithRepresentativeValue() {
+    Pipeline p = TestPipeline.create();
+
+    List<KV<String, String>> strings =
+        Arrays.asList(KV.of("k1", "v1"), KV.of("k1", "v2"), KV.of("k2", "v1"));
+    PCollection<KV<String, String>> input = p.apply(Create.of(strings));
+    PCollection<KV<String, String>> output =
+        input.apply(Distinct.withRepresentativeValueFn(new Keys()));
+
+    PAssert.that(output).satisfies(new Checker());
+    p.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RemoveDuplicatesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RemoveDuplicatesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RemoveDuplicatesTest.java
deleted file mode 100644
index 312cba6..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RemoveDuplicatesTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for RemovedDuplicates.
- */
-@RunWith(JUnit4.class)
-public class RemoveDuplicatesTest {
-  @Test
-  @Category(RunnableOnService.class)
-  public void testRemoveDuplicates() {
-    List<String> strings = Arrays.asList(
-        "k1",
-        "k5",
-        "k5",
-        "k2",
-        "k1",
-        "k2",
-        "k3");
-
-    Pipeline p = TestPipeline.create();
-
-    PCollection<String> input =
-        p.apply(Create.of(strings)
-            .withCoder(StringUtf8Coder.of()));
-
-    PCollection<String> output =
-        input.apply(RemoveDuplicates.<String>create());
-
-    PAssert.that(output)
-        .containsInAnyOrder("k1", "k5", "k2", "k3");
-    p.run();
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testRemoveDuplicatesEmpty() {
-    List<String> strings = Arrays.asList();
-
-    Pipeline p = TestPipeline.create();
-
-    PCollection<String> input =
-        p.apply(Create.of(strings)
-            .withCoder(StringUtf8Coder.of()));
-
-    PCollection<String> output =
-        input.apply(RemoveDuplicates.<String>create());
-
-    PAssert.that(output).empty();
-    p.run();
-  }
-
-  private static class Keys implements SerializableFunction<KV<String, String>, String> {
-    @Override
-    public String apply(KV<String, String> input) {
-      return input.getKey();
-    }
-  }
-
-  private static class Checker implements SerializableFunction<Iterable<KV<String, String>>, Void> {
-    @Override
-    public Void apply(Iterable<KV<String, String>> input) {
-      Map<String, String> values = new HashMap<>();
-      for (KV<String, String> kv : input) {
-        values.put(kv.getKey(), kv.getValue());
-      }
-      assertEquals(2, values.size());
-      assertTrue(values.get("k1").equals("v1") || values.get("k1").equals("v2"));
-      assertEquals("v1", values.get("k2"));
-      return null;
-    }
-  }
-
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testRemoveDuplicatesWithRepresentativeValue() {
-    List<KV<String, String>> strings = Arrays.asList(
-        KV.of("k1", "v1"),
-        KV.of("k1", "v2"),
-        KV.of("k2", "v1"));
-
-    Pipeline p = TestPipeline.create();
-
-    PCollection<KV<String, String>> input = p.apply(Create.of(strings));
-
-    PCollection<KV<String, String>> output =
-        input.apply(RemoveDuplicates.withRepresentativeValueFn(new Keys()));
-
-
-    PAssert.that(output).satisfies(new Checker());
-
-    p.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 2f3c524..cc1ef26 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -51,12 +51,12 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -252,7 +252,7 @@ public class KafkaIOTest {
       .isEqualTo(numElements);
     // Unique count == numElements
     PAssert
-      .thatSingleton(input.apply(RemoveDuplicates.<Long>create())
+      .thatSingleton(input.apply(Distinct.<Long>create())
                           .apply("UniqueCount", Count.<Long>globally()))
       .isEqualTo(numElements);
     // Min == 0
@@ -333,7 +333,7 @@ public class KafkaIOTest {
 
     PCollection<Long> diffs = input
         .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
-        .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
+        .apply("DistinctTimestamps", Distinct.<Long>create());
     // This assert also confirms that diffs only has one unique value.
     PAssert.thatSingleton(diffs).isEqualTo(0L);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
new file mode 100644
index 0000000..99ef232
--- /dev/null
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Java 8 tests for {@link Distinct}, exercising {@code Distinct.withRepresentativeValueFn} with
+ * lambdas as the representative-value function.
+ */
+@RunWith(JUnit4.class)
+public class DistinctJava8Test {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  /**
+   * With an explicit representative type, a lambda that maps each string to its length must
+   * yield outputs that are all taken from the input and that never repeat a length.
+   */
+  @Test
+  public void withLambdaRepresentativeValuesFnAndTypeDescriptorShouldApplyFn() {
+    TestPipeline p = TestPipeline.create();
+
+    // The distinct input strings, grouped by the length that represents them.
+    Multimap<Integer, String> predupedContents = HashMultimap.create();
+    predupedContents.put(3, "foo");
+    predupedContents.put(4, "foos");
+    predupedContents.put(6, "barbaz");
+    predupedContents.put(6, "bazbar");
+    PCollection<String> dupes =
+        p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo"));
+    PCollection<String> deduped =
+        dupes.apply(Distinct.withRepresentativeValueFn((String s) -> s.length())
+                                    .withRepresentativeType(TypeDescriptor.of(Integer.class)));
+
+    PAssert.that(deduped).satisfies((Iterable<String> strs) -> {
+      Set<Integer> seenLengths = new HashSet<>();
+      for (String s : strs) {
+        assertThat(predupedContents.values(), hasItem(s));
+        // hasItem rather than contains: contains(x) only matches an iterable that is exactly
+        // [x], so not(contains(x)) would accept a repeated length once other lengths were seen.
+        assertThat(seenLengths, not(hasItem(s.length())));
+        seenLengths.add(s.length());
+      }
+      return null;
+    });
+
+    p.run();
+  }
+
+  /**
+   * Without an explicit representative type, applying the transform with a lambda is expected to
+   * fail with an {@link IllegalStateException} describing the failed coder lookups.
+   */
+  @Test
+  public void withLambdaRepresentativeValuesFnNoTypeDescriptorShouldThrow() {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> dupes =
+        p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo"));
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Unable to return a default Coder for RemoveRepresentativeDupes");
+    thrown.expectMessage("No Coder has been manually specified");
+    thrown.expectMessage(
+        containsString("Building a Coder using a registered CoderFactory failed"));
+    thrown.expectMessage(
+        containsString("Building a Coder from the @DefaultCoder annotation failed"));
+    thrown.expectMessage(
+        containsString("Building a Coder from the fallback CoderProvider failed"));
+
+    // Thrown when applying a transform to the internal WithKeys that withRepresentativeValueFn is
+    // implemented with
+    dupes.apply("RemoveRepresentativeDupes",
+        Distinct.withRepresentativeValueFn((String s) -> s.length()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/RemoveDuplicatesJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/RemoveDuplicatesJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/RemoveDuplicatesJava8Test.java
deleted file mode 100644
index 99266d4..0000000
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/RemoveDuplicatesJava8Test.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Java 8 tests for {@link RemoveDuplicates}.
- */
-@RunWith(JUnit4.class)
-public class RemoveDuplicatesJava8Test {
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void withLambdaRepresentativeValuesFnAndTypeDescriptorShouldApplyFn() {
-    TestPipeline p = TestPipeline.create();
-
-    Multimap<Integer, String> predupedContents = HashMultimap.create();
-    predupedContents.put(3, "foo");
-    predupedContents.put(4, "foos");
-    predupedContents.put(6, "barbaz");
-    predupedContents.put(6, "bazbar");
-    PCollection<String> dupes =
-        p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo"));
-    PCollection<String> deduped =
-        dupes.apply(RemoveDuplicates.withRepresentativeValueFn((String s) -> s.length())
-                                    .withRepresentativeType(TypeDescriptor.of(Integer.class)));
-
-    PAssert.that(deduped).satisfies((Iterable<String> strs) -> {
-      Set<Integer> seenLengths = new HashSet<>();
-      for (String s : strs) {
-        assertThat(predupedContents.values(), hasItem(s));
-        assertThat(seenLengths, not(contains(s.length())));
-        seenLengths.add(s.length());
-      }
-      return null;
-    });
-
-    p.run();
-  }
-
-  @Test
-  public void withLambdaRepresentativeValuesFnNoTypeDescriptorShouldThrow() {
-    TestPipeline p = TestPipeline.create();
-
-    Multimap<Integer, String> predupedContents = HashMultimap.create();
-    predupedContents.put(3, "foo");
-    predupedContents.put(4, "foos");
-    predupedContents.put(6, "barbaz");
-    predupedContents.put(6, "bazbar");
-    PCollection<String> dupes =
-        p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo"));
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Unable to return a default Coder for RemoveRepresentativeDupes");
-    thrown.expectMessage("No Coder has been manually specified");
-    thrown.expectMessage(
-        containsString("Building a Coder using a registered CoderFactory failed"));
-    thrown.expectMessage(
-        containsString("Building a Coder from the @DefaultCoder annotation failed"));
-    thrown.expectMessage(
-        containsString("Building a Coder from the fallback CoderProvider failed"));
-
-    // Thrown when applying a transform to the internal WithKeys that withRepresentativeValueFn is
-    // implemented with
-    dupes.apply("RemoveRepresentativeDupes",
-        RemoveDuplicates.withRepresentativeValueFn((String s) -> s.length()));
-  }
-}


[2/2] incubator-beam git commit: This closes #1164

Posted by je...@apache.org.
This closes #1164


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8883877a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8883877a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8883877a

Branch: refs/heads/master
Commit: 8883877ae91fa42963181d419d8f3cbfea8b314b
Parents: facf096 8183ac8
Author: Jesse Anderson <je...@smokinghand.com>
Authored: Tue Nov 1 19:25:27 2016 -0600
Committer: Jesse Anderson <je...@smokinghand.com>
Committed: Tue Nov 1 19:25:27 2016 -0600

----------------------------------------------------------------------
 .../apache/beam/examples/complete/TfIdf.java    |   6 +-
 .../beam/examples/cookbook/DeDupExample.java    |   8 +-
 .../org/apache/beam/examples/cookbook/README.md |   2 +-
 .../beam/examples/complete/TfIdfTest.java       |   4 +-
 .../examples/cookbook/DeDupExampleTest.java     |  82 ----------
 .../examples/cookbook/DistinctExampleTest.java  |  82 ++++++++++
 .../UnboundedReadFromBoundedSourceTest.java     |   4 +-
 .../beam/runners/flink/examples/TFIDF.java      |   6 +-
 .../streaming/KafkaStreamingTest.java           |   4 +-
 .../sdk/io/BoundedReadFromUnboundedSource.java  |   4 +-
 .../apache/beam/sdk/transforms/Distinct.java    | 158 ++++++++++++++++++
 .../beam/sdk/transforms/RemoveDuplicates.java   | 159 -------------------
 .../apache/beam/sdk/io/CountingInputTest.java   |   6 +-
 .../apache/beam/sdk/io/CountingSourceTest.java  |   8 +-
 .../sdk/transforms/ApproximateUniqueTest.java   |   2 +-
 .../beam/sdk/transforms/DistinctTest.java       | 130 +++++++++++++++
 .../sdk/transforms/RemoveDuplicatesTest.java    | 130 ---------------
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   |   6 +-
 .../beam/sdk/transforms/DistinctJava8Test.java  | 104 ++++++++++++
 .../transforms/RemoveDuplicatesJava8Test.java   | 104 ------------
 20 files changed, 504 insertions(+), 505 deletions(-)
----------------------------------------------------------------------