You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/03/06 23:47:32 UTC

[beam] branch master updated: [BEAM-2939, BEAM-9458] Add deduplication transform for SplittableDoFns

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8af39e6  [BEAM-2939, BEAM-9458] Add deduplication transform for SplittableDoFns
     new a8e60bc  Merge pull request #11065 from lukecwik/splittabledofn3
8af39e6 is described below

commit 8af39e6ef55cc18d459d8c9e7f543d05dc2d1e0f
Author: Luke Cwik <lc...@google.com>
AuthorDate: Tue Mar 3 17:19:36 2020 -0800

    [BEAM-2939, BEAM-9458] Add deduplication transform for SplittableDoFns
    
    This enables migrating the remaining UnboundedSources to use the UnboundedSoure SDF wrapper.
---
 .../apache/beam/sdk/transforms/Deduplicate.java    | 325 +++++++++++++++++++++
 .../beam/sdk/transforms/DeduplicateTest.java       | 251 ++++++++++++++++
 2 files changed, 576 insertions(+)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Deduplicate.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Deduplicate.java
new file mode 100644
index 0000000..9dbf7b0
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Deduplicate.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Duration;
+
+/**
+ * A set of {@link PTransform}s which deduplicate input records over a time domain and threshold.
+ * Values in different windows will not be considered duplicates of each other. Deduplication is
+ * best effort.
+ *
+ * <p>Two values of type {@code T} are compared for equality <b>not</b> by regular Java {@link
+ * Object#equals}, but instead by first encoding each of the elements using the {@code
+ * PCollection}'s {@code Coder}, and then comparing the encoded bytes. This admits efficient
+ * parallel evaluation.
+ *
+ * <p>These PTransforms are different then {@link Distinct} since {@link Distinct} guarantees
+ * uniqueness of values within a {@link PCollection} but may support a narrower set of {@link
+ * org.apache.beam.sdk.values.WindowingStrategy windowing strategies} or may delay when output is
+ * produced.
+ *
+ * <p>The durations specified may impose memory and/or storage requirements within a runner and care
+ * might need to be used to ensure that the deduplication time limit is long enough to remove
+ * duplicates but short enough to not cause performance problems within a runner. Each runner may
+ * provide an optimized implementation of their choice using the deduplication time domain and
+ * threshold specified.
+ *
+ * <p>Does not preserve any order the input PCollection might have had.
+ *
+ * <p>Example of use:
+ *
+ * <pre>{@code
+ * PCollection<String> words = ...;
+ * PCollection<String> deduplicatedWords =
+ *     words.apply(Deduplicate.<String>values());
+ * }</pre>
+ */
+public final class Deduplicate {
+  /** The default is the {@link TimeDomain#PROCESSING_TIME processing time domain}. */
+  public static final TimeDomain DEFAULT_TIME_DOMAIN = TimeDomain.PROCESSING_TIME;
+  /** The default duration is 10 mins. */
+  public static final Duration DEFAULT_DURATION = Duration.standardMinutes(10);
+
+  /**
+   * Deduplicates values over a specified time domain and threshold. Construct via {@link
+   * Deduplicate#values()}.
+   */
+  public static final class Values<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    private final TimeDomain timeDomain;
+    private final Duration duration;
+
+    private Values(TimeDomain timeDomain, Duration duration) {
+      this.timeDomain = timeDomain;
+      this.duration = duration;
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      return input
+          .apply(
+              "KeyByElement",
+              MapElements.via(
+                  new SimpleFunction<T, KV<T, Void>>() {
+                    @Override
+                    public KV<T, Void> apply(T element) {
+                      return KV.of(element, (Void) null);
+                    }
+                  }))
+          .apply(new KeyedValues<>(timeDomain, duration))
+          .apply(Keys.create());
+    }
+
+    /**
+     * Returns a {@code Values} {@link PTransform} like this one but with the specified time domain.
+     */
+    public Values<T> withTimeDomain(TimeDomain timeDomain) {
+      return new Values<T>(timeDomain, duration);
+    }
+
+    /**
+     * Returns a {@code Values} {@link PTransform} like this one but with the specified duration.
+     */
+    public Values<T> withDuration(Duration duration) {
+      return new Values<T>(timeDomain, duration);
+    }
+  }
+
+  /**
+   * A {@link PTransform} that uses a {@link SerializableFunction} to obtain a representative value
+   * for each input element used for deduplication.
+   *
+   * <p>Construct via {@link Deduplicate#withRepresentativeValueFn}.
+   *
+   * @param <T> the type of input and output element
+   * @param <IdT> the type of representative values used to dedup
+   */
+  public static final class WithRepresentativeValues<T, IdT>
+      extends PTransform<PCollection<T>, PCollection<T>> {
+    private final SerializableFunction<T, IdT> fn;
+    @Nullable private final TypeDescriptor<IdT> type;
+    @Nullable private final Coder<IdT> coder;
+    private final TimeDomain timeDomain;
+    private final Duration duration;
+
+    private WithRepresentativeValues(
+        TimeDomain timeDomain,
+        Duration duration,
+        SerializableFunction<T, IdT> fn,
+        @Nullable TypeDescriptor<IdT> type,
+        @Nullable Coder<IdT> coder) {
+      this.timeDomain = timeDomain;
+      this.duration = duration;
+      this.fn = fn;
+      this.type = type;
+      this.coder = coder;
+    }
+
+    /**
+     * Return a {@code WithRepresentativeValues} {@link PTransform} that is like this one, but with
+     * the specified id type descriptor.
+     *
+     * <p>Either {@link #withRepresentativeCoder} or this method must be invoked if using {@link
+     * Deduplicate#withRepresentativeValueFn} in Java 8 with a lambda as the fn.
+     *
+     * @param type a {@link TypeDescriptor} describing the representative type of this {@code
+     *     WithRepresentativeValues}
+     * @return A {@code WithRepresentativeValues} {@link PTransform} that is like this one, but with
+     *     the specified representative value type descriptor. Any previously set representative
+     *     value coder will be cleared.
+     */
+    public WithRepresentativeValues<T, IdT> withRepresentativeType(TypeDescriptor<IdT> type) {
+      return new WithRepresentativeValues<>(timeDomain, duration, fn, type, null);
+    }
+
+    /**
+     * Return a {@code WithRepresentativeValues} {@link PTransform} that is like this one, but with
+     * the specified id type coder.
+     *
+     * <p>Required for use of {@link Deduplicate#withRepresentativeValueFn} in Java 8 with a lambda
+     * as the fn.
+     *
+     * @param coder a {@link Coder} capable of encoding the representative type of this {@code
+     *     WithRepresentativeValues}
+     * @return A {@code WithRepresentativeValues} {@link PTransform} that is like this one, but with
+     *     the specified representative value coder. Any previously set representative value type
+     *     descriptor will be cleared.
+     */
+    public WithRepresentativeValues<T, IdT> withRepresentativeCoder(Coder<IdT> coder) {
+      return new WithRepresentativeValues<>(timeDomain, duration, fn, null, coder);
+    }
+
+    /**
+     * Returns a {@code WithRepresentativeValues} {@link PTransform} like this one but with the
+     * specified time domain.
+     */
+    public WithRepresentativeValues<T, IdT> withTimeDomain(TimeDomain timeDomain) {
+      return new WithRepresentativeValues<>(timeDomain, duration, fn, type, coder);
+    }
+
+    /**
+     * Return a {@code WithRepresentativeValues} {@link PTransform} that is like this one, but with
+     * the specified deduplication duration.
+     */
+    public WithRepresentativeValues<T, IdT> withDuration(Duration duration) {
+      return new WithRepresentativeValues<>(timeDomain, duration, fn, type, coder);
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      WithKeys<IdT, T> withKeys = WithKeys.of(fn);
+      if (type != null) {
+        withKeys = withKeys.withKeyType(type);
+      }
+      PCollection<KV<IdT, T>> inputWithKey = input.apply(withKeys);
+      if (coder != null) {
+        inputWithKey.setCoder(KvCoder.of(coder, input.getCoder()));
+      }
+      return inputWithKey
+          .apply(new KeyedValues<>(timeDomain, duration))
+          .apply(org.apache.beam.sdk.transforms.Values.create());
+    }
+  }
+
+  /**
+   * Deduplicates keyed values using the key over a specified time domain and threshold. Construct
+   * via {@link Deduplicate#keyedValues()} ()}.
+   */
+  public static final class KeyedValues<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+    private final TimeDomain timeDomain;
+    private final Duration duration;
+
+    private KeyedValues(TimeDomain timeDomain, Duration duration) {
+      this.timeDomain = timeDomain;
+      this.duration = duration;
+    }
+
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
+      return input.apply(ParDo.of(new DeduplicateFn<>(timeDomain, duration)));
+    }
+
+    /**
+     * Returns a {@code KeyedValues} {@link PTransform} like this one but with the specified time
+     * domain.
+     */
+    public KeyedValues<K, V> withTimeDomain(TimeDomain timeDomain) {
+      return new KeyedValues<>(timeDomain, duration);
+    }
+
+    /**
+     * Returns a {@code KeyedValues} {@link PTransform} like this one but with the specified
+     * duration.
+     */
+    public KeyedValues<K, V> withDuration(Duration duration) {
+      return new KeyedValues<>(timeDomain, duration);
+    }
+  }
+
+  /**
+   * Returns a deduplication transform that deduplicates values for up to 10 mins within the {@link
+   * TimeDomain#PROCESSING_TIME processing time domain}.
+   */
+  public static <T> Deduplicate.Values<T> values() {
+    return new Deduplicate.Values<>(DEFAULT_TIME_DOMAIN, DEFAULT_DURATION);
+  }
+
+  /**
+   * Returns a deduplication transform that deduplicates keyed values using the key for up to 10
+   * mins within the {@link TimeDomain#PROCESSING_TIME processing time domain}.
+   */
+  public static <K, V> Deduplicate.KeyedValues<K, V> keyedValues() {
+    return new Deduplicate.KeyedValues<>(DEFAULT_TIME_DOMAIN, DEFAULT_DURATION);
+  }
+
+  /**
+   * Returns a deduplication transform that deduplicates values using the supplied representative
+   * value for up to 10 mins within the {@link TimeDomain#PROCESSING_TIME processing time domain}.
+   */
+  public static <T, IdT> Deduplicate.WithRepresentativeValues<T, IdT> withRepresentativeValueFn(
+      SerializableFunction<T, IdT> representativeValueFn) {
+    return new Deduplicate.WithRepresentativeValues<T, IdT>(
+        DEFAULT_TIME_DOMAIN, DEFAULT_DURATION, representativeValueFn, null, null);
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  // prevent instantiation
+  private Deduplicate() {}
+
+  /**
+   * A stateful {@link DoFn} that uses a {@link ValueState} to capture whether the value has ever
+   * been seen.
+   *
+   * @param <K>
+   * @param <V>
+   */
+  private static class DeduplicateFn<K, V> extends DoFn<KV<K, V>, KV<K, V>> {
+    private static final String EXPIRY_TIMER = "expiryTimer";
+    private static final String SEEN_STATE = "seen";
+
+    @TimerId(EXPIRY_TIMER)
+    private final TimerSpec expiryTimerSpec;
+
+    @StateId(SEEN_STATE)
+    private final StateSpec<ValueState<Boolean>> seenState = StateSpecs.value(BooleanCoder.of());
+
+    private final Duration duration;
+
+    private DeduplicateFn(TimeDomain timeDomain, Duration duration) {
+      this.expiryTimerSpec = TimerSpecs.timer(timeDomain);
+      this.duration = duration;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<K, V> element,
+        OutputReceiver<KV<K, V>> receiver,
+        @StateId(SEEN_STATE) ValueState<Boolean> seenState,
+        @TimerId(EXPIRY_TIMER) Timer expiryTimer) {
+      Boolean seen = seenState.read();
+      // Seen state is either set or not set so if it has been set then it must be true.
+      if (seen == null) {
+        expiryTimer.offset(duration).setRelative();
+        seenState.write(true);
+        receiver.output(element);
+      }
+    }
+
+    @OnTimer(EXPIRY_TIMER)
+    public void onExpiry(
+        OnTimerContext context, @StateId(SEEN_STATE) ValueState<Boolean> seenState) {
+      seenState.clear();
+    }
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DeduplicateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DeduplicateTest.java
new file mode 100644
index 0000000..00ee2d2
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DeduplicateTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DeduplicateTest {
+
+  @Rule public TestPipeline p = TestPipeline.create();
+
+  @Test
+  @Category({NeedsRunner.class, UsesTestStream.class})
+  public void testInDifferentWindows() {
+    Instant base = new Instant(0);
+    TestStream<String> values =
+        TestStream.create(StringUtf8Coder.of())
+            .advanceWatermarkTo(base)
+            .addElements(
+                TimestampedValue.of("k1", base),
+                TimestampedValue.of("k2", base.plus(Duration.standardSeconds(10))),
+                TimestampedValue.of("k3", base.plus(Duration.standardSeconds(20))),
+                TimestampedValue.of("k1", base.plus(Duration.standardSeconds(30))),
+                TimestampedValue.of("k2", base.plus(Duration.standardSeconds(40))),
+                TimestampedValue.of("k3", base.plus(Duration.standardSeconds(50))),
+                TimestampedValue.of("k4", base.plus(Duration.standardSeconds(60))),
+                TimestampedValue.of("k5", base.plus(Duration.standardSeconds(70))),
+                TimestampedValue.of("k6", base.plus(Duration.standardSeconds(80))))
+            .advanceWatermarkToInfinity();
+
+    PCollection<String> distinctValues =
+        p.apply(values)
+            .apply(Window.into(FixedWindows.of(Duration.standardSeconds(30))))
+            .apply(Deduplicate.values());
+    PAssert.that(distinctValues)
+        .inWindow(new IntervalWindow(base, base.plus(Duration.standardSeconds(30))))
+        .containsInAnyOrder("k1", "k2", "k3");
+    PAssert.that(distinctValues)
+        .inWindow(
+            new IntervalWindow(
+                base.plus(Duration.standardSeconds(30)), base.plus(Duration.standardSeconds(60))))
+        .containsInAnyOrder("k1", "k2", "k3");
+    PAssert.that(distinctValues)
+        .inWindow(
+            new IntervalWindow(
+                base.plus(Duration.standardSeconds(60)), base.plus(Duration.standardSeconds(90))))
+        .containsInAnyOrder("k4", "k5", "k6");
+    p.run();
+  }
+
+  @Test
+  @Category({NeedsRunner.class, UsesTestStream.class})
+  public void testEventTime() {
+    Instant base = new Instant(0);
+    TestStream<String> values =
+        TestStream.create(StringUtf8Coder.of())
+            .advanceWatermarkTo(base)
+            .addElements(
+                TimestampedValue.of("k1", base),
+                TimestampedValue.of("k2", base.plus(Duration.standardSeconds(10))),
+                TimestampedValue.of("k3", base.plus(Duration.standardSeconds(20))),
+                TimestampedValue.of("maybedup", base.plus(Duration.standardSeconds(59))))
+            .advanceWatermarkTo(base.plus(Duration.standardMinutes(1)))
+            .addElements(
+                TimestampedValue.of("k1", base.plus(Duration.standardSeconds(30))),
+                TimestampedValue.of("k2", base.plus(Duration.standardSeconds(40))),
+                TimestampedValue.of("k3", base.plus(Duration.standardSeconds(50))))
+            .advanceWatermarkTo(
+                base.plus(Duration.standardMinutes(1)).plus(Deduplicate.DEFAULT_DURATION))
+            .addElements(TimestampedValue.of("maybedup", base.plus(Duration.standardSeconds(59))))
+            .advanceWatermarkToInfinity();
+
+    PCollection<String> distinctValues =
+        p.apply(values).apply(Deduplicate.<String>values().withTimeDomain(TimeDomain.EVENT_TIME));
+    PAssert.that(distinctValues)
+        .satisfies(
+            (Iterable<String> input) -> {
+              assertEquals(1, Iterables.frequency(input, "k1"));
+              assertEquals(1, Iterables.frequency(input, "k2"));
+              assertEquals(1, Iterables.frequency(input, "k3"));
+              assertTrue(
+                  Iterables.frequency(input, "maybedup") == 1
+                      || Iterables.frequency(input, "maybedup") == 2);
+              return null;
+            });
+    p.run();
+  }
+
+  @Test
+  @Category({NeedsRunner.class, UsesTestStreamWithProcessingTime.class})
+  public void testProcessingTime() {
+    Instant base = new Instant(0);
+    TestStream<String> values =
+        TestStream.create(StringUtf8Coder.of())
+            .advanceWatermarkTo(base)
+            .addElements(
+                TimestampedValue.of("k1", base),
+                TimestampedValue.of("k2", base.plus(Duration.standardSeconds(10))),
+                TimestampedValue.of("k3", base.plus(Duration.standardSeconds(20))),
+                TimestampedValue.of("maybedup", base.plus(Duration.standardSeconds(59))))
+            .advanceProcessingTime(Duration.standardMinutes(1))
+            .addElements(
+                TimestampedValue.of("k1", base.plus(Duration.standardSeconds(30))),
+                TimestampedValue.of("k2", base.plus(Duration.standardSeconds(40))),
+                TimestampedValue.of("k3", base.plus(Duration.standardSeconds(50))))
+            .advanceProcessingTime(Deduplicate.DEFAULT_DURATION)
+            .addElements(TimestampedValue.of("maybedup", base.plus(Duration.standardSeconds(59))))
+            .advanceWatermarkToInfinity();
+
+    PCollection<String> distinctValues = p.apply(values).apply(Deduplicate.values());
+    PAssert.that(distinctValues)
+        .satisfies(
+            (Iterable<String> input) -> {
+              assertEquals(1, Iterables.frequency(input, "k1"));
+              assertEquals(1, Iterables.frequency(input, "k2"));
+              assertEquals(1, Iterables.frequency(input, "k3"));
+              assertTrue(
+                  Iterables.frequency(input, "maybedup") == 1
+                      || Iterables.frequency(input, "maybedup") == 2);
+              return null;
+            });
+    p.run();
+  }
+
+  private static class Keys<T> implements SerializableFunction<KV<T, String>, T> {
+    @Override
+    public T apply(KV<T, String> input) {
+      return input.getKey();
+    }
+  }
+
+  @Test
+  @Category({NeedsRunner.class, UsesTestStreamWithProcessingTime.class})
+  public void testRepresentativeValuesWithCoder() {
+    Instant base = new Instant(0);
+    TestStream<KV<Integer, String>> values =
+        TestStream.create(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
+            .advanceWatermarkTo(base)
+            .addElements(
+                TimestampedValue.of(KV.of(1, "k1"), base),
+                TimestampedValue.of(KV.of(2, "k2"), base.plus(Duration.standardSeconds(10))),
+                TimestampedValue.of(KV.of(3, "k3"), base.plus(Duration.standardSeconds(20))))
+            .advanceProcessingTime(Duration.standardMinutes(1))
+            .addElements(
+                TimestampedValue.of(KV.of(1, "k1"), base.plus(Duration.standardSeconds(30))),
+                TimestampedValue.of(KV.of(2, "k2"), base.plus(Duration.standardSeconds(40))),
+                TimestampedValue.of(KV.of(3, "k3"), base.plus(Duration.standardSeconds(50))))
+            .advanceWatermarkToInfinity();
+
+    PCollection<KV<Integer, String>> distinctValues =
+        p.apply(values)
+            .apply(
+                Deduplicate.withRepresentativeValueFn(new Keys<Integer>())
+                    .withRepresentativeCoder(VarIntCoder.of()));
+
+    PAssert.that(distinctValues).containsInAnyOrder(KV.of(1, "k1"), KV.of(2, "k2"), KV.of(3, "k3"));
+    p.run();
+  }
+
+  @Test
+  @Category({NeedsRunner.class, UsesTestStreamWithProcessingTime.class})
+  public void testTriggeredRepresentativeValuesWithType() {
+    Instant base = new Instant(0);
+    TestStream<KV<Integer, String>> values =
+        TestStream.create(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
+            .advanceWatermarkTo(base)
+            .addElements(
+                TimestampedValue.of(KV.of(1, "k1"), base),
+                TimestampedValue.of(KV.of(2, "k2"), base.plus(Duration.standardSeconds(10))),
+                TimestampedValue.of(KV.of(3, "k3"), base.plus(Duration.standardSeconds(20))))
+            .advanceProcessingTime(Duration.standardMinutes(1))
+            .addElements(
+                TimestampedValue.of(KV.of(1, "k1"), base.plus(Duration.standardSeconds(30))),
+                TimestampedValue.of(KV.of(2, "k2"), base.plus(Duration.standardSeconds(40))),
+                TimestampedValue.of(KV.of(3, "k3"), base.plus(Duration.standardSeconds(50))))
+            .advanceWatermarkToInfinity();
+
+    PCollection<KV<Integer, String>> distinctValues =
+        p.apply(values)
+            .apply(
+                Deduplicate.withRepresentativeValueFn(new Keys<Integer>())
+                    .withRepresentativeCoder(VarIntCoder.of()));
+
+    PAssert.that(distinctValues).containsInAnyOrder(KV.of(1, "k1"), KV.of(2, "k2"), KV.of(3, "k3"));
+    p.run();
+  }
+
+  @Test
+  public void withLambdaRepresentativeValuesFnNoTypeDescriptorShouldThrow() {
+
+    Multimap<Integer, String> predupedContents = HashMultimap.create();
+    predupedContents.put(3, "foo");
+    predupedContents.put(4, "foos");
+    predupedContents.put(6, "barbaz");
+    predupedContents.put(6, "bazbar");
+    PCollection<String> dupes =
+        p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo"));
+
+    assertThrows(
+        "Unable to return a default Coder for RemoveRepresentativeDupes",
+        IllegalStateException.class,
+        () ->
+            dupes.apply(
+                "RemoveRepresentativeDupes",
+                Deduplicate.withRepresentativeValueFn(String::length)));
+  }
+}