You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by je...@apache.org on 2016/11/02 02:41:52 UTC
[1/2] incubator-beam git commit: BEAM-239: Refactored the project to
replace RemoveDuplicates to Distinct
Repository: incubator-beam
Updated Branches:
refs/heads/master facf096e5 -> 8883877ae
BEAM-239: Refactored the project to replace RemoveDuplicates to Distinct
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8183ac82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8183ac82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8183ac82
Branch: refs/heads/master
Commit: 8183ac825d506a56421f41adc2e25b544f1bb80f
Parents: facf096
Author: Neelesh Srinivas Salian <ns...@cloudera.com>
Authored: Sat Oct 22 13:05:48 2016 -0700
Committer: Jesse Anderson <je...@smokinghand.com>
Committed: Tue Nov 1 19:15:12 2016 -0600
----------------------------------------------------------------------
.../apache/beam/examples/complete/TfIdf.java | 6 +-
.../beam/examples/cookbook/DeDupExample.java | 8 +-
.../org/apache/beam/examples/cookbook/README.md | 2 +-
.../beam/examples/complete/TfIdfTest.java | 4 +-
.../examples/cookbook/DeDupExampleTest.java | 82 ----------
.../examples/cookbook/DistinctExampleTest.java | 82 ++++++++++
.../UnboundedReadFromBoundedSourceTest.java | 4 +-
.../beam/runners/flink/examples/TFIDF.java | 6 +-
.../streaming/KafkaStreamingTest.java | 4 +-
.../sdk/io/BoundedReadFromUnboundedSource.java | 4 +-
.../apache/beam/sdk/transforms/Distinct.java | 158 ++++++++++++++++++
.../beam/sdk/transforms/RemoveDuplicates.java | 159 -------------------
.../apache/beam/sdk/io/CountingInputTest.java | 6 +-
.../apache/beam/sdk/io/CountingSourceTest.java | 8 +-
.../sdk/transforms/ApproximateUniqueTest.java | 2 +-
.../beam/sdk/transforms/DistinctTest.java | 130 +++++++++++++++
.../sdk/transforms/RemoveDuplicatesTest.java | 130 ---------------
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 6 +-
.../beam/sdk/transforms/DistinctJava8Test.java | 104 ++++++++++++
.../transforms/RemoveDuplicatesJava8Test.java | 104 ------------
20 files changed, 504 insertions(+), 505 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index c0ba1e9..edf48e7 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -36,12 +36,12 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
@@ -209,7 +209,7 @@ public class TfIdf {
final PCollectionView<Long> totalDocuments =
uriToContent
.apply("GetURIs", Keys.<URI>create())
- .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
+ .apply("DistinctDocs", Distinct.<URI>create())
.apply(Count.<URI>globally())
.apply(View.<Long>asSingleton());
@@ -238,7 +238,7 @@ public class TfIdf {
// Compute a mapping from each word to the total
// number of documents in which it appears.
PCollection<KV<String, Long>> wordToDocCount = uriToWords
- .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create())
+ .apply("DistinctWords", Distinct.<KV<URI, String>>create())
.apply(Values.<String>create())
.apply("CountDocs", Count.<String>perElement());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
index 0883815..34fb901 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
@@ -24,16 +24,16 @@ import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
+import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
/**
* This example uses as input Shakespeare's plays as plaintext files, and will remove any
* duplicate lines across all the files. (The output does not preserve any input order).
*
- * <p>Concepts: the RemoveDuplicates transform, and how to wire transforms together.
+ * <p>Concepts: the Distinct transform, and how to wire transforms together.
* Demonstrates {@link org.apache.beam.sdk.io.TextIO.Read}/
- * {@link RemoveDuplicates}/{@link org.apache.beam.sdk.io.TextIO.Write}.
+ * {@link Distinct}/{@link org.apache.beam.sdk.io.TextIO.Write}.
*
* <p>To execute this pipeline locally, specify a local output file or output prefix on GCS:
* --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
@@ -88,7 +88,7 @@ public class DeDupExample {
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.Read.from(options.getInput()))
- .apply(RemoveDuplicates.<String>create())
+ .apply(Distinct.<String>create())
.apply("DedupedShakespeare", TextIO.Write.to(options.getOutput()));
p.run().waitUntilFinish();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/examples/java/src/main/java/org/apache/beam/examples/cookbook/README.md
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/README.md b/examples/java/src/main/java/org/apache/beam/examples/cookbook/README.md
index 77b72f3..2edd26b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/README.md
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/README.md
@@ -42,7 +42,7 @@ larger Dataflow pipeline. They include:
<li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DeDupExample.java">DeDupExample</a>
— An example that uses Shakespeare's plays as plain text files, and
removes duplicate lines across all the files. Demonstrates the
- <code>RemoveDuplicates</code>, <code>TextIO.Read</code>,
+ <code>Distinct</code>, <code>TextIO.Read</code>,
and <code>TextIO.Write</code> transforms, and how to wire transforms together.
</li>
<li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/FilterExamples.java">FilterExamples</a>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
index 1aee8f9..fded4c0 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
@@ -25,8 +25,8 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.Keys;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Test;
@@ -57,7 +57,7 @@ public class TfIdfTest {
PCollection<String> words = wordToUriAndTfIdf
.apply(Keys.<String>create())
- .apply(RemoveDuplicates.<String>create());
+ .apply(Distinct.<String>create());
PAssert.that(words).containsInAnyOrder(Arrays.asList("a", "m", "n", "b", "c", "d"));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/examples/java/src/test/java/org/apache/beam/examples/cookbook/DeDupExampleTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/DeDupExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/DeDupExampleTest.java
deleted file mode 100644
index d29fc42..0000000
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/DeDupExampleTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.examples.cookbook;
-
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Unit tests for {@link DeDupExample}. */
-@RunWith(JUnit4.class)
-public class DeDupExampleTest {
-
- @Test
- @Category(RunnableOnService.class)
- public void testRemoveDuplicates() {
- List<String> strings = Arrays.asList(
- "k1",
- "k5",
- "k5",
- "k2",
- "k1",
- "k2",
- "k3");
-
- Pipeline p = TestPipeline.create();
-
- PCollection<String> input =
- p.apply(Create.of(strings)
- .withCoder(StringUtf8Coder.of()));
-
- PCollection<String> output =
- input.apply(RemoveDuplicates.<String>create());
-
- PAssert.that(output)
- .containsInAnyOrder("k1", "k5", "k2", "k3");
- p.run().waitUntilFinish();
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testRemoveDuplicatesEmpty() {
- List<String> strings = Arrays.asList();
-
- Pipeline p = TestPipeline.create();
-
- PCollection<String> input =
- p.apply(Create.of(strings)
- .withCoder(StringUtf8Coder.of()));
-
- PCollection<String> output =
- input.apply(RemoveDuplicates.<String>create());
-
- PAssert.that(output).empty();
- p.run().waitUntilFinish();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/examples/java/src/test/java/org/apache/beam/examples/cookbook/DistinctExampleTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/DistinctExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/DistinctExampleTest.java
new file mode 100644
index 0000000..6fa66f1
--- /dev/null
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/DistinctExampleTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.cookbook;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Distinct;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link Distinct}, covering duplicated and empty inputs. */
+@RunWith(JUnit4.class)
+public class DistinctExampleTest {
+
+ @Test
+ @Category(RunnableOnService.class)
+ public void testDistinct() {
+ List<String> strings = Arrays.asList( // "k1", "k2" and "k5" each appear twice
+ "k1",
+ "k5",
+ "k5",
+ "k2",
+ "k1",
+ "k2",
+ "k3");
+
+ Pipeline p = TestPipeline.create();
+
+ PCollection<String> input =
+ p.apply(Create.of(strings)
+ .withCoder(StringUtf8Coder.of()));
+
+ PCollection<String> output =
+ input.apply(Distinct.<String>create());
+
+ PAssert.that(output)
+ .containsInAnyOrder("k1", "k5", "k2", "k3"); // each value exactly once
+ p.run().waitUntilFinish();
+ }
+
+ @Test
+ @Category(RunnableOnService.class)
+ public void testDistinctEmpty() {
+ List<String> strings = Arrays.asList(); // empty input
+
+ Pipeline p = TestPipeline.create();
+
+ PCollection<String> input =
+ p.apply(Create.of(strings)
+ .withCoder(StringUtf8Coder.of()));
+
+ PCollection<String> output =
+ input.apply(Distinct.<String>create());
+
+ PAssert.that(output).empty(); // no input elements, so no output elements
+ p.run().waitUntilFinish();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
index efc446e..7fd8807 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
@@ -51,9 +51,9 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
@@ -108,7 +108,7 @@ public class UnboundedReadFromBoundedSourceTest {
.isEqualTo(numElements);
// Unique count == numElements
PAssert
- .thatSingleton(output.apply(RemoveDuplicates.<Long>create())
+ .thatSingleton(output.apply(Distinct.<Long>create())
.apply("UniqueCount", Count.<Long>globally()))
.isEqualTo(numElements);
// Min == 0
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 6d04e0b..cf5c8f5 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -38,12 +38,12 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
@@ -221,7 +221,7 @@ public class TFIDF {
final PCollectionView<Long> totalDocuments =
uriToContent
.apply("GetURIs", Keys.<URI>create())
- .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
+ .apply("DistinctDocs", Distinct.<URI>create())
.apply(Count.<URI>globally())
.apply(View.<Long>asSingleton());
@@ -251,7 +251,7 @@ public class TFIDF {
// Compute a mapping from each word to the total
// number of documents in which it appears.
PCollection<KV<String, Long>> wordToDocCount = uriToWords
- .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create())
+ .apply("DistinctWords", Distinct.<KV<URI, String>>create())
.apply(Values.<String>create())
.apply("CountDocs", Count.<String>perElement());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index 8728948..fe2d04e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -32,9 +32,9 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
@@ -115,7 +115,7 @@ public class KafkaStreamingTest {
KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
.apply(Window.<KV<String, String>>into(FixedWindows.of(batchAndWindowDuration)))
.apply(ParDo.of(new FormatKVFn()))
- .apply(RemoveDuplicates.<String>create());
+ .apply(Distinct.<String>create());
PAssertStreaming.runAndAssertContents(p, deduped, expected);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index 40c52a2..123dca8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -29,9 +29,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.FluentBackoff;
@@ -88,7 +88,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
PCollection<ValueWithRecordId<T>> read = Pipeline.applyTransform(input,
Read.from(new UnboundedToBoundedSourceAdapter<>(source, maxNumRecords, maxReadTime)));
if (source.requiresDeduping()) {
- read = read.apply(RemoveDuplicates.withRepresentativeValueFn(
+ read = read.apply(Distinct.withRepresentativeValueFn(
new SerializableFunction<ValueWithRecordId<T>, byte[]>() {
@Override
public byte[] apply(ValueWithRecordId<T> input) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
new file mode 100644
index 0000000..fba428b
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * {@code Distinct<T>} takes a {@code PCollection<T>} and
+ * returns a {@code PCollection<T>} that has all distinct elements of the
+ * input. Thus, each element is unique within each window.
+ *
+ * <p>Two values of type {@code T} are compared for equality <b>not</b> by
+ * regular Java {@link Object#equals}, but instead by first encoding
+ * each of the elements using the {@code PCollection}'s {@code Coder}, and then
+ * comparing the encoded bytes. This admits efficient parallel
+ * evaluation.
+ *
+ * <p>Optionally, a function may be provided that maps each element to a representative
+ * value. In this case, two elements will be considered duplicates if they have equal
+ * representative values, with equality being determined as above.
+ *
+ * <p>By default, the {@code Coder} of the output {@code PCollection}
+ * is the same as the {@code Coder} of the input {@code PCollection}.
+ *
+ * <p>Each output element is in the same window as its corresponding input
+ * element, and has the timestamp of the end of that window. The output
+ * {@code PCollection} has the same
+ * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+ * as the input.
+ *
+ * <p>Does not preserve any order the input PCollection might have had.
+ *
+ * <p>Example of use:
+ * <pre> {@code
+ * PCollection<String> words = ...;
+ * PCollection<String> uniqueWords =
+ * words.apply(Distinct.<String>create());
+ * } </pre>
+ *
+ * @param <T> the type of the elements of the input and output
+ * {@code PCollection}s
+ */
+public class Distinct<T> extends PTransform<PCollection<T>,
+ PCollection<T>> {
+ /**
+ * Returns a {@code Distinct<T>} {@code PTransform}.
+ *
+ * @param <T> the type of the elements of the input and output
+ * {@code PCollection}s
+ */
+ public static <T> Distinct<T> create() {
+ return new Distinct<T>();
+ }
+
+ /**
+ * Returns a {@code Distinct.WithRepresentativeValues<T, IdT>} {@code PTransform}.
+ *
+ * @param <T> the type of the elements of the input and output
+ * {@code PCollection}s
+ * @param <IdT> the type of the representative value used to dedup
+ */
+ public static <T, IdT> WithRepresentativeValues<T, IdT> withRepresentativeValueFn(
+ SerializableFunction<T, IdT> fn) {
+ return new WithRepresentativeValues<T, IdT>(fn, null); // no representative type yet
+ }
+
+ @Override
+ public PCollection<T> apply(PCollection<T> in) {
+ return in
+ .apply("CreateIndex", MapElements.via(new SimpleFunction<T, KV<T, Void>>() {
+ @Override
+ public KV<T, Void> apply(T element) {
+ return KV.of(element, (Void) null);
+ }
+ }))
+ .apply(Combine.<T, Void>perKey(
+ new SerializableFunction<Iterable<Void>, Void>() {
+ @Override
+ public Void apply(Iterable<Void> iter) {
+ return null; // values are placeholders; only the keys are kept
+ }
+ }))
+ .apply(Keys.<T>create());
+ }
+
+ /**
+ * A {@link Distinct} {@link PTransform} that uses a {@link SerializableFunction} to
+ * obtain a representative value for each input element.
+ *
+ * <p>Construct via {@link Distinct#withRepresentativeValueFn(SerializableFunction)}.
+ *
+ * @param <T> the type of input and output element
+ * @param <IdT> the type of representative values used to dedup
+ */
+ public static class WithRepresentativeValues<T, IdT>
+ extends PTransform<PCollection<T>, PCollection<T>> {
+ private final SerializableFunction<T, IdT> fn;
+ private final TypeDescriptor<IdT> representativeType;
+
+ private WithRepresentativeValues(
+ SerializableFunction<T, IdT> fn, TypeDescriptor<IdT> representativeType) {
+ this.fn = fn;
+ this.representativeType = representativeType;
+ }
+
+ @Override
+ public PCollection<T> apply(PCollection<T> in) {
+ WithKeys<IdT, T> withKeys = WithKeys.of(fn);
+ if (representativeType != null) { // set only via withRepresentativeType
+ withKeys = withKeys.withKeyType(representativeType);
+ }
+ return in
+ .apply(withKeys)
+ .apply(Combine.<IdT, T, T>perKey(
+ new Combine.BinaryCombineFn<T>() {
+ @Override
+ public T apply(T left, T right) {
+ return left; // keep the left operand, drop the right
+ }
+ }))
+ .apply(Values.<T>create());
+ }
+
+ /**
+ * Return a {@code WithRepresentativeValues} {@link PTransform} that is like this one, but with
+ * the specified representative type descriptor.
+ *
+ * <p>Required for use of
+ * {@link Distinct#withRepresentativeValueFn(SerializableFunction)}
+ * in Java 8 with a lambda as the fn.
+ *
+ * @param type a {@link TypeDescriptor} describing the representative type of this
+ * {@code WithRepresentativeValues}
+ * @return A {@code WithRepresentativeValues} {@link PTransform} that is like this one, but with
+ * the specified representative type descriptor.
+ */
+ public WithRepresentativeValues<T, IdT> withRepresentativeType(TypeDescriptor<IdT> type) {
+ return new WithRepresentativeValues<>(fn, type);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
deleted file mode 100644
index 709aa4a..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/**
- * {@code RemoveDuplicates<T>} takes a {@code PCollection<T>} and
- * returns a {@code PCollection<T>} that has all the elements of the
- * input but with duplicate elements removed such that each element is
- * unique within each window.
- *
- * <p>Two values of type {@code T} are compared for equality <b>not</b> by
- * regular Java {@link Object#equals}, but instead by first encoding
- * each of the elements using the {@code PCollection}'s {@code Coder}, and then
- * comparing the encoded bytes. This admits efficient parallel
- * evaluation.
- *
- * <p>Optionally, a function may be provided that maps each element to a representative
- * value. In this case, two elements will be considered duplicates if they have equal
- * representative values, with equality being determined as above.
- *
- * <p>By default, the {@code Coder} of the output {@code PCollection}
- * is the same as the {@code Coder} of the input {@code PCollection}.
- *
- * <p>Each output element is in the same window as its corresponding input
- * element, and has the timestamp of the end of that window. The output
- * {@code PCollection} has the same
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * as the input.
- *
- * <p>Does not preserve any order the input PCollection might have had.
- *
- * <p>Example of use:
- * <pre> {@code
- * PCollection<String> words = ...;
- * PCollection<String> uniqueWords =
- * words.apply(RemoveDuplicates.<String>create());
- * } </pre>
- *
- * @param <T> the type of the elements of the input and output
- * {@code PCollection}s
- */
-public class RemoveDuplicates<T> extends PTransform<PCollection<T>,
- PCollection<T>> {
- /**
- * Returns a {@code RemoveDuplicates<T>} {@code PTransform}.
- *
- * @param <T> the type of the elements of the input and output
- * {@code PCollection}s
- */
- public static <T> RemoveDuplicates<T> create() {
- return new RemoveDuplicates<T>();
- }
-
- /**
- * Returns a {@code RemoveDuplicates<T, IdT>} {@code PTransform}.
- *
- * @param <T> the type of the elements of the input and output
- * {@code PCollection}s
- * @param <IdT> the type of the representative value used to dedup
- */
- public static <T, IdT> WithRepresentativeValues<T, IdT> withRepresentativeValueFn(
- SerializableFunction<T, IdT> fn) {
- return new WithRepresentativeValues<T, IdT>(fn, null);
- }
-
- @Override
- public PCollection<T> apply(PCollection<T> in) {
- return in
- .apply("CreateIndex", MapElements.via(new SimpleFunction<T, KV<T, Void>>() {
- @Override
- public KV<T, Void> apply(T element) {
- return KV.of(element, (Void) null);
- }
- }))
- .apply(Combine.<T, Void>perKey(
- new SerializableFunction<Iterable<Void>, Void>() {
- @Override
- public Void apply(Iterable<Void> iter) {
- return null; // ignore input
- }
- }))
- .apply(Keys.<T>create());
- }
-
- /**
- * A {@link RemoveDuplicates} {@link PTransform} that uses a {@link SerializableFunction} to
- * obtain a representative value for each input element.
- *
- * <p>Construct via {@link RemoveDuplicates#withRepresentativeValueFn(SerializableFunction)}.
- *
- * @param <T> the type of input and output element
- * @param <IdT> the type of representative values used to dedup
- */
- public static class WithRepresentativeValues<T, IdT>
- extends PTransform<PCollection<T>, PCollection<T>> {
- private final SerializableFunction<T, IdT> fn;
- private final TypeDescriptor<IdT> representativeType;
-
- private WithRepresentativeValues(
- SerializableFunction<T, IdT> fn, TypeDescriptor<IdT> representativeType) {
- this.fn = fn;
- this.representativeType = representativeType;
- }
-
- @Override
- public PCollection<T> apply(PCollection<T> in) {
- WithKeys<IdT, T> withKeys = WithKeys.of(fn);
- if (representativeType != null) {
- withKeys = withKeys.withKeyType(representativeType);
- }
- return in
- .apply(withKeys)
- .apply(Combine.<IdT, T, T>perKey(
- new Combine.BinaryCombineFn<T>() {
- @Override
- public T apply(T left, T right) {
- return left;
- }
- }))
- .apply(Values.<T>create());
- }
-
- /**
- * Return a {@code WithRepresentativeValues} {@link PTransform} that is like this one, but with
- * the specified output type descriptor.
- *
- * <p>Required for use of
- * {@link RemoveDuplicates#withRepresentativeValueFn(SerializableFunction)}
- * in Java 8 with a lambda as the fn.
- *
- * @param type a {@link TypeDescriptor} describing the representative type of this
- * {@code WithRepresentativeValues}
- * @return A {@code WithRepresentativeValues} {@link PTransform} that is like this one, but with
- * the specified output type descriptor.
- */
- public WithRepresentativeValues<T, IdT> withRepresentativeType(TypeDescriptor<IdT> type) {
- return new WithRepresentativeValues<>(fn, type);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
index 0b92b26..2397d10 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
@@ -28,12 +28,12 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
@@ -56,7 +56,7 @@ public class CountingInputTest {
// Unique count == numElements
PAssert.thatSingleton(
input
- .apply(RemoveDuplicates.<Long>create())
+ .apply(Distinct.<Long>create())
.apply("UniqueCount", Count.<Long>globally()))
.isEqualTo(numElements);
// Min == 0
@@ -141,7 +141,7 @@ public class CountingInputTest {
PCollection<Long> diffs =
input
.apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
- .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
+ .apply("DistinctTimestamps", Distinct.<Long>create());
// This assert also confirms that diffs only has one unique value.
PAssert.thatSingleton(diffs).isEqualTo(0L);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
index 7c5fa13..88c68d3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
@@ -36,12 +36,12 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PCollection;
@@ -66,7 +66,7 @@ public class CountingSourceTest {
.isEqualTo(numElements);
// Unique count == numElements
PAssert
- .thatSingleton(input.apply(RemoveDuplicates.<Long>create())
+ .thatSingleton(input.apply(Distinct.<Long>create())
.apply("UniqueCount", Count.<Long>globally()))
.isEqualTo(numElements);
// Min == 0
@@ -177,7 +177,7 @@ public class CountingSourceTest {
PCollection<Long> diffs = input
.apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
- .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
+ .apply("DistinctTimestamps", Distinct.<Long>create());
// This assert also confirms that diffs only has one unique value.
PAssert.thatSingleton(diffs).isEqualTo(0L);
@@ -204,7 +204,7 @@ public class CountingSourceTest {
PCollection<Long> diffs =
input
.apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
- .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
+ .apply("DistinctTimestamps", Distinct.<Long>create());
// This assert also confirms that diffs only has one unique value.
PAssert.thatSingleton(diffs).isEqualTo(0L);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
index ba1ddfe..b63c73d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
@@ -214,7 +214,7 @@ public class ApproximateUniqueTest implements Serializable {
PCollection<Long> approximate = input.apply(ApproximateUnique.<String>globally(sampleSize));
final PCollectionView<Long> exact =
input
- .apply(RemoveDuplicates.<String>create())
+ .apply(Distinct.<String>create())
.apply(Count.<String>globally())
.apply(View.<Long>asSingleton());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
new file mode 100644
index 0000000..257b364
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link Distinct}.
+ */
+@RunWith(JUnit4.class)
+public class DistinctTest {
+ @Test
+ @Category(RunnableOnService.class)
+ public void testDistinct() {
+ List<String> strings = Arrays.asList(
+ "k1",
+ "k5",
+ "k5",
+ "k2",
+ "k1",
+ "k2",
+ "k3");
+
+ Pipeline p = TestPipeline.create();
+
+ PCollection<String> input =
+ p.apply(Create.of(strings)
+ .withCoder(StringUtf8Coder.of()));
+
+ PCollection<String> output =
+ input.apply(Distinct.<String>create());
+
+ PAssert.that(output)
+ .containsInAnyOrder("k1", "k5", "k2", "k3");
+ p.run();
+ }
+
+ @Test
+ @Category(RunnableOnService.class)
+ public void testDistinctEmpty() {
+ List<String> strings = Arrays.asList();
+
+ Pipeline p = TestPipeline.create();
+
+ PCollection<String> input =
+ p.apply(Create.of(strings)
+ .withCoder(StringUtf8Coder.of()));
+
+ PCollection<String> output =
+ input.apply(Distinct.<String>create());
+
+ PAssert.that(output).empty();
+ p.run();
+ }
+
+ private static class Keys implements SerializableFunction<KV<String, String>, String> {
+ @Override
+ public String apply(KV<String, String> input) {
+ return input.getKey();
+ }
+ }
+
+ private static class Checker implements SerializableFunction<Iterable<KV<String, String>>, Void> {
+ @Override
+ public Void apply(Iterable<KV<String, String>> input) {
+ Map<String, String> values = new HashMap<>();
+ for (KV<String, String> kv : input) {
+ values.put(kv.getKey(), kv.getValue());
+ }
+ assertEquals(2, values.size());
+ assertTrue(values.get("k1").equals("v1") || values.get("k1").equals("v2"));
+ assertEquals("v1", values.get("k2"));
+ return null;
+ }
+ }
+
+
+ @Test
+ @Category(RunnableOnService.class)
+ public void testDistinctWithRepresentativeValue() {
+ List<KV<String, String>> strings = Arrays.asList(
+ KV.of("k1", "v1"),
+ KV.of("k1", "v2"),
+ KV.of("k2", "v1"));
+
+ Pipeline p = TestPipeline.create();
+
+ PCollection<KV<String, String>> input = p.apply(Create.of(strings));
+
+ PCollection<KV<String, String>> output =
+ input.apply(Distinct.withRepresentativeValueFn(new Keys()));
+
+
+ PAssert.that(output).satisfies(new Checker());
+
+ p.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RemoveDuplicatesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RemoveDuplicatesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RemoveDuplicatesTest.java
deleted file mode 100644
index 312cba6..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RemoveDuplicatesTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for RemovedDuplicates.
- */
-@RunWith(JUnit4.class)
-public class RemoveDuplicatesTest {
- @Test
- @Category(RunnableOnService.class)
- public void testRemoveDuplicates() {
- List<String> strings = Arrays.asList(
- "k1",
- "k5",
- "k5",
- "k2",
- "k1",
- "k2",
- "k3");
-
- Pipeline p = TestPipeline.create();
-
- PCollection<String> input =
- p.apply(Create.of(strings)
- .withCoder(StringUtf8Coder.of()));
-
- PCollection<String> output =
- input.apply(RemoveDuplicates.<String>create());
-
- PAssert.that(output)
- .containsInAnyOrder("k1", "k5", "k2", "k3");
- p.run();
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testRemoveDuplicatesEmpty() {
- List<String> strings = Arrays.asList();
-
- Pipeline p = TestPipeline.create();
-
- PCollection<String> input =
- p.apply(Create.of(strings)
- .withCoder(StringUtf8Coder.of()));
-
- PCollection<String> output =
- input.apply(RemoveDuplicates.<String>create());
-
- PAssert.that(output).empty();
- p.run();
- }
-
- private static class Keys implements SerializableFunction<KV<String, String>, String> {
- @Override
- public String apply(KV<String, String> input) {
- return input.getKey();
- }
- }
-
- private static class Checker implements SerializableFunction<Iterable<KV<String, String>>, Void> {
- @Override
- public Void apply(Iterable<KV<String, String>> input) {
- Map<String, String> values = new HashMap<>();
- for (KV<String, String> kv : input) {
- values.put(kv.getKey(), kv.getValue());
- }
- assertEquals(2, values.size());
- assertTrue(values.get("k1").equals("v1") || values.get("k1").equals("v2"));
- assertEquals("v1", values.get("k2"));
- return null;
- }
- }
-
-
- @Test
- @Category(RunnableOnService.class)
- public void testRemoveDuplicatesWithRepresentativeValue() {
- List<KV<String, String>> strings = Arrays.asList(
- KV.of("k1", "v1"),
- KV.of("k1", "v2"),
- KV.of("k2", "v1"));
-
- Pipeline p = TestPipeline.create();
-
- PCollection<KV<String, String>> input = p.apply(Create.of(strings));
-
- PCollection<KV<String, String>> output =
- input.apply(RemoveDuplicates.withRepresentativeValueFn(new Keys()));
-
-
- PAssert.that(output).satisfies(new Checker());
-
- p.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 2f3c524..cc1ef26 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -51,12 +51,12 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.util.CoderUtils;
@@ -252,7 +252,7 @@ public class KafkaIOTest {
.isEqualTo(numElements);
// Unique count == numElements
PAssert
- .thatSingleton(input.apply(RemoveDuplicates.<Long>create())
+ .thatSingleton(input.apply(Distinct.<Long>create())
.apply("UniqueCount", Count.<Long>globally()))
.isEqualTo(numElements);
// Min == 0
@@ -333,7 +333,7 @@ public class KafkaIOTest {
PCollection<Long> diffs = input
.apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
- .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
+ .apply("DistinctTimestamps", Distinct.<Long>create());
// This assert also confirms that diffs only has one unique value.
PAssert.thatSingleton(diffs).isEqualTo(0L);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
new file mode 100644
index 0000000..99ef232
--- /dev/null
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Java 8 tests for {@link Distinct}.
+ */
+@RunWith(JUnit4.class)
+public class DistinctJava8Test {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void withLambdaRepresentativeValuesFnAndTypeDescriptorShouldApplyFn() {
+ TestPipeline p = TestPipeline.create();
+
+ Multimap<Integer, String> predupedContents = HashMultimap.create();
+ predupedContents.put(3, "foo");
+ predupedContents.put(4, "foos");
+ predupedContents.put(6, "barbaz");
+ predupedContents.put(6, "bazbar");
+ PCollection<String> dupes =
+ p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo"));
+ PCollection<String> deduped =
+ dupes.apply(Distinct.withRepresentativeValueFn((String s) -> s.length())
+ .withRepresentativeType(TypeDescriptor.of(Integer.class)));
+
+ PAssert.that(deduped).satisfies((Iterable<String> strs) -> {
+ Set<Integer> seenLengths = new HashSet<>();
+ for (String s : strs) {
+ assertThat(predupedContents.values(), hasItem(s));
+ assertThat(seenLengths, not(contains(s.length())));
+ seenLengths.add(s.length());
+ }
+ return null;
+ });
+
+ p.run();
+ }
+
+ @Test
+ public void withLambdaRepresentativeValuesFnNoTypeDescriptorShouldThrow() {
+ TestPipeline p = TestPipeline.create();
+
+ Multimap<Integer, String> predupedContents = HashMultimap.create();
+ predupedContents.put(3, "foo");
+ predupedContents.put(4, "foos");
+ predupedContents.put(6, "barbaz");
+ predupedContents.put(6, "bazbar");
+ PCollection<String> dupes =
+ p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo"));
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("Unable to return a default Coder for RemoveRepresentativeDupes");
+ thrown.expectMessage("No Coder has been manually specified");
+ thrown.expectMessage(
+ containsString("Building a Coder using a registered CoderFactory failed"));
+ thrown.expectMessage(
+ containsString("Building a Coder from the @DefaultCoder annotation failed"));
+ thrown.expectMessage(
+ containsString("Building a Coder from the fallback CoderProvider failed"));
+
+ // Thrown when applying a transform to the internal WithKeys that withRepresentativeValueFn is
+ // implemented with
+ dupes.apply("RemoveRepresentativeDupes",
+ Distinct.withRepresentativeValueFn((String s) -> s.length()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8183ac82/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/RemoveDuplicatesJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/RemoveDuplicatesJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/RemoveDuplicatesJava8Test.java
deleted file mode 100644
index 99266d4..0000000
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/RemoveDuplicatesJava8Test.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Java 8 tests for {@link RemoveDuplicates}.
- */
-@RunWith(JUnit4.class)
-public class RemoveDuplicatesJava8Test {
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Test
- public void withLambdaRepresentativeValuesFnAndTypeDescriptorShouldApplyFn() {
- TestPipeline p = TestPipeline.create();
-
- Multimap<Integer, String> predupedContents = HashMultimap.create();
- predupedContents.put(3, "foo");
- predupedContents.put(4, "foos");
- predupedContents.put(6, "barbaz");
- predupedContents.put(6, "bazbar");
- PCollection<String> dupes =
- p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo"));
- PCollection<String> deduped =
- dupes.apply(RemoveDuplicates.withRepresentativeValueFn((String s) -> s.length())
- .withRepresentativeType(TypeDescriptor.of(Integer.class)));
-
- PAssert.that(deduped).satisfies((Iterable<String> strs) -> {
- Set<Integer> seenLengths = new HashSet<>();
- for (String s : strs) {
- assertThat(predupedContents.values(), hasItem(s));
- assertThat(seenLengths, not(contains(s.length())));
- seenLengths.add(s.length());
- }
- return null;
- });
-
- p.run();
- }
-
- @Test
- public void withLambdaRepresentativeValuesFnNoTypeDescriptorShouldThrow() {
- TestPipeline p = TestPipeline.create();
-
- Multimap<Integer, String> predupedContents = HashMultimap.create();
- predupedContents.put(3, "foo");
- predupedContents.put(4, "foos");
- predupedContents.put(6, "barbaz");
- predupedContents.put(6, "bazbar");
- PCollection<String> dupes =
- p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo"));
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Unable to return a default Coder for RemoveRepresentativeDupes");
- thrown.expectMessage("No Coder has been manually specified");
- thrown.expectMessage(
- containsString("Building a Coder using a registered CoderFactory failed"));
- thrown.expectMessage(
- containsString("Building a Coder from the @DefaultCoder annotation failed"));
- thrown.expectMessage(
- containsString("Building a Coder from the fallback CoderProvider failed"));
-
- // Thrown when applying a transform to the internal WithKeys that withRepresentativeValueFn is
- // implemented with
- dupes.apply("RemoveRepresentativeDupes",
- RemoveDuplicates.withRepresentativeValueFn((String s) -> s.length()));
- }
-}
[2/2] incubator-beam git commit: This closes #1164
Posted by je...@apache.org.
This closes #1164
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8883877a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8883877a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8883877a
Branch: refs/heads/master
Commit: 8883877ae91fa42963181d419d8f3cbfea8b314b
Parents: facf096 8183ac8
Author: Jesse Anderson <je...@smokinghand.com>
Authored: Tue Nov 1 19:25:27 2016 -0600
Committer: Jesse Anderson <je...@smokinghand.com>
Committed: Tue Nov 1 19:25:27 2016 -0600
----------------------------------------------------------------------
.../apache/beam/examples/complete/TfIdf.java | 6 +-
.../beam/examples/cookbook/DeDupExample.java | 8 +-
.../org/apache/beam/examples/cookbook/README.md | 2 +-
.../beam/examples/complete/TfIdfTest.java | 4 +-
.../examples/cookbook/DeDupExampleTest.java | 82 ----------
.../examples/cookbook/DistinctExampleTest.java | 82 ++++++++++
.../UnboundedReadFromBoundedSourceTest.java | 4 +-
.../beam/runners/flink/examples/TFIDF.java | 6 +-
.../streaming/KafkaStreamingTest.java | 4 +-
.../sdk/io/BoundedReadFromUnboundedSource.java | 4 +-
.../apache/beam/sdk/transforms/Distinct.java | 158 ++++++++++++++++++
.../beam/sdk/transforms/RemoveDuplicates.java | 159 -------------------
.../apache/beam/sdk/io/CountingInputTest.java | 6 +-
.../apache/beam/sdk/io/CountingSourceTest.java | 8 +-
.../sdk/transforms/ApproximateUniqueTest.java | 2 +-
.../beam/sdk/transforms/DistinctTest.java | 130 +++++++++++++++
.../sdk/transforms/RemoveDuplicatesTest.java | 130 ---------------
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 6 +-
.../beam/sdk/transforms/DistinctJava8Test.java | 104 ++++++++++++
.../transforms/RemoveDuplicatesJava8Test.java | 104 ------------
20 files changed, 504 insertions(+), 505 deletions(-)
----------------------------------------------------------------------