You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dm...@apache.org on 2019/03/08 14:42:34 UTC

[beam] branch master updated: [BEAM-6774] Euphoria - replace Distinct.mapped with Distinct.projected

This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d4e7c80  [BEAM-6774] Euphoria - replace Distinct.mapped with Distinct.projected
     new c230034  Merge pull request #7809: Euphoria: Fix distinct mapped return type
d4e7c80 is described below

commit d4e7c805c1ac80e14e2b65b89de6d31c291918a6
Author: Jan Lukavsky <je...@seznam.cz>
AuthorDate: Mon Feb 11 12:16:06 2019 +0100

    [BEAM-6774] Euphoria - replace Distinct.mapped with Distinct.projected
---
 .../euphoria/core/client/io/Collector.java         |   2 +-
 .../core/client/operator/AssignEventTime.java      |   6 +-
 .../euphoria/core/client/operator/CountByKey.java  |  12 +-
 .../euphoria/core/client/operator/Distinct.java    | 296 +++++++++++++++------
 .../euphoria/core/client/operator/Filter.java      |   6 +-
 .../euphoria/core/client/operator/FlatMap.java     |  14 +-
 .../euphoria/core/client/operator/FullJoin.java    |  18 +-
 .../euphoria/core/client/operator/Join.java        |  26 +-
 .../euphoria/core/client/operator/LeftJoin.java    |  18 +-
 .../euphoria/core/client/operator/MapElements.java |  14 +-
 .../euphoria/core/client/operator/ReduceByKey.java |  30 +--
 .../core/client/operator/ReduceWindow.java         |  26 +-
 .../euphoria/core/client/operator/RightJoin.java   |  18 +-
 .../euphoria/core/client/operator/SumByKey.java    |  12 +-
 .../euphoria/core/client/operator/TopPerKey.java   |  28 +-
 .../euphoria/core/client/operator/Union.java       |   6 +-
 .../core/client/operator/base/ShuffleOperator.java |   1 +
 .../translate/BroadcastHashJoinTranslator.java     |   4 +-
 .../euphoria/core/translate/OperatorTransform.java |   2 +-
 .../core/translate/ReduceByKeyTranslator.java      |   4 +-
 .../core/translate/TimestampExtractTransform.java  | 118 ++++++++
 .../provider/GenericTranslatorProvider.java        |   4 +-
 .../core/docs/DocumentationExamplesTest.java       |   6 +-
 .../euphoria/core/testkit/DistinctTest.java        | 112 ++++++--
 .../translate/TimestampExtractTransformTest.java   |  44 +++
 website/src/documentation/sdks/euphoria.md         |   4 +-
 26 files changed, 599 insertions(+), 232 deletions(-)

diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/io/Collector.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/io/Collector.java
index f990ff4..d3f772a 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/io/Collector.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/io/Collector.java
@@ -36,7 +36,7 @@ public interface Collector<T> extends Environment {
 
   /**
    * Returns {@link Context} view of the collector. Since {@link Collector} usually share the same
-   * methods as {@link Context} it can be safely casted.
+   * methods as {@link Context} it can be safely cast.
    *
    * @return this instance as a context class
    */
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/AssignEventTime.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/AssignEventTime.java
index a660c05..bb65a1c 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/AssignEventTime.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/AssignEventTime.java
@@ -110,9 +110,9 @@ public class AssignEventTime<InputT> extends Operator<InputT>
     @Override
     public <T> UsingBuilder<T> of(PCollection<T> input) {
       @SuppressWarnings("unchecked")
-      final Builder<T> casted = (Builder<T>) this;
-      casted.input = input;
-      return casted;
+      final Builder<T> cast = (Builder<T>) this;
+      cast.input = input;
+      return cast;
     }
 
     @Override
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/CountByKey.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/CountByKey.java
index 3ba8ea2..01c17d5 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/CountByKey.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/CountByKey.java
@@ -185,10 +185,10 @@ public class CountByKey<InputT, KeyT> extends ShuffleOperator<InputT, KeyT, KV<K
     public <T> WindowByBuilder<T> keyBy(
         UnaryFunction<InputT, T> keyExtractor, @Nullable TypeDescriptor<T> keyType) {
       @SuppressWarnings("unchecked")
-      final Builder<InputT, T> casted = (Builder<InputT, T>) this;
-      casted.keyExtractor = requireNonNull(keyExtractor);
-      casted.keyType = keyType;
-      return casted;
+      final Builder<InputT, T> cast = (Builder<InputT, T>) this;
+      cast.keyExtractor = requireNonNull(keyExtractor);
+      cast.keyType = keyType;
+      return cast;
     }
 
     @Override
@@ -270,9 +270,9 @@ public class CountByKey<InputT, KeyT> extends ShuffleOperator<InputT, KeyT, KV<K
             getWindow().isPresent(),
             builder -> {
               @SuppressWarnings("unchecked")
-              final ReduceByKey.WindowByInternalBuilder<InputT, KeyT, Long> casted =
+              final ReduceByKey.WindowByInternalBuilder<InputT, KeyT, Long> cast =
                   (ReduceByKey.WindowByInternalBuilder) builder;
-              return casted.windowBy(
+              return cast.windowBy(
                   getWindow()
                       .orElseThrow(
                           () ->
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Distinct.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Distinct.java
index 942e8f3..62c8d88 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Distinct.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Distinct.java
@@ -19,10 +19,13 @@ package org.apache.beam.sdk.extensions.euphoria.core.client.operator;
 
 import static java.util.Objects.requireNonNull;
 
+import java.util.Optional;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.extensions.euphoria.core.annotation.audience.Audience;
 import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.Recommended;
 import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.StateComplexity;
+import org.apache.beam.sdk.extensions.euphoria.core.client.functional.CombinableReduceFunction;
 import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
 import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Builders;
 import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.OptionalMethodBuilder;
@@ -30,6 +33,7 @@ import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Shuffle
 import org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.OutputHint;
 import org.apache.beam.sdk.extensions.euphoria.core.client.util.PCollectionLists;
 import org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTransform;
+import org.apache.beam.sdk.extensions.euphoria.core.translate.TimestampExtractTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
@@ -41,6 +45,7 @@ import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
 import org.joda.time.Duration;
 
 /**
@@ -51,7 +56,7 @@ import org.joda.time.Duration;
  * <ol>
  *   <li>{@code [named] ..................} give name to the operator [optional]
  *   <li>{@code of .......................} input dataset
- *   <li>{@code [mapped] .................} compare objects retrieved by this {@link UnaryFunction}
+ *   <li>{@code [projected] ..............} compare objects retrieved by this {@link UnaryFunction}
  *       instead of raw input elements
  *   <li>{@code [windowBy] ...............} windowing (see {@link WindowFn}), default is no
  *       windowing
@@ -68,8 +73,8 @@ import org.joda.time.Duration;
             + "(e.g. using bloom filters), which might reduce the space complexity",
     state = StateComplexity.CONSTANT,
     repartitions = 1)
-public class Distinct<InputT, OutputT> extends ShuffleOperator<InputT, OutputT, OutputT>
-    implements CompositeOperator<InputT, OutputT> {
+public class Distinct<InputT, KeyT> extends ShuffleOperator<InputT, KeyT, InputT>
+    implements CompositeOperator<InputT, InputT> {
 
   /**
    * Starts building a nameless {@link Distinct} operator to process the given input dataset.
@@ -80,7 +85,7 @@ public class Distinct<InputT, OutputT> extends ShuffleOperator<InputT, OutputT,
    * @see #named(String)
    * @see OfBuilder#of(PCollection)
    */
-  public static <InputT> MappedBuilder<InputT, InputT> of(PCollection<InputT> input) {
+  public static <InputT> ProjectedBuilder<InputT, InputT> of(PCollection<InputT> input) {
     return named(null).of(input);
   }
 
@@ -98,11 +103,28 @@ public class Distinct<InputT, OutputT> extends ShuffleOperator<InputT, OutputT,
   public interface OfBuilder extends Builders.Of {
 
     @Override
-    <InputT> MappedBuilder<InputT, InputT> of(PCollection<InputT> input);
+    <InputT> ProjectedBuilder<InputT, InputT> of(PCollection<InputT> input);
   }
 
-  /** Builder for the 'mapped' step. */
-  public interface MappedBuilder<InputT, OutputT> extends WindowByBuilder<OutputT> {
+  /**
+   * A policy to be applied when multiple elements exist for the same comparison key. Note that this
+   * can be specified only when specifying the projection, because otherwise complete elements are
+   * compared and therefore it makes no sense to select between identical elements.
+   */
+  public enum SelectionPolicy {
+
+    /** Select any element (non-deterministically). */
+    ANY,
+
+    /** Select the element with the lowest timestamp. */
+    OLDEST,
+
+    /** Select the element with the highest timestamp. */
+    NEWEST
+  }
+
+  /** Builder for the 'projected' step. */
+  public interface ProjectedBuilder<InputT, KeyT> extends WindowByBuilder<InputT, KeyT> {
 
     /**
      * Optionally specifies a function to transform the input elements into another type among which
@@ -111,184 +133,282 @@ public class Distinct<InputT, OutputT> extends ShuffleOperator<InputT, OutputT,
      * <p>This is, while windowing will be applied on basis of original input elements, the distinct
      * operator will be carried out on the transformed elements.
      *
-     * @param <T> the type of the transformed elements
-     * @param mapper a transform function applied to input element
+     * @param <KeyT> the type of the transformed elements
+     * @param transform a transform function applied to input element
      * @return the next builder to complete the setup of the {@link Distinct} operator
      */
-    default <T> WindowByBuilder<T> mapped(UnaryFunction<InputT, T> mapper) {
-      return mapped(mapper, null);
+    default <KeyT> WindowByBuilder<InputT, KeyT> projected(UnaryFunction<InputT, KeyT> transform) {
+      return projected(transform, SelectionPolicy.ANY, null);
+    }
+
+    default <KeyT> WindowByBuilder<InputT, KeyT> projected(
+        UnaryFunction<InputT, KeyT> transform, TypeDescriptor<KeyT> projectedType) {
+
+      return projected(transform, SelectionPolicy.ANY, requireNonNull(projectedType));
     }
 
-    <T> WindowByBuilder<T> mapped(
-        UnaryFunction<InputT, T> mapper, @Nullable TypeDescriptor<T> outputType);
+    default <KeyT> WindowByBuilder<InputT, KeyT> projected(
+        UnaryFunction<InputT, KeyT> transform, SelectionPolicy policy) {
+
+      return projected(transform, policy, null);
+    }
+
+    <KeyT> WindowByBuilder<InputT, KeyT> projected(
+        UnaryFunction<InputT, KeyT> transform,
+        SelectionPolicy policy,
+        @Nullable TypeDescriptor<KeyT> projectedType);
   }
 
   /** Builder for the 'windowBy' step. */
-  public interface WindowByBuilder<OutputT>
-      extends Builders.WindowBy<TriggerByBuilder<OutputT>>,
-          OptionalMethodBuilder<WindowByBuilder<OutputT>, Builders.Output<OutputT>>,
-          Builders.Output<OutputT> {
+  public interface WindowByBuilder<InputT, KeyT>
+      extends Builders.WindowBy<TriggerByBuilder<InputT>>,
+          OptionalMethodBuilder<WindowByBuilder<InputT, KeyT>, Builders.Output<InputT>>,
+          Builders.Output<InputT> {
 
     @Override
-    <T extends BoundedWindow> TriggerByBuilder<OutputT> windowBy(WindowFn<Object, T> windowing);
+    <W extends BoundedWindow> TriggerByBuilder<InputT> windowBy(WindowFn<Object, W> windowing);
 
     @Override
-    default Builders.Output<OutputT> applyIf(
-        boolean cond, UnaryFunction<WindowByBuilder<OutputT>, Builders.Output<OutputT>> fn) {
+    default Builders.Output<InputT> applyIf(
+        boolean cond, UnaryFunction<WindowByBuilder<InputT, KeyT>, Builders.Output<InputT>> fn) {
+
       return cond ? requireNonNull(fn).apply(this) : this;
     }
   }
 
   /** Builder for the 'triggeredBy' step. */
-  public interface TriggerByBuilder<OutputT>
-      extends Builders.TriggeredBy<AccumulationModeBuilder<OutputT>> {
+  public interface TriggerByBuilder<T> extends Builders.TriggeredBy<AccumulationModeBuilder<T>> {
 
     @Override
-    AccumulationModeBuilder<OutputT> triggeredBy(Trigger trigger);
+    AccumulationModeBuilder<T> triggeredBy(Trigger trigger);
   }
 
   /** Builder for the 'accumulationMode' step. */
-  public interface AccumulationModeBuilder<OutputT>
-      extends Builders.AccumulationMode<WindowedOutputBuilder<OutputT>> {
+  public interface AccumulationModeBuilder<T>
+      extends Builders.AccumulationMode<WindowedOutputBuilder<T>> {
 
     @Override
-    WindowedOutputBuilder<OutputT> accumulationMode(
-        WindowingStrategy.AccumulationMode accumulationMode);
+    WindowedOutputBuilder<T> accumulationMode(WindowingStrategy.AccumulationMode accumulationMode);
   }
 
   /** Builder for 'windowed output' step. */
-  public interface WindowedOutputBuilder<OutputT>
-      extends Builders.WindowedOutput<WindowedOutputBuilder<OutputT>>, Builders.Output<OutputT> {}
+  public interface WindowedOutputBuilder<T>
+      extends Builders.WindowedOutput<WindowedOutputBuilder<T>>, Builders.Output<T> {}
 
-  private static class Builder<InputT, OutputT>
+  private static class Builder<InputT, KeyT>
       implements OfBuilder,
-          MappedBuilder<InputT, OutputT>,
-          WindowByBuilder<OutputT>,
-          TriggerByBuilder<OutputT>,
-          AccumulationModeBuilder<OutputT>,
-          WindowedOutputBuilder<OutputT>,
-          Builders.Output<OutputT> {
+          ProjectedBuilder<InputT, KeyT>,
+          WindowByBuilder<InputT, KeyT>,
+          TriggerByBuilder<InputT>,
+          AccumulationModeBuilder<InputT>,
+          WindowedOutputBuilder<InputT>,
+          Builders.Output<InputT> {
 
     private final WindowBuilder<InputT> windowBuilder = new WindowBuilder<>();
 
     @Nullable private final String name;
     private PCollection<InputT> input;
-    @Nullable private UnaryFunction<InputT, OutputT> mapper;
-    @Nullable private TypeDescriptor<OutputT> outputType;
+
+    @SuppressWarnings("unchecked")
+    private UnaryFunction<InputT, KeyT> transform = (UnaryFunction) e -> e;
+
+    private SelectionPolicy policy = null;
+
+    @Nullable private TypeDescriptor<KeyT> projectedType;
+    @Nullable private TypeDescriptor<InputT> outputType;
+    private boolean projected = false;
 
     Builder(@Nullable String name) {
       this.name = name;
     }
 
     @Override
-    public <T> MappedBuilder<T, T> of(PCollection<T> input) {
+    public <T> ProjectedBuilder<T, T> of(PCollection<T> input) {
       @SuppressWarnings("unchecked")
-      final Builder<T, T> casted = (Builder) this;
-      casted.input = requireNonNull(input);
-      return casted;
+      final Builder<T, T> cast = (Builder) this;
+      cast.input = requireNonNull(input);
+      return cast;
     }
 
     @Override
-    public <T> WindowByBuilder<T> mapped(
-        UnaryFunction<InputT, T> mapper, @Nullable TypeDescriptor<T> outputType) {
+    public <K> WindowByBuilder<InputT, K> projected(
+        UnaryFunction<InputT, K> transform,
+        SelectionPolicy policy,
+        @Nullable TypeDescriptor<K> projectedType) {
+
       @SuppressWarnings("unchecked")
-      final Builder<InputT, T> casted = (Builder) this;
-      casted.mapper = requireNonNull(mapper);
-      casted.outputType = outputType;
-      return casted;
+      final Builder<InputT, K> cast = (Builder) this;
+      cast.transform = requireNonNull(transform);
+      cast.policy = requireNonNull(policy);
+      cast.projectedType = projectedType;
+      cast.projected = true;
+      return cast;
     }
 
     @Override
-    public <T extends BoundedWindow> TriggerByBuilder<OutputT> windowBy(
-        WindowFn<Object, T> windowFn) {
+    public <W extends BoundedWindow> TriggerByBuilder<InputT> windowBy(
+        WindowFn<Object, W> windowFn) {
+
       windowBuilder.windowBy(windowFn);
       return this;
     }
 
     @Override
-    public AccumulationModeBuilder<OutputT> triggeredBy(Trigger trigger) {
+    public AccumulationModeBuilder<InputT> triggeredBy(Trigger trigger) {
       windowBuilder.triggeredBy(trigger);
       return this;
     }
 
     @Override
-    public WindowedOutputBuilder<OutputT> accumulationMode(
+    public WindowedOutputBuilder<InputT> accumulationMode(
         WindowingStrategy.AccumulationMode accumulationMode) {
       windowBuilder.accumulationMode(accumulationMode);
       return this;
     }
 
     @Override
-    public WindowedOutputBuilder<OutputT> withAllowedLateness(Duration allowedLateness) {
+    public WindowedOutputBuilder<InputT> withAllowedLateness(Duration allowedLateness) {
       windowBuilder.withAllowedLateness(allowedLateness);
       return this;
     }
 
     @Override
-    public WindowedOutputBuilder<OutputT> withAllowedLateness(
+    public WindowedOutputBuilder<InputT> withAllowedLateness(
         Duration allowedLateness, Window.ClosingBehavior closingBehavior) {
       windowBuilder.withAllowedLateness(allowedLateness, closingBehavior);
       return this;
     }
 
     @Override
-    public WindowedOutputBuilder<OutputT> withTimestampCombiner(
+    public WindowedOutputBuilder<InputT> withTimestampCombiner(
         TimestampCombiner timestampCombiner) {
       windowBuilder.withTimestampCombiner(timestampCombiner);
       return this;
     }
 
     @Override
-    public WindowedOutputBuilder<OutputT> withOnTimeBehavior(Window.OnTimeBehavior behavior) {
+    public WindowedOutputBuilder<InputT> withOnTimeBehavior(Window.OnTimeBehavior behavior) {
       windowBuilder.withOnTimeBehavior(behavior);
       return this;
     }
 
     @Override
     @SuppressWarnings("unchecked")
-    public PCollection<OutputT> output(OutputHint... outputHints) {
-      if (mapper == null) {
-        this.mapper = (UnaryFunction) UnaryFunction.identity();
+    public PCollection<InputT> output(OutputHint... outputHints) {
+      if (transform == null) {
+        this.transform = (UnaryFunction) UnaryFunction.identity();
       }
-      final Distinct<InputT, OutputT> distinct =
-          new Distinct<>(name, mapper, outputType, windowBuilder.getWindow().orElse(null));
+      final Distinct<InputT, KeyT> distinct =
+          new Distinct<>(
+              name,
+              transform,
+              outputType,
+              projectedType,
+              windowBuilder.getWindow().orElse(null),
+              policy,
+              projected);
       return OperatorTransform.apply(distinct, PCollectionList.of(input));
     }
   }
 
+  private final boolean projected;
+  private final @Nullable SelectionPolicy policy;
+
   private Distinct(
       @Nullable String name,
-      UnaryFunction<InputT, OutputT> mapper,
-      @Nullable TypeDescriptor<OutputT> outputType,
-      @Nullable Window<InputT> window) {
-    super(name, outputType, mapper, outputType, window);
+      UnaryFunction<InputT, KeyT> transform,
+      @Nullable TypeDescriptor<InputT> outputType,
+      @Nullable TypeDescriptor<KeyT> projectedType,
+      @Nullable Window<InputT> window,
+      @Nullable SelectionPolicy policy,
+      boolean projected) {
+
+    super(name, outputType, transform, projectedType, window);
+    this.projected = projected;
+    this.policy = policy;
+
+    Preconditions.checkState(
+        !projected || policy != null,
+        "Please specify selection policy when using projected distinct.");
   }
 
   @Override
-  public PCollection<OutputT> expand(PCollectionList<InputT> inputs) {
-    final PCollection<KV<OutputT, Void>> distinct =
-        ReduceByKey.named(getName().orElse(null))
-            .of(PCollectionLists.getOnlyElement(inputs))
-            .keyBy(getKeyExtractor())
-            .valueBy(e -> null, TypeDescriptors.nulls())
-            .combineBy(e -> null)
-            .applyIf(
-                getWindow().isPresent(),
-                builder -> {
-                  @SuppressWarnings("unchecked")
-                  final ReduceByKey.WindowByInternalBuilder<InputT, OutputT, Void> casted =
-                      (ReduceByKey.WindowByInternalBuilder) builder;
-                  return casted.windowBy(
-                      getWindow()
-                          .orElseThrow(
-                              () ->
-                                  new IllegalStateException(
-                                      "Unable to resolve windowing for Distinct expansion.")));
+  public PCollection<InputT> expand(PCollectionList<InputT> inputs) {
+    PCollection<InputT> tmp = PCollectionLists.getOnlyElement(inputs);
+    PCollection<InputT> input =
+        getWindow()
+            .map(
+                w -> {
+                  PCollection<InputT> ret = tmp.apply(w);
+                  ret.setTypeDescriptor(tmp.getTypeDescriptor());
+                  return ret;
                 })
-            .output();
-    return MapElements.named(getName().orElse("") + "::extract-keys")
-        .of(distinct)
-        .using(KV::getKey, getKeyType().orElse(null))
+            .orElse(tmp);
+    if (!projected) {
+      PCollection<KV<InputT, Void>> distinct =
+          ReduceByKey.named(getName().orElse(null))
+              .of(input)
+              .keyBy(e -> e, input.getTypeDescriptor())
+              .valueBy(e -> null, TypeDescriptors.nulls())
+              .combineBy(e -> null, TypeDescriptors.nulls())
+              .output();
+      return MapElements.named(getName().orElse("") + "::extract-keys")
+          .of(distinct)
+          .using(KV::getKey, input.getTypeDescriptor())
+          .output();
+    }
+    UnaryFunction<PCollection<InputT>, PCollection<InputT>> transformFn = getTransformFn();
+    return transformFn.apply(input);
+  }
+
+  private UnaryFunction<PCollection<InputT>, PCollection<InputT>> getTransformFn() {
+    switch (policy) {
+      case NEWEST:
+      case OLDEST:
+        String name = getName().orElse(null);
+        return input -> input.apply(TimestampExtractTransform.of(name, this::reduceTimestamped));
+      case ANY:
+        return this::reduceSelectingAny;
+      default:
+        throw new IllegalArgumentException("Unknown policy " + policy);
+    }
+  }
+
+  private PCollection<InputT> reduceSelectingAny(PCollection<InputT> input) {
+    return ReduceByKey.named(getName().orElse(null))
+        .of(input)
+        .keyBy(getKeyExtractor(), getKeyType().orElse(null))
+        .valueBy(e -> e, getOutputType().orElse(null))
+        .combineBy(values -> nonEmpty(values.findAny()), getOutputType().orElse(null))
+        .outputValues();
+  }
+
+  private PCollection<InputT> reduceTimestamped(PCollection<KV<Long, InputT>> input) {
+    CombinableReduceFunction<KV<Long, InputT>> select = getReduceFn();
+    PCollection<KV<Long, InputT>> outputValues =
+        ReduceByKey.named(getName().orElse(null))
+            .of(input)
+            .keyBy(e -> getKeyExtractor().apply(e.getValue()), getKeyType().orElse(null))
+            .valueBy(e -> e, requireNonNull(input.getTypeDescriptor()))
+            .combineBy(select, requireNonNull(input.getTypeDescriptor()))
+            .outputValues();
+    return MapElements.named(getName().map(n -> n + "::unwrap").orElse(null))
+        .of(outputValues)
+        .using(KV::getValue, getOutputType().orElse(null))
         .output();
   }
+
+  private CombinableReduceFunction<KV<Long, InputT>> getReduceFn() {
+    return policy == SelectionPolicy.NEWEST
+        ? values ->
+            nonEmpty(
+                values.collect(Collectors.maxBy((a, b) -> Long.compare(a.getKey(), b.getKey()))))
+        : values ->
+            nonEmpty(
+                values.collect(Collectors.minBy((a, b) -> Long.compare(a.getKey(), b.getKey()))));
+  }
+
+  private static <T> T nonEmpty(Optional<T> in) {
+    return in.orElseThrow(() -> new IllegalStateException("Empty reduce values?"));
+  }
 }
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Filter.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Filter.java
index 8538dbb..33172a0 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Filter.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Filter.java
@@ -109,9 +109,9 @@ public class Filter<InputT> extends Operator<InputT> implements CompositeOperato
     @Override
     public <T> ByBuilder<T> of(PCollection<T> input) {
       @SuppressWarnings("unchecked")
-      final Builder<T> casted = (Builder) this;
-      casted.input = requireNonNull(input);
-      return casted;
+      final Builder<T> cast = (Builder) this;
+      cast.input = requireNonNull(input);
+      return cast;
     }
 
     @Override
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/FlatMap.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/FlatMap.java
index b0d232e..dd3d675 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/FlatMap.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/FlatMap.java
@@ -176,9 +176,9 @@ public class FlatMap<InputT, OutputT> extends Operator<OutputT>
     @Override
     public <InputLocalT> UsingBuilder<InputLocalT> of(PCollection<InputLocalT> input) {
       @SuppressWarnings("unchecked")
-      Builder<InputLocalT, ?> casted = (Builder) this;
-      casted.input = requireNonNull(input);
-      return casted;
+      Builder<InputLocalT, ?> cast = (Builder) this;
+      cast.input = requireNonNull(input);
+      return cast;
     }
 
     @Override
@@ -191,10 +191,10 @@ public class FlatMap<InputT, OutputT> extends Operator<OutputT>
     public <OutputLocalT> EventTimeBuilder<InputT, OutputLocalT> using(
         UnaryFunctor<InputT, OutputLocalT> functor, TypeDescriptor<OutputLocalT> outputType) {
       @SuppressWarnings("unchecked")
-      Builder<InputT, OutputLocalT> casted = (Builder) this;
-      casted.functor = requireNonNull(functor);
-      casted.outputType = outputType;
-      return casted;
+      Builder<InputT, OutputLocalT> cast = (Builder) this;
+      cast.functor = requireNonNull(functor);
+      cast.outputType = outputType;
+      return cast;
     }
 
     @Override
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/FullJoin.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/FullJoin.java
index 5b1c661..63a3755 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/FullJoin.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/FullJoin.java
@@ -132,10 +132,10 @@ public class FullJoin {
     public <FirstT, SecondT> ByBuilder<FirstT, SecondT> of(
         PCollection<FirstT> left, PCollection<SecondT> right) {
       @SuppressWarnings("unchecked")
-      final Builder<FirstT, SecondT, ?> casted = (Builder) this;
-      casted.left = requireNonNull(left);
-      casted.right = requireNonNull(right);
-      return casted;
+      final Builder<FirstT, SecondT, ?> cast = (Builder) this;
+      cast.left = requireNonNull(left);
+      cast.right = requireNonNull(right);
+      return cast;
     }
 
     @Override
@@ -144,11 +144,11 @@ public class FullJoin {
         UnaryFunction<RightT, T> rightKeyExtractor,
         @Nullable TypeDescriptor<T> keyType) {
       @SuppressWarnings("unchecked")
-      final Builder<LeftT, RightT, T> casted = (Builder) this;
-      casted.leftKeyExtractor = requireNonNull(leftKeyExtractor);
-      casted.rightKeyExtractor = requireNonNull(rightKeyExtractor);
-      casted.keyType = keyType;
-      return casted;
+      final Builder<LeftT, RightT, T> cast = (Builder) this;
+      cast.leftKeyExtractor = requireNonNull(leftKeyExtractor);
+      cast.rightKeyExtractor = requireNonNull(rightKeyExtractor);
+      cast.keyType = keyType;
+      return cast;
     }
 
     @Override
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java
index e90d244..97563a7 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java
@@ -202,10 +202,10 @@ public class Join<LeftT, RightT, KeyT, OutputT>
     public <FirstT, SecondT> ByBuilder<FirstT, SecondT> of(
         PCollection<FirstT> left, PCollection<SecondT> right) {
       @SuppressWarnings("unchecked")
-      final Builder<FirstT, SecondT, ?, ?> casted = (Builder) this;
-      casted.left = requireNonNull(left);
-      casted.right = requireNonNull(right);
-      return casted;
+      final Builder<FirstT, SecondT, ?, ?> cast = (Builder) this;
+      cast.left = requireNonNull(left);
+      cast.right = requireNonNull(right);
+      return cast;
     }
 
     @Override
@@ -214,21 +214,21 @@ public class Join<LeftT, RightT, KeyT, OutputT>
         UnaryFunction<RightT, T> rightKeyExtractor,
         @Nullable TypeDescriptor<T> keyType) {
       @SuppressWarnings("unchecked")
-      final Builder<LeftT, RightT, T, ?> casted = (Builder) this;
-      casted.leftKeyExtractor = leftKeyExtractor;
-      casted.rightKeyExtractor = rightKeyExtractor;
-      casted.keyType = keyType;
-      return casted;
+      final Builder<LeftT, RightT, T, ?> cast = (Builder) this;
+      cast.leftKeyExtractor = leftKeyExtractor;
+      cast.rightKeyExtractor = rightKeyExtractor;
+      cast.keyType = keyType;
+      return cast;
     }
 
     @Override
     public <T> WindowByBuilder<KeyT, T> using(
         BinaryFunctor<LeftT, RightT, T> joinFunc, @Nullable TypeDescriptor<T> outputType) {
       @SuppressWarnings("unchecked")
-      final Builder<LeftT, RightT, KeyT, T> casted = (Builder) this;
-      casted.joinFunc = requireNonNull(joinFunc);
-      casted.outputType = outputType;
-      return casted;
+      final Builder<LeftT, RightT, KeyT, T> cast = (Builder) this;
+      cast.joinFunc = requireNonNull(joinFunc);
+      cast.outputType = outputType;
+      return cast;
     }
 
     @Override
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/LeftJoin.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/LeftJoin.java
index c21a14b..fd620ab 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/LeftJoin.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/LeftJoin.java
@@ -131,10 +131,10 @@ public class LeftJoin {
     public <FirstT, SecondT> ByBuilder<FirstT, SecondT> of(
         PCollection<FirstT> left, PCollection<SecondT> right) {
       @SuppressWarnings("unchecked")
-      final Builder<FirstT, SecondT, ?> casted = (Builder) this;
-      casted.left = requireNonNull(left);
-      casted.right = requireNonNull(right);
-      return casted;
+      final Builder<FirstT, SecondT, ?> cast = (Builder) this;
+      cast.left = requireNonNull(left);
+      cast.right = requireNonNull(right);
+      return cast;
     }
 
     @Override
@@ -143,11 +143,11 @@ public class LeftJoin {
         UnaryFunction<RightT, T> rightKeyExtractor,
         @Nullable TypeDescriptor<T> keyType) {
       @SuppressWarnings("unchecked")
-      final Builder<LeftT, RightT, T> casted = (Builder) this;
-      casted.leftKeyExtractor = requireNonNull(leftKeyExtractor);
-      casted.rightKeyExtractor = requireNonNull(rightKeyExtractor);
-      casted.keyType = keyType;
-      return casted;
+      final Builder<LeftT, RightT, T> cast = (Builder) this;
+      cast.leftKeyExtractor = requireNonNull(leftKeyExtractor);
+      cast.rightKeyExtractor = requireNonNull(rightKeyExtractor);
+      cast.keyType = keyType;
+      return cast;
     }
 
     @Override
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/MapElements.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/MapElements.java
index 29b393a..522cd73 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/MapElements.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/MapElements.java
@@ -134,19 +134,19 @@ public class MapElements<InputT, OutputT> extends Operator<OutputT>
     @Override
     public <T> UsingBuilder<T> of(PCollection<T> input) {
       @SuppressWarnings("unchecked")
-      final Builder<T, ?> casted = (Builder) this;
-      casted.input = input;
-      return casted;
+      final Builder<T, ?> cast = (Builder) this;
+      cast.input = input;
+      return cast;
     }
 
     @Override
     public <T> Builders.Output<T> using(
         UnaryFunctionEnv<InputT, T> mapper, @Nullable TypeDescriptor<T> outputType) {
       @SuppressWarnings("unchecked")
-      final Builder<InputT, T> casted = (Builder) this;
-      casted.mapper = mapper;
-      casted.outputType = outputType;
-      return casted;
+      final Builder<InputT, T> cast = (Builder) this;
+      cast.mapper = mapper;
+      cast.outputType = outputType;
+      return cast;
     }
 
     @Override
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceByKey.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceByKey.java
index 6b1eb42..67769c9 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceByKey.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceByKey.java
@@ -335,29 +335,29 @@ public class ReduceByKey<InputT, KeyT, ValueT, OutputT>
     @SuppressWarnings("unchecked")
     public <T> KeyByBuilder<T> of(PCollection<T> input) {
       @SuppressWarnings("unchecked")
-      final Builder<T, ?, ?, ?> casted = (Builder) this;
-      casted.input = input;
-      return casted;
+      final Builder<T, ?, ?, ?> cast = (Builder) this;
+      cast.input = input;
+      return cast;
     }
 
     @Override
     public <T> ValueByReduceByBuilder<InputT, T, InputT> keyBy(
         UnaryFunction<InputT, T> keyExtractor, @Nullable TypeDescriptor<T> keyType) {
       @SuppressWarnings("unchecked")
-      final Builder<InputT, T, InputT, ?> casted = (Builder) this;
-      casted.keyExtractor = requireNonNull(keyExtractor);
-      casted.keyType = keyType;
-      return casted;
+      final Builder<InputT, T, InputT, ?> cast = (Builder) this;
+      cast.keyExtractor = requireNonNull(keyExtractor);
+      cast.keyType = keyType;
+      return cast;
     }
 
     @Override
     public <T> ReduceByBuilder<KeyT, T> valueBy(
         UnaryFunction<InputT, T> valueExtractor, @Nullable TypeDescriptor<T> valueType) {
       @SuppressWarnings("unchecked")
-      final Builder<InputT, KeyT, T, ?> casted = (Builder) this;
-      casted.valueExtractor = requireNonNull(valueExtractor);
-      casted.valueType = valueType;
-      return casted;
+      final Builder<InputT, KeyT, T, ?> cast = (Builder) this;
+      cast.valueExtractor = requireNonNull(valueExtractor);
+      cast.valueType = valueType;
+      return cast;
     }
 
     @Override
@@ -369,10 +369,10 @@ public class ReduceByKey<InputT, KeyT, ValueT, OutputT>
         valueExtractor = (UnaryFunction) UnaryFunction.identity();
       }
       @SuppressWarnings("unchecked")
-      final Builder<InputT, KeyT, ValueT, T> casted = (Builder) this;
-      casted.reducer = requireNonNull(reducer);
-      casted.outputType = outputType;
-      return casted;
+      final Builder<InputT, KeyT, ValueT, T> cast = (Builder) this;
+      cast.reducer = requireNonNull(reducer);
+      cast.outputType = outputType;
+      return cast;
     }
 
     @Override
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceWindow.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceWindow.java
index dd0eb30..4c26518 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceWindow.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceWindow.java
@@ -276,19 +276,19 @@ public class ReduceWindow<InputT, ValueT, OutputT> extends ShuffleOperator<Input
     @Override
     public <T> ValueByReduceByBuilder<T, T> of(PCollection<T> input) {
       @SuppressWarnings("unchecked")
-      final Builder<T, T, ?> casted = (Builder) this;
-      casted.input = requireNonNull(input);
-      return casted;
+      final Builder<T, T, ?> cast = (Builder) this;
+      cast.input = requireNonNull(input);
+      return cast;
     }
 
     @Override
     public <T> ReduceByBuilder<T> valueBy(
         UnaryFunction<InputT, T> valueExtractor, @Nullable TypeDescriptor<T> valueType) {
       @SuppressWarnings("unchecked")
-      final Builder<InputT, T, ?> casted = (Builder) this;
-      casted.valueExtractor = requireNonNull(valueExtractor);
-      casted.valueType = valueType;
-      return casted;
+      final Builder<InputT, T, ?> cast = (Builder) this;
+      cast.valueExtractor = requireNonNull(valueExtractor);
+      cast.valueType = valueType;
+      return cast;
     }
 
     @Override
@@ -300,10 +300,10 @@ public class ReduceWindow<InputT, ValueT, OutputT> extends ShuffleOperator<Input
         valueExtractor = (UnaryFunction) UnaryFunction.identity();
       }
       @SuppressWarnings("unchecked")
-      final Builder<InputT, ValueT, T> casted = (Builder) this;
-      casted.reducer = requireNonNull(reducer);
-      casted.outputType = outputType;
-      return casted;
+      final Builder<InputT, ValueT, T> cast = (Builder) this;
+      cast.reducer = requireNonNull(reducer);
+      cast.outputType = outputType;
+      return cast;
     }
 
     @Override
@@ -439,9 +439,9 @@ public class ReduceWindow<InputT, ValueT, OutputT> extends ShuffleOperator<Input
             getWindow().isPresent(),
             builder -> {
               @SuppressWarnings("unchecked")
-              final ReduceByKey.WindowByInternalBuilder<InputT, Byte, OutputT> casted =
+              final ReduceByKey.WindowByInternalBuilder<InputT, Byte, OutputT> cast =
                   (ReduceByKey.WindowByInternalBuilder) builder;
-              return casted.windowBy(
+              return cast.windowBy(
                   getWindow()
                       .orElseThrow(
                           () ->
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/RightJoin.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/RightJoin.java
index 3a3c0b4..55c292e 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/RightJoin.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/RightJoin.java
@@ -132,10 +132,10 @@ public class RightJoin {
     public <FirstT, SecondT> ByBuilder<FirstT, SecondT> of(
         PCollection<FirstT> left, PCollection<SecondT> right) {
       @SuppressWarnings("unchecked")
-      final Builder<FirstT, SecondT, ?> casted = (Builder) this;
-      casted.left = requireNonNull(left);
-      casted.right = requireNonNull(right);
-      return casted;
+      final Builder<FirstT, SecondT, ?> cast = (Builder) this;
+      cast.left = requireNonNull(left);
+      cast.right = requireNonNull(right);
+      return cast;
     }
 
     @Override
@@ -144,11 +144,11 @@ public class RightJoin {
         UnaryFunction<RightT, T> rightKeyExtractor,
         @Nullable TypeDescriptor<T> keyType) {
       @SuppressWarnings("unchecked")
-      final Builder<LeftT, RightT, T> casted = (Builder) this;
-      casted.leftKeyExtractor = requireNonNull(leftKeyExtractor);
-      casted.rightKeyExtractor = requireNonNull(rightKeyExtractor);
-      casted.keyType = keyType;
-      return casted;
+      final Builder<LeftT, RightT, T> cast = (Builder) this;
+      cast.leftKeyExtractor = requireNonNull(leftKeyExtractor);
+      cast.rightKeyExtractor = requireNonNull(rightKeyExtractor);
+      cast.keyType = keyType;
+      return cast;
     }
 
     @Override
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/SumByKey.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/SumByKey.java
index 203fcc5..8f89f35 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/SumByKey.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/SumByKey.java
@@ -206,10 +206,10 @@ public class SumByKey<InputT, KeyT> extends ShuffleOperator<InputT, KeyT, KV<Key
     public <T> ValueByBuilder<InputT, T> keyBy(
         UnaryFunction<InputT, T> keyExtractor, @Nullable TypeDescriptor<T> keyType) {
       @SuppressWarnings("unchecked")
-      final Builder<InputT, T> casted = (Builder<InputT, T>) this;
-      casted.keyExtractor = requireNonNull(keyExtractor);
-      casted.keyType = keyType;
-      return casted;
+      final Builder<InputT, T> cast = (Builder<InputT, T>) this;
+      cast.keyExtractor = requireNonNull(keyExtractor);
+      cast.keyType = keyType;
+      return cast;
     }
 
     @Override
@@ -309,9 +309,9 @@ public class SumByKey<InputT, KeyT> extends ShuffleOperator<InputT, KeyT, KV<Key
             getWindow().isPresent(),
             builder -> {
               @SuppressWarnings("unchecked")
-              final ReduceByKey.WindowByInternalBuilder<InputT, KeyT, Long> casted =
+              final ReduceByKey.WindowByInternalBuilder<InputT, KeyT, Long> cast =
                   (ReduceByKey.WindowByInternalBuilder) builder;
-              return casted.windowBy(
+              return cast.windowBy(
                   getWindow()
                       .orElseThrow(
                           () ->
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/TopPerKey.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/TopPerKey.java
index e69f5d7..7ca3ce9 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/TopPerKey.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/TopPerKey.java
@@ -242,30 +242,30 @@ public class TopPerKey<InputT, KeyT, ValueT, ScoreT extends Comparable<ScoreT>>
     public <T> ValueByBuilder<InputT, T> keyBy(
         UnaryFunction<InputT, T> keyExtractor, @Nullable TypeDescriptor<T> keyType) {
       @SuppressWarnings("unchecked")
-      final Builder<InputT, T, ?, ?> casted = (Builder) this;
-      casted.keyExtractor = requireNonNull(keyExtractor);
-      casted.keyType = keyType;
-      return casted;
+      final Builder<InputT, T, ?, ?> cast = (Builder) this;
+      cast.keyExtractor = requireNonNull(keyExtractor);
+      cast.keyType = keyType;
+      return cast;
     }
 
     @Override
     public <T> ScoreBy<InputT, KeyT, T> valueBy(
         UnaryFunction<InputT, T> valueExtractor, @Nullable TypeDescriptor<T> valueType) {
       @SuppressWarnings("unchecked")
-      final Builder<InputT, KeyT, T, ?> casted = (Builder) this;
-      casted.valueExtractor = requireNonNull(valueExtractor);
-      casted.valueType = valueType;
-      return casted;
+      final Builder<InputT, KeyT, T, ?> cast = (Builder) this;
+      cast.valueExtractor = requireNonNull(valueExtractor);
+      cast.valueType = valueType;
+      return cast;
     }
 
     @Override
     public <T extends Comparable<T>> WindowByBuilder<KeyT, ValueT, T> scoreBy(
         UnaryFunction<InputT, T> scoreExtractor, @Nullable TypeDescriptor<T> scoreType) {
       @SuppressWarnings("unchecked")
-      final Builder<InputT, KeyT, ValueT, T> casted = (Builder) this;
-      casted.scoreExtractor = requireNonNull(scoreExtractor);
-      casted.scoreType = scoreType;
-      return casted;
+      final Builder<InputT, KeyT, ValueT, T> cast = (Builder) this;
+      cast.scoreExtractor = requireNonNull(scoreExtractor);
+      cast.scoreType = scoreType;
+      return cast;
     }
 
     @Override
@@ -399,8 +399,8 @@ public class TopPerKey<InputT, KeyT, ValueT, ScoreT extends Comparable<ScoreT>>
             builder -> {
               @SuppressWarnings("unchecked")
               final ReduceByKey.WindowByInternalBuilder<InputT, KeyT, Triple<KeyT, ValueT, ScoreT>>
-                  casted = (ReduceByKey.WindowByInternalBuilder) builder;
-              return casted.windowBy(
+                  cast = (ReduceByKey.WindowByInternalBuilder) builder;
+              return cast.windowBy(
                   getWindow()
                       .orElseThrow(
                           () ->
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Union.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Union.java
index dc186f5..9a48bca 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Union.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Union.java
@@ -143,9 +143,9 @@ public class Union<InputT> extends Operator<InputT> {
     @Override
     public <T> Builders.Output<T> of(List<PCollection<T>> pCollections) {
       @SuppressWarnings("unchecked")
-      final Builder<T> casted = (Builder) this;
-      casted.pCollections = pCollections;
-      return casted;
+      final Builder<T> cast = (Builder) this;
+      cast.pCollections = pCollections;
+      return cast;
     }
 
     @Override
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/base/ShuffleOperator.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/base/ShuffleOperator.java
index d8896de..99b5f07 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/base/ShuffleOperator.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/base/ShuffleOperator.java
@@ -40,6 +40,7 @@ public abstract class ShuffleOperator<InputT, KeyT, OutputT> extends Operator<Ou
       UnaryFunction<InputT, KeyT> keyExtractor,
       @Nullable TypeDescriptor<KeyT> keyType,
       @Nullable Window<InputT> windowing) {
+
     super(name, outputType);
     this.keyExtractor = keyExtractor;
     this.keyType = keyType;
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BroadcastHashJoinTranslator.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BroadcastHashJoinTranslator.java
index c6ac8bf..501d22b 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BroadcastHashJoinTranslator.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BroadcastHashJoinTranslator.java
@@ -44,8 +44,8 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Table;
  * to follow to avoid data to be send to executors repeatedly:
  *
  * <ul>
- *   <li>Input {@link PCollection} of broadcasted side has to be the same instance
- *   <li>Key extractor of broadcasted side has to be the same {@link UnaryFunction} instance
+ *   <li>Input {@link PCollection} of broadcast side has to be the same instance
+ *   <li>Key extractor of broadcast side has to be the same {@link UnaryFunction} instance
  * </ul>
  */
 public class BroadcastHashJoinTranslator<LeftT, RightT, KeyT, OutputT>
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/OperatorTransform.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/OperatorTransform.java
index 675cfac..54d07fa 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/OperatorTransform.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/OperatorTransform.java
@@ -56,7 +56,7 @@ public class OperatorTransform<InputT, OutputT, OperatorT extends Operator<Outpu
             + operator.getClass()
             + "] with name ["
             + operator.getName().orElse(null)
-            + ".");
+            + "]");
   }
 
   private final OperatorT operator;
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/ReduceByKeyTranslator.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/ReduceByKeyTranslator.java
index 930ed14..64d5d78 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/ReduceByKeyTranslator.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/ReduceByKeyTranslator.java
@@ -86,8 +86,8 @@ public class ReduceByKeyTranslator<InputT, KeyT, ValueT, OutputT>
               "combine",
               Combine.perKey(asCombiner(reducer, accumulators, operator.getName().orElse(null))));
       @SuppressWarnings("unchecked")
-      final PCollection<KV<KeyT, OutputT>> casted = (PCollection) combined;
-      return casted.setTypeDescriptor(
+      final PCollection<KV<KeyT, OutputT>> cast = (PCollection) combined;
+      return cast.setTypeDescriptor(
           operator
               .getOutputType()
               .orElseThrow(
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java
new file mode 100644
index 0000000..bc82ecb
--- /dev/null
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.core.translate;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+/**
+ * A transform that pairs each input element with its timestamp (in milliseconds) and applies a
+ * user-defined transform on the resulting collection.
+ *
+ * @param <InputT> input type
+ * @param <OutputT> output type
+ */
+public class TimestampExtractTransform<InputT, OutputT>
+    extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
+
+  /**
+   * Create transformation working on KVs with timestamps as key.
+   *
+   * @param <InputT> type of input elements
+   * @param <OutputT> type of output elements
+   * @param transform the transform applied on elements with timestamp
+   * @return the transform
+   */
+  public static <InputT, OutputT> TimestampExtractTransform<InputT, OutputT> of(
+      PCollectionTransform<InputT, OutputT> transform) {
+
+    return new TimestampExtractTransform<>(null, transform);
+  }
+
+  /**
+   * Create transformation working on KVs with timestamps as key.
+   *
+   * @param <InputT> type of input elements
+   * @param <OutputT> type of output elements
+   * @param name name of the transform
+   * @param transform the transform applied on elements with timestamp
+   * @return the transform
+   */
+  public static <InputT, OutputT> TimestampExtractTransform<InputT, OutputT> of(
+      String name, PCollectionTransform<InputT, OutputT> transform) {
+
+    return new TimestampExtractTransform<>(name, transform);
+  }
+
+  /**
+   * Apply user-defined transform(s) to the input {@link PCollection} and return the output.
+   *
+   * @param <InputT> input type
+   * @param <OutputT> output type
+   */
+  @FunctionalInterface
+  public interface PCollectionTransform<InputT, OutputT>
+      extends UnaryFunction<PCollection<KV<Long, InputT>>, PCollection<OutputT>> {}
+
+  private static class Wrap<T> extends DoFn<T, KV<Long, T>> {
+
+    @ProcessElement
+    public void processElement(ProcessContext ctx) {
+      ctx.output(KV.of(ctx.timestamp().getMillis(), ctx.element()));
+    }
+  }
+
+  private static class Unwrap<T> extends DoFn<KV<Long, T>, T> {
+
+    @ProcessElement
+    public void processElement(ProcessContext ctx) {
+      ctx.output(ctx.element().getValue());
+    }
+  }
+
+  private final PCollectionTransform<InputT, OutputT> timestampedTransform;
+
+  private TimestampExtractTransform(
+      @Nullable String name, PCollectionTransform<InputT, OutputT> timestampedTransform) {
+
+    super(name);
+    this.timestampedTransform = timestampedTransform;
+  }
+
+  @Override
+  public PCollection<OutputT> expand(PCollection<InputT> input) {
+    PCollection<KV<Long, InputT>> in;
+    in = input.apply(getName("wrap"), ParDo.of(new Wrap<>()));
+    if (input.getTypeDescriptor() != null) {
+      in =
+          in.setTypeDescriptor(
+              TypeDescriptors.kvs(TypeDescriptors.longs(), input.getTypeDescriptor()));
+    }
+    return timestampedTransform.apply(in);
+  }
+
+  private String getName(String suffix) {
+    return name != null ? name + "::" + suffix : suffix;
+  }
+}
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/GenericTranslatorProvider.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/GenericTranslatorProvider.java
index 210f4e2..f9ea163 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/GenericTranslatorProvider.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/GenericTranslatorProvider.java
@@ -164,12 +164,12 @@ public class GenericTranslatorProvider implements TranslatorProvider {
           "At least user defined predicate or class of an operator have to be given.");
 
       @SuppressWarnings("unchecked")
-      OperatorTranslator<?, ?, OperatorT> castedTranslator =
+      OperatorTranslator<?, ?, OperatorT> castTranslator =
           (OperatorTranslator<?, ?, OperatorT>) translator;
 
       this.operatorClass = operatorClass;
       this.userDefinedPredicate = userDefinedPredicate;
-      this.translator = castedTranslator;
+      this.translator = castTranslator;
     }
 
     static <OperatorT extends Operator<?>> TranslationDescriptor<OperatorT> of(
diff --git a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java
index 4b2eb8c..b078572 100644
--- a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java
+++ b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java
@@ -281,9 +281,11 @@ public class DocumentationExamplesTest {
                 KV.of(1, 100L), KV.of(3, 100_000L), KV.of(42, 10L), KV.of(1, 0L), KV.of(3, 0L)));
 
     // suppose input: [KV(1, 100L), KV(3, 100_000L), KV(42, 10L), KV(1, 0L), KV(3, 0L)]
-    PCollection<Integer> uniqueKeys =
-        Distinct.named("unique-keys-only").of(keyValueInput).mapped(KV::getKey).output();
+    PCollection<KV<Integer, Long>> distinct =
+        Distinct.named("unique-keys-only").of(keyValueInput).projected(KV::getKey).output();
+
     // Output will contain:  1, 3, 42
+    PCollection<Integer> uniqueKeys = MapElements.of(distinct).using(KV::getKey).output();
 
     PAssert.that(uniqueKeys).containsInAnyOrder(1, 3, 42);
 
diff --git a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/DistinctTest.java b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/DistinctTest.java
index e489d37..a81bc77 100644
--- a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/DistinctTest.java
+++ b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/DistinctTest.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.extensions.euphoria.core.client.operator.AssignEventTime;
 import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Distinct;
+import org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.values.KV;
@@ -75,12 +76,14 @@ public class DistinctTest extends AbstractOperatorTest {
           @Override
           protected PCollection<Integer> getOutput(PCollection<KV<Integer, Long>> input) {
             input = AssignEventTime.of(input).using(KV::getValue).output();
-            return Distinct.of(input)
-                .mapped(KV::getKey)
-                .windowBy(FixedWindows.of(org.joda.time.Duration.standardSeconds(1)))
-                .triggeredBy(DefaultTrigger.of())
-                .discardingFiredPanes()
-                .output();
+            PCollection<KV<Integer, Long>> distinct =
+                Distinct.of(input)
+                    .projected(KV::getKey)
+                    .windowBy(FixedWindows.of(org.joda.time.Duration.standardSeconds(1)))
+                    .triggeredBy(DefaultTrigger.of())
+                    .discardingFiredPanes()
+                    .output();
+            return MapElements.of(distinct).using(KV::getKey).output();
           }
 
           @Override
@@ -114,12 +117,14 @@ public class DistinctTest extends AbstractOperatorTest {
           @Override
           protected PCollection<Integer> getOutput(PCollection<KV<Integer, Long>> input) {
             input = AssignEventTime.of(input).using(KV::getValue).output();
-            return Distinct.of(input)
-                .mapped(KV::getKey)
-                .windowBy(FixedWindows.of(org.joda.time.Duration.standardSeconds(1)))
-                .triggeredBy(DefaultTrigger.of())
-                .discardingFiredPanes()
-                .output();
+            PCollection<KV<Integer, Long>> distinct =
+                Distinct.of(input)
+                    .projected(KV::getKey)
+                    .windowBy(FixedWindows.of(org.joda.time.Duration.standardSeconds(1)))
+                    .triggeredBy(DefaultTrigger.of())
+                    .discardingFiredPanes()
+                    .output();
+            return MapElements.of(distinct).using(KV::getKey).output();
           }
 
           @Override
@@ -136,10 +141,87 @@ public class DistinctTest extends AbstractOperatorTest {
         });
   }
 
-  private List<KV<Integer, Long>> asTimedList(long step, Integer... values) {
-    final List<KV<Integer, Long>> ret = new ArrayList<>(values.length);
+  @Test
+  public void testSimpleDuplicatesWithStreamStrategyOldest() {
+    execute(
+        new AbstractTestCase<KV<String, Long>, String>() {
+
+          @Override
+          public List<String> getUnorderedOutput() {
+            return Arrays.asList("2", "1", "3");
+          }
+
+          @Override
+          protected PCollection<String> getOutput(PCollection<KV<String, Long>> input) {
+            input = AssignEventTime.of(input).using(KV::getValue).output();
+            PCollection<KV<String, Long>> distinct =
+                Distinct.of(input)
+                    .projected(
+                        in -> in.getKey().substring(0, 1),
+                        Distinct.SelectionPolicy.OLDEST,
+                        TypeDescriptors.strings())
+                    .windowBy(FixedWindows.of(org.joda.time.Duration.standardSeconds(1)))
+                    .triggeredBy(DefaultTrigger.of())
+                    .discardingFiredPanes()
+                    .output();
+            return MapElements.of(distinct).using(KV::getKey).output();
+          }
+
+          @Override
+          protected List<KV<String, Long>> getInput() {
+            return asTimedList(100, "1", "2", "3", "3.", "2.", "1.");
+          }
+
+          @Override
+          protected TypeDescriptor<KV<String, Long>> getInputType() {
+            return TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs());
+          }
+        });
+  }
+
+  @Test
+  public void testSimpleDuplicatesWithStreamStrategyNewest() {
+    execute(
+        new AbstractTestCase<KV<String, Long>, String>() {
+
+          @Override
+          public List<String> getUnorderedOutput() {
+            return Arrays.asList("2.", "1.", "3.");
+          }
+
+          @Override
+          protected PCollection<String> getOutput(PCollection<KV<String, Long>> input) {
+            input = AssignEventTime.of(input).using(KV::getValue).output();
+            PCollection<KV<String, Long>> distinct =
+                Distinct.of(input)
+                    .projected(
+                        in -> in.getKey().substring(0, 1),
+                        Distinct.SelectionPolicy.NEWEST,
+                        TypeDescriptors.strings())
+                    .windowBy(FixedWindows.of(org.joda.time.Duration.standardSeconds(1)))
+                    .triggeredBy(DefaultTrigger.of())
+                    .discardingFiredPanes()
+                    .output();
+            return MapElements.of(distinct).using(KV::getKey).output();
+          }
+
+          @Override
+          protected List<KV<String, Long>> getInput() {
+            return asTimedList(100, "1", "2", "3", "3.", "2.", "1.");
+          }
+
+          @Override
+          protected TypeDescriptor<KV<String, Long>> getInputType() {
+            return TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs());
+          }
+        });
+  }
+
+  @SafeVarargs
+  final <T> List<KV<T, Long>> asTimedList(long step, T... values) {
+    final List<KV<T, Long>> ret = new ArrayList<>(values.length);
     long i = step;
-    for (Integer v : values) {
+    for (T v : values) {
       ret.add(KV.of(v, i));
       i += step;
     }
diff --git a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransformTest.java b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransformTest.java
new file mode 100644
index 0000000..2d37ae3
--- /dev/null
+++ b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransformTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.core.translate;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.euphoria.core.client.operator.CountByKey;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.junit.Test;
+
+/** Test suite for {@link TimestampExtractTransform}. */
+public class TimestampExtractTransformTest {
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 10000)
+  public void testTransform() {
+    Pipeline p = Pipeline.create();
+    PCollection<Integer> input = p.apply(Create.of(1, 2, 3));
+    PCollection<KV<Integer, Long>> result =
+        input.apply(
+            TimestampExtractTransform.of(
+                in -> CountByKey.of(in).keyBy(KV::getValue, TypeDescriptors.integers()).output()));
+    PAssert.that(result).containsInAnyOrder(KV.of(1, 1L), KV.of(2, 1L), KV.of(3, 1L));
+    p.run().waitUntilFinish();
+  }
+}
diff --git a/website/src/documentation/sdks/euphoria.md b/website/src/documentation/sdks/euphoria.md
index 45d09ea..652708b 100644
--- a/website/src/documentation/sdks/euphoria.md
+++ b/website/src/documentation/sdks/euphoria.md
@@ -266,9 +266,9 @@ Distinct.named("unique-integers-only")
 // suppose keyValueInput: [KV(1, 100L), KV(3, 100_000L), KV(42, 10L), KV(1, 0L), KV(3, 0L)]
 Distinct.named("unique-keys-only")
   .of(keyValueInput)
-  .mapped(KV::getKey)
+  .projected(KV::getKey)
   .output();
-// Output will contain:  1, 3, 42
+// Output will contain one KV per distinct key (1, 3, 42), each carrying an arbitrary one of the values seen for that key
 ```
 
 ### `Join`