You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/03/06 23:47:32 UTC
[beam] branch master updated: [BEAM-2939,
BEAM-9458] Add deduplication transform for SplittableDoFns
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8af39e6 [BEAM-2939, BEAM-9458] Add deduplication transform for SplittableDoFns
new a8e60bc Merge pull request #11065 from lukecwik/splittabledofn3
8af39e6 is described below
commit 8af39e6ef55cc18d459d8c9e7f543d05dc2d1e0f
Author: Luke Cwik <lc...@google.com>
AuthorDate: Tue Mar 3 17:19:36 2020 -0800
[BEAM-2939, BEAM-9458] Add deduplication transform for SplittableDoFns
This enables migrating the remaining UnboundedSources to use the UnboundedSource SDF wrapper.
---
.../apache/beam/sdk/transforms/Deduplicate.java | 325 +++++++++++++++++++++
.../beam/sdk/transforms/DeduplicateTest.java | 251 ++++++++++++++++
2 files changed, 576 insertions(+)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Deduplicate.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Deduplicate.java
new file mode 100644
index 0000000..9dbf7b0
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Deduplicate.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Duration;
+
+/**
+ * A set of {@link PTransform}s which deduplicate input records over a time domain and threshold.
+ * Values in different windows will not be considered duplicates of each other. Deduplication is
+ * best effort.
+ *
+ * <p>Two values of type {@code T} are compared for equality <b>not</b> by regular Java {@link
+ * Object#equals}, but instead by first encoding each of the elements using the {@code
+ * PCollection}'s {@code Coder}, and then comparing the encoded bytes. This admits efficient
+ * parallel evaluation.
+ *
+ * <p>These PTransforms are different than {@link Distinct} since {@link Distinct} guarantees
+ * uniqueness of values within a {@link PCollection} but may support a narrower set of {@link
+ * org.apache.beam.sdk.values.WindowingStrategy windowing strategies} or may delay when output is
+ * produced.
+ *
+ * <p>The durations specified may impose memory and/or storage requirements within a runner and care
+ * may need to be taken to ensure that the deduplication time limit is long enough to remove
+ * duplicates but short enough to not cause performance problems within a runner. Each runner may
+ * provide an optimized implementation of their choice using the deduplication time domain and
+ * threshold specified.
+ *
+ * <p>Does not preserve any order the input PCollection might have had.
+ *
+ * <p>Example of use:
+ *
+ * <pre>{@code
+ * PCollection<String> words = ...;
+ * PCollection<String> deduplicatedWords =
+ *     words.apply(Deduplicate.<String>values());
+ * }</pre>
+ */
+public final class Deduplicate {
+  /** The default is the {@link TimeDomain#PROCESSING_TIME processing time domain}. */
+  public static final TimeDomain DEFAULT_TIME_DOMAIN = TimeDomain.PROCESSING_TIME;
+  /** The default duration is 10 mins. */
+  public static final Duration DEFAULT_DURATION = Duration.standardMinutes(10);
+
+  /**
+   * Deduplicates values over a specified time domain and threshold. Construct via {@link
+   * Deduplicate#values()}.
+   */
+  public static final class Values<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    private final TimeDomain timeDomain;
+    private final Duration duration;
+
+    private Values(TimeDomain timeDomain, Duration duration) {
+      this.timeDomain = timeDomain;
+      this.duration = duration;
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      return input
+          .apply(
+              "KeyByElement",
+              MapElements.via(
+                  new SimpleFunction<T, KV<T, Void>>() {
+                    @Override
+                    public KV<T, Void> apply(T element) {
+                      return KV.of(element, (Void) null);
+                    }
+                  }))
+          .apply(new KeyedValues<>(timeDomain, duration))
+          .apply(Keys.create());
+    }
+
+    /**
+     * Returns a {@code Values} {@link PTransform} like this one but with the specified time domain.
+     */
+    public Values<T> withTimeDomain(TimeDomain timeDomain) {
+      return new Values<>(timeDomain, duration);
+    }
+
+    /**
+     * Returns a {@code Values} {@link PTransform} like this one but with the specified duration.
+     */
+    public Values<T> withDuration(Duration duration) {
+      return new Values<>(timeDomain, duration);
+    }
+  }
+
+  /**
+   * A {@link PTransform} that uses a {@link SerializableFunction} to obtain a representative value
+   * for each input element used for deduplication.
+   *
+   * <p>Construct via {@link Deduplicate#withRepresentativeValueFn}.
+   *
+   * @param <T> the type of input and output element
+   * @param <IdT> the type of representative values used to dedup
+   */
+  public static final class WithRepresentativeValues<T, IdT>
+      extends PTransform<PCollection<T>, PCollection<T>> {
+    private final SerializableFunction<T, IdT> fn;
+    @Nullable private final TypeDescriptor<IdT> type;
+    @Nullable private final Coder<IdT> coder;
+    private final TimeDomain timeDomain;
+    private final Duration duration;
+
+    private WithRepresentativeValues(
+        TimeDomain timeDomain,
+        Duration duration,
+        SerializableFunction<T, IdT> fn,
+        @Nullable TypeDescriptor<IdT> type,
+        @Nullable Coder<IdT> coder) {
+      this.timeDomain = timeDomain;
+      this.duration = duration;
+      this.fn = fn;
+      this.type = type;
+      this.coder = coder;
+    }
+
+    /**
+     * Return a {@code WithRepresentativeValues} {@link PTransform} that is like this one, but with
+     * the specified id type descriptor.
+     *
+     * <p>Either {@link #withRepresentativeCoder} or this method must be invoked if using {@link
+     * Deduplicate#withRepresentativeValueFn} in Java 8 with a lambda as the fn.
+     *
+     * @param type a {@link TypeDescriptor} describing the representative type of this {@code
+     *     WithRepresentativeValues}
+     * @return A {@code WithRepresentativeValues} {@link PTransform} that is like this one, but with
+     *     the specified representative value type descriptor. Any previously set representative
+     *     value coder will be cleared.
+     */
+    public WithRepresentativeValues<T, IdT> withRepresentativeType(TypeDescriptor<IdT> type) {
+      return new WithRepresentativeValues<>(timeDomain, duration, fn, type, null);
+    }
+
+    /**
+     * Return a {@code WithRepresentativeValues} {@link PTransform} that is like this one, but with
+     * the specified id type coder.
+     *
+     * <p>Either {@link #withRepresentativeType} or this method must be invoked if using {@link
+     * Deduplicate#withRepresentativeValueFn} in Java 8 with a lambda as the fn.
+     *
+     * @param coder a {@link Coder} capable of encoding the representative type of this {@code
+     *     WithRepresentativeValues}
+     * @return A {@code WithRepresentativeValues} {@link PTransform} that is like this one, but with
+     *     the specified representative value coder. Any previously set representative value type
+     *     descriptor will be cleared.
+     */
+    public WithRepresentativeValues<T, IdT> withRepresentativeCoder(Coder<IdT> coder) {
+      return new WithRepresentativeValues<>(timeDomain, duration, fn, null, coder);
+    }
+
+    /**
+     * Returns a {@code WithRepresentativeValues} {@link PTransform} like this one but with the
+     * specified time domain.
+     */
+    public WithRepresentativeValues<T, IdT> withTimeDomain(TimeDomain timeDomain) {
+      return new WithRepresentativeValues<>(timeDomain, duration, fn, type, coder);
+    }
+
+    /**
+     * Return a {@code WithRepresentativeValues} {@link PTransform} that is like this one, but with
+     * the specified deduplication duration.
+     */
+    public WithRepresentativeValues<T, IdT> withDuration(Duration duration) {
+      return new WithRepresentativeValues<>(timeDomain, duration, fn, type, coder);
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      WithKeys<IdT, T> withKeys = WithKeys.of(fn);
+      if (type != null) {
+        withKeys = withKeys.withKeyType(type);
+      }
+      PCollection<KV<IdT, T>> inputWithKey = input.apply(withKeys);
+      if (coder != null) {
+        inputWithKey.setCoder(KvCoder.of(coder, input.getCoder()));
+      }
+      return inputWithKey
+          .apply(new KeyedValues<>(timeDomain, duration))
+          .apply(org.apache.beam.sdk.transforms.Values.create());
+    }
+  }
+
+  /**
+   * Deduplicates keyed values using the key over a specified time domain and threshold. Construct
+   * via {@link Deduplicate#keyedValues()}.
+   */
+  public static final class KeyedValues<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+    private final TimeDomain timeDomain;
+    private final Duration duration;
+
+    private KeyedValues(TimeDomain timeDomain, Duration duration) {
+      this.timeDomain = timeDomain;
+      this.duration = duration;
+    }
+
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
+      return input.apply(ParDo.of(new DeduplicateFn<>(timeDomain, duration)));
+    }
+
+    /**
+     * Returns a {@code KeyedValues} {@link PTransform} like this one but with the specified time
+     * domain.
+     */
+    public KeyedValues<K, V> withTimeDomain(TimeDomain timeDomain) {
+      return new KeyedValues<>(timeDomain, duration);
+    }
+
+    /**
+     * Returns a {@code KeyedValues} {@link PTransform} like this one but with the specified
+     * duration.
+     */
+    public KeyedValues<K, V> withDuration(Duration duration) {
+      return new KeyedValues<>(timeDomain, duration);
+    }
+  }
+
+  /**
+   * Returns a deduplication transform that deduplicates values for up to 10 mins within the {@link
+   * TimeDomain#PROCESSING_TIME processing time domain}.
+   */
+  public static <T> Deduplicate.Values<T> values() {
+    return new Deduplicate.Values<>(DEFAULT_TIME_DOMAIN, DEFAULT_DURATION);
+  }
+
+  /**
+   * Returns a deduplication transform that deduplicates keyed values using the key for up to 10
+   * mins within the {@link TimeDomain#PROCESSING_TIME processing time domain}.
+   */
+  public static <K, V> Deduplicate.KeyedValues<K, V> keyedValues() {
+    return new Deduplicate.KeyedValues<>(DEFAULT_TIME_DOMAIN, DEFAULT_DURATION);
+  }
+
+  /**
+   * Returns a deduplication transform that deduplicates values using the supplied representative
+   * value for up to 10 mins within the {@link TimeDomain#PROCESSING_TIME processing time domain}.
+   */
+  public static <T, IdT> Deduplicate.WithRepresentativeValues<T, IdT> withRepresentativeValueFn(
+      SerializableFunction<T, IdT> representativeValueFn) {
+    return new Deduplicate.WithRepresentativeValues<>(
+        DEFAULT_TIME_DOMAIN, DEFAULT_DURATION, representativeValueFn, null, null);
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  // prevent instantiation
+  private Deduplicate() {}
+
+  /**
+   * A stateful {@link DoFn} that uses a {@link ValueState} to record whether a key has been seen
+   * recently; an expiry timer in the configured {@link TimeDomain} later clears that state.
+   *
+   * @param <K> the key type; deduplication is performed per key
+   * @param <V> the value type; values are emitted unchanged
+   */
+  private static class DeduplicateFn<K, V> extends DoFn<KV<K, V>, KV<K, V>> {
+    private static final String EXPIRY_TIMER = "expiryTimer";
+    private static final String SEEN_STATE = "seen";
+
+    @TimerId(EXPIRY_TIMER)
+    private final TimerSpec expiryTimerSpec;
+
+    @StateId(SEEN_STATE)
+    private final StateSpec<ValueState<Boolean>> seenState = StateSpecs.value(BooleanCoder.of());
+
+    private final Duration duration;
+
+    private DeduplicateFn(TimeDomain timeDomain, Duration duration) {
+      this.expiryTimerSpec = TimerSpecs.timer(timeDomain);
+      this.duration = duration;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<K, V> element,
+        OutputReceiver<KV<K, V>> receiver,
+        @StateId(SEEN_STATE) ValueState<Boolean> seenState,
+        @TimerId(EXPIRY_TIMER) Timer expiryTimer) {
+      Boolean seen = seenState.read();
+      // State is only ever unset or true, so unset means the key has not been seen recently.
+      if (seen == null) {
+        expiryTimer.offset(duration).setRelative(); // Forget this key after duration.
+        seenState.write(true);
+        receiver.output(element);
+      }
+    }
+
+    @OnTimer(EXPIRY_TIMER)
+    public void onExpiry(
+        OnTimerContext context, @StateId(SEEN_STATE) ValueState<Boolean> seenState) {
+      seenState.clear();
+    }
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DeduplicateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DeduplicateTest.java
new file mode 100644
index 0000000..00ee2d2
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DeduplicateTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DeduplicateTest {
+
+  @Rule public TestPipeline p = TestPipeline.create();
+
+  @Test
+  @Category({NeedsRunner.class, UsesTestStream.class})
+  public void testInDifferentWindows() {
+    Instant base = new Instant(0);
+    TestStream<String> values =
+        TestStream.create(StringUtf8Coder.of())
+            .advanceWatermarkTo(base)
+            .addElements(
+                TimestampedValue.of("k1", base),
+                TimestampedValue.of("k2", base.plus(Duration.standardSeconds(10))),
+                TimestampedValue.of("k3", base.plus(Duration.standardSeconds(20))),
+                TimestampedValue.of("k1", base.plus(Duration.standardSeconds(30))),
+                TimestampedValue.of("k2", base.plus(Duration.standardSeconds(40))),
+                TimestampedValue.of("k3", base.plus(Duration.standardSeconds(50))),
+                TimestampedValue.of("k4", base.plus(Duration.standardSeconds(60))),
+                TimestampedValue.of("k5", base.plus(Duration.standardSeconds(70))),
+                TimestampedValue.of("k6", base.plus(Duration.standardSeconds(80))))
+            .advanceWatermarkToInfinity();
+
+    PCollection<String> distinctValues =
+        p.apply(values)
+            .apply(Window.into(FixedWindows.of(Duration.standardSeconds(30))))
+            .apply(Deduplicate.values());
+    PAssert.that(distinctValues)
+        .inWindow(new IntervalWindow(base, base.plus(Duration.standardSeconds(30))))
+        .containsInAnyOrder("k1", "k2", "k3");
+    PAssert.that(distinctValues)
+        .inWindow(
+            new IntervalWindow(
+                base.plus(Duration.standardSeconds(30)), base.plus(Duration.standardSeconds(60))))
+        .containsInAnyOrder("k1", "k2", "k3");
+    PAssert.that(distinctValues)
+        .inWindow(
+            new IntervalWindow(
+                base.plus(Duration.standardSeconds(60)), base.plus(Duration.standardSeconds(90))))
+        .containsInAnyOrder("k4", "k5", "k6");
+    p.run();
+  }
+
+  @Test
+  @Category({NeedsRunner.class, UsesTestStream.class})
+  public void testEventTime() {
+    Instant base = new Instant(0);
+    TestStream<String> values =
+        TestStream.create(StringUtf8Coder.of())
+            .advanceWatermarkTo(base)
+            .addElements(
+                TimestampedValue.of("k1", base),
+                TimestampedValue.of("k2", base.plus(Duration.standardSeconds(10))),
+                TimestampedValue.of("k3", base.plus(Duration.standardSeconds(20))),
+                TimestampedValue.of("maybedup", base.plus(Duration.standardSeconds(59))))
+            .advanceWatermarkTo(base.plus(Duration.standardMinutes(1)))
+            .addElements(
+                TimestampedValue.of("k1", base.plus(Duration.standardSeconds(30))),
+                TimestampedValue.of("k2", base.plus(Duration.standardSeconds(40))),
+                TimestampedValue.of("k3", base.plus(Duration.standardSeconds(50))))
+            .advanceWatermarkTo(
+                base.plus(Duration.standardMinutes(1)).plus(Deduplicate.DEFAULT_DURATION))
+            .addElements(TimestampedValue.of("maybedup", base.plus(Duration.standardSeconds(59))))
+            .advanceWatermarkToInfinity();
+
+    PCollection<String> distinctValues =
+        p.apply(values).apply(Deduplicate.<String>values().withTimeDomain(TimeDomain.EVENT_TIME));
+    PAssert.that(distinctValues)
+        .satisfies(
+            (Iterable<String> input) -> {
+              assertEquals(1, Iterables.frequency(input, "k1"));
+              assertEquals(1, Iterables.frequency(input, "k2"));
+              assertEquals(1, Iterables.frequency(input, "k3"));
+              assertTrue(
+                  Iterables.frequency(input, "maybedup") == 1
+                      || Iterables.frequency(input, "maybedup") == 2);
+              return null;
+            });
+    p.run();
+  }
+
+  @Test
+  @Category({NeedsRunner.class, UsesTestStreamWithProcessingTime.class})
+  public void testProcessingTime() {
+    Instant base = new Instant(0);
+    TestStream<String> values =
+        TestStream.create(StringUtf8Coder.of())
+            .advanceWatermarkTo(base)
+            .addElements(
+                TimestampedValue.of("k1", base),
+                TimestampedValue.of("k2", base.plus(Duration.standardSeconds(10))),
+                TimestampedValue.of("k3", base.plus(Duration.standardSeconds(20))),
+                TimestampedValue.of("maybedup", base.plus(Duration.standardSeconds(59))))
+            .advanceProcessingTime(Duration.standardMinutes(1))
+            .addElements(
+                TimestampedValue.of("k1", base.plus(Duration.standardSeconds(30))),
+                TimestampedValue.of("k2", base.plus(Duration.standardSeconds(40))),
+                TimestampedValue.of("k3", base.plus(Duration.standardSeconds(50))))
+            .advanceProcessingTime(Deduplicate.DEFAULT_DURATION)
+            .addElements(TimestampedValue.of("maybedup", base.plus(Duration.standardSeconds(59))))
+            .advanceWatermarkToInfinity();
+
+    PCollection<String> distinctValues = p.apply(values).apply(Deduplicate.values());
+    PAssert.that(distinctValues)
+        .satisfies(
+            (Iterable<String> input) -> {
+              assertEquals(1, Iterables.frequency(input, "k1"));
+              assertEquals(1, Iterables.frequency(input, "k2"));
+              assertEquals(1, Iterables.frequency(input, "k3"));
+              assertTrue(
+                  Iterables.frequency(input, "maybedup") == 1
+                      || Iterables.frequency(input, "maybedup") == 2);
+              return null;
+            });
+    p.run();
+  }
+
+  private static class Keys<T> implements SerializableFunction<KV<T, String>, T> {
+    @Override
+    public T apply(KV<T, String> input) {
+      return input.getKey();
+    }
+  }
+
+  @Test
+  @Category({NeedsRunner.class, UsesTestStreamWithProcessingTime.class})
+  public void testRepresentativeValuesWithCoder() {
+    Instant base = new Instant(0);
+    TestStream<KV<Integer, String>> values =
+        TestStream.create(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
+            .advanceWatermarkTo(base)
+            .addElements(
+                TimestampedValue.of(KV.of(1, "k1"), base),
+                TimestampedValue.of(KV.of(2, "k2"), base.plus(Duration.standardSeconds(10))),
+                TimestampedValue.of(KV.of(3, "k3"), base.plus(Duration.standardSeconds(20))))
+            .advanceProcessingTime(Duration.standardMinutes(1))
+            .addElements(
+                TimestampedValue.of(KV.of(1, "k1"), base.plus(Duration.standardSeconds(30))),
+                TimestampedValue.of(KV.of(2, "k2"), base.plus(Duration.standardSeconds(40))),
+                TimestampedValue.of(KV.of(3, "k3"), base.plus(Duration.standardSeconds(50))))
+            .advanceWatermarkToInfinity();
+
+    PCollection<KV<Integer, String>> distinctValues =
+        p.apply(values)
+            .apply(
+                Deduplicate.withRepresentativeValueFn(new Keys<Integer>())
+                    .withRepresentativeCoder(VarIntCoder.of()));
+
+    PAssert.that(distinctValues).containsInAnyOrder(KV.of(1, "k1"), KV.of(2, "k2"), KV.of(3, "k3"));
+    p.run();
+  }
+
+  @Test
+  @Category({NeedsRunner.class, UsesTestStreamWithProcessingTime.class})
+  public void testTriggeredRepresentativeValuesWithType() {
+    Instant base = new Instant(0);
+    TestStream<KV<Integer, String>> values =
+        TestStream.create(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
+            .advanceWatermarkTo(base)
+            .addElements(
+                TimestampedValue.of(KV.of(1, "k1"), base),
+                TimestampedValue.of(KV.of(2, "k2"), base.plus(Duration.standardSeconds(10))),
+                TimestampedValue.of(KV.of(3, "k3"), base.plus(Duration.standardSeconds(20))))
+            .advanceProcessingTime(Duration.standardMinutes(1))
+            .addElements(
+                TimestampedValue.of(KV.of(1, "k1"), base.plus(Duration.standardSeconds(30))),
+                TimestampedValue.of(KV.of(2, "k2"), base.plus(Duration.standardSeconds(40))),
+                TimestampedValue.of(KV.of(3, "k3"), base.plus(Duration.standardSeconds(50))))
+            .advanceWatermarkToInfinity();
+
+    PCollection<KV<Integer, String>> distinctValues =
+        p.apply(values)
+            .apply(
+                Deduplicate.withRepresentativeValueFn(new Keys<Integer>())
+                    .withRepresentativeType(org.apache.beam.sdk.values.TypeDescriptors.integers()));
+
+    PAssert.that(distinctValues).containsInAnyOrder(KV.of(1, "k1"), KV.of(2, "k2"), KV.of(3, "k3"));
+    p.run();
+  }
+
+  @Test
+  public void withLambdaRepresentativeValuesFnNoTypeDescriptorShouldThrow() {
+    // No coder can be inferred for the representative type of a method-reference fn.
+    Multimap<Integer, String> predupedContents = HashMultimap.create();
+    predupedContents.put(3, "foo");
+    predupedContents.put(4, "foos");
+    predupedContents.put(6, "barbaz");
+    predupedContents.put(6, "bazbar");
+    PCollection<String> dupes =
+        p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo"));
+
+    assertThrows(
+        "Unable to return a default Coder for RemoveRepresentativeDupes",
+        IllegalStateException.class,
+        () ->
+            dupes.apply(
+                "RemoveRepresentativeDupes",
+                Deduplicate.withRepresentativeValueFn(String::length)));
+  }
+}