You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dm...@apache.org on 2019/03/08 14:42:34 UTC
[beam] branch master updated: [BEAM-6774] Euphoria - replace Distinct.mapped with Distinct.projected
This is an automated email from the ASF dual-hosted git repository.
dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d4e7c80 [BEAM-6774] Euphoria - replace Distinct.mapped with Distinct.projected
new c230034 Merge pull request #7809: Euphoria: Fix distinct mapped return type
d4e7c80 is described below
commit d4e7c805c1ac80e14e2b65b89de6d31c291918a6
Author: Jan Lukavsky <je...@seznam.cz>
AuthorDate: Mon Feb 11 12:16:06 2019 +0100
[BEAM-6774] Euphoria - replace Distinct.mapped with Distinct.projected
---
.../euphoria/core/client/io/Collector.java | 2 +-
.../core/client/operator/AssignEventTime.java | 6 +-
.../euphoria/core/client/operator/CountByKey.java | 12 +-
.../euphoria/core/client/operator/Distinct.java | 296 +++++++++++++++------
.../euphoria/core/client/operator/Filter.java | 6 +-
.../euphoria/core/client/operator/FlatMap.java | 14 +-
.../euphoria/core/client/operator/FullJoin.java | 18 +-
.../euphoria/core/client/operator/Join.java | 26 +-
.../euphoria/core/client/operator/LeftJoin.java | 18 +-
.../euphoria/core/client/operator/MapElements.java | 14 +-
.../euphoria/core/client/operator/ReduceByKey.java | 30 +--
.../core/client/operator/ReduceWindow.java | 26 +-
.../euphoria/core/client/operator/RightJoin.java | 18 +-
.../euphoria/core/client/operator/SumByKey.java | 12 +-
.../euphoria/core/client/operator/TopPerKey.java | 28 +-
.../euphoria/core/client/operator/Union.java | 6 +-
.../core/client/operator/base/ShuffleOperator.java | 1 +
.../translate/BroadcastHashJoinTranslator.java | 4 +-
.../euphoria/core/translate/OperatorTransform.java | 2 +-
.../core/translate/ReduceByKeyTranslator.java | 4 +-
.../core/translate/TimestampExtractTransform.java | 118 ++++++++
.../provider/GenericTranslatorProvider.java | 4 +-
.../core/docs/DocumentationExamplesTest.java | 6 +-
.../euphoria/core/testkit/DistinctTest.java | 112 ++++++--
.../translate/TimestampExtractTransformTest.java | 44 +++
website/src/documentation/sdks/euphoria.md | 4 +-
26 files changed, 599 insertions(+), 232 deletions(-)
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/io/Collector.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/io/Collector.java
index f990ff4..d3f772a 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/io/Collector.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/io/Collector.java
@@ -36,7 +36,7 @@ public interface Collector<T> extends Environment {
/**
* Returns {@link Context} view of the collector. Since {@link Collector} usually share the same
- * methods as {@link Context} it can be safely casted.
+ * methods as {@link Context} it can be safely cast.
*
* @return this instance as a context class
*/
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/AssignEventTime.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/AssignEventTime.java
index a660c05..bb65a1c 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/AssignEventTime.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/AssignEventTime.java
@@ -110,9 +110,9 @@ public class AssignEventTime<InputT> extends Operator<InputT>
@Override
public <T> UsingBuilder<T> of(PCollection<T> input) {
@SuppressWarnings("unchecked")
- final Builder<T> casted = (Builder<T>) this;
- casted.input = input;
- return casted;
+ final Builder<T> cast = (Builder<T>) this;
+ cast.input = input;
+ return cast;
}
@Override
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/CountByKey.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/CountByKey.java
index 3ba8ea2..01c17d5 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/CountByKey.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/CountByKey.java
@@ -185,10 +185,10 @@ public class CountByKey<InputT, KeyT> extends ShuffleOperator<InputT, KeyT, KV<K
public <T> WindowByBuilder<T> keyBy(
UnaryFunction<InputT, T> keyExtractor, @Nullable TypeDescriptor<T> keyType) {
@SuppressWarnings("unchecked")
- final Builder<InputT, T> casted = (Builder<InputT, T>) this;
- casted.keyExtractor = requireNonNull(keyExtractor);
- casted.keyType = keyType;
- return casted;
+ final Builder<InputT, T> cast = (Builder<InputT, T>) this;
+ cast.keyExtractor = requireNonNull(keyExtractor);
+ cast.keyType = keyType;
+ return cast;
}
@Override
@@ -270,9 +270,9 @@ public class CountByKey<InputT, KeyT> extends ShuffleOperator<InputT, KeyT, KV<K
getWindow().isPresent(),
builder -> {
@SuppressWarnings("unchecked")
- final ReduceByKey.WindowByInternalBuilder<InputT, KeyT, Long> casted =
+ final ReduceByKey.WindowByInternalBuilder<InputT, KeyT, Long> cast =
(ReduceByKey.WindowByInternalBuilder) builder;
- return casted.windowBy(
+ return cast.windowBy(
getWindow()
.orElseThrow(
() ->
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Distinct.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Distinct.java
index 942e8f3..62c8d88 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Distinct.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Distinct.java
@@ -19,10 +19,13 @@ package org.apache.beam.sdk.extensions.euphoria.core.client.operator;
import static java.util.Objects.requireNonNull;
+import java.util.Optional;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.audience.Audience;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.Recommended;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.StateComplexity;
+import org.apache.beam.sdk.extensions.euphoria.core.client.functional.CombinableReduceFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Builders;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.OptionalMethodBuilder;
@@ -30,6 +33,7 @@ import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Shuffle
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.OutputHint;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.PCollectionLists;
import org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTransform;
+import org.apache.beam.sdk.extensions.euphoria.core.translate.TimestampExtractTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
@@ -41,6 +45,7 @@ import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.joda.time.Duration;
/**
@@ -51,7 +56,7 @@ import org.joda.time.Duration;
* <ol>
* <li>{@code [named] ..................} give name to the operator [optional]
* <li>{@code of .......................} input dataset
- * <li>{@code [mapped] .................} compare objects retrieved by this {@link UnaryFunction}
+ * <li>{@code [projected] ..............} compare objects retrieved by this {@link UnaryFunction}
* instead of raw input elements
* <li>{@code [windowBy] ...............} windowing (see {@link WindowFn}), default is no
* windowing
@@ -68,8 +73,8 @@ import org.joda.time.Duration;
+ "(e.g. using bloom filters), which might reduce the space complexity",
state = StateComplexity.CONSTANT,
repartitions = 1)
-public class Distinct<InputT, OutputT> extends ShuffleOperator<InputT, OutputT, OutputT>
- implements CompositeOperator<InputT, OutputT> {
+public class Distinct<InputT, KeyT> extends ShuffleOperator<InputT, KeyT, InputT>
+ implements CompositeOperator<InputT, InputT> {
/**
* Starts building a nameless {@link Distinct} operator to process the given input dataset.
@@ -80,7 +85,7 @@ public class Distinct<InputT, OutputT> extends ShuffleOperator<InputT, OutputT,
* @see #named(String)
* @see OfBuilder#of(PCollection)
*/
- public static <InputT> MappedBuilder<InputT, InputT> of(PCollection<InputT> input) {
+ public static <InputT> ProjectedBuilder<InputT, InputT> of(PCollection<InputT> input) {
return named(null).of(input);
}
@@ -98,11 +103,28 @@ public class Distinct<InputT, OutputT> extends ShuffleOperator<InputT, OutputT,
public interface OfBuilder extends Builders.Of {
@Override
- <InputT> MappedBuilder<InputT, InputT> of(PCollection<InputT> input);
+ <InputT> ProjectedBuilder<InputT, InputT> of(PCollection<InputT> input);
}
- /** Builder for the 'mapped' step. */
- public interface MappedBuilder<InputT, OutputT> extends WindowByBuilder<OutputT> {
+ /**
+ * A policy to be applied when multiple elements exist for the same comparison key. Note that this
+ * can be specified only when specifying the projection, because otherwise complete elements are
+ * compared and therefore it makes no sense to select between identical elements.
+ */
+ public enum SelectionPolicy {
+
+ /** Select any element (non deterministically). */
+ ANY,
+
+ /** Select element with lowest timestamp. */
+ OLDEST,
+
+ /** Select element with highest timestamp. */
+ NEWEST
+ }
+
+ /** Builder for the 'projected' step. */
+ public interface ProjectedBuilder<InputT, KeyT> extends WindowByBuilder<InputT, KeyT> {
/**
* Optionally specifies a function to transform the input elements into another type among which
@@ -111,184 +133,282 @@ public class Distinct<InputT, OutputT> extends ShuffleOperator<InputT, OutputT,
* <p>This is, while windowing will be applied on basis of original input elements, the distinct
* operator will be carried out on the transformed elements.
*
- * @param <T> the type of the transformed elements
- * @param mapper a transform function applied to input element
+ * @param <KeyT> the type of the transformed elements
+ * @param transform a transform function applied to input element
* @return the next builder to complete the setup of the {@link Distinct} operator
*/
- default <T> WindowByBuilder<T> mapped(UnaryFunction<InputT, T> mapper) {
- return mapped(mapper, null);
+ default <KeyT> WindowByBuilder<InputT, KeyT> projected(UnaryFunction<InputT, KeyT> transform) {
+ return projected(transform, SelectionPolicy.ANY, null);
+ }
+
+ default <KeyT> WindowByBuilder<InputT, KeyT> projected(
+ UnaryFunction<InputT, KeyT> transform, TypeDescriptor<KeyT> projectedType) {
+
+ return projected(transform, SelectionPolicy.ANY, requireNonNull(projectedType));
}
- <T> WindowByBuilder<T> mapped(
- UnaryFunction<InputT, T> mapper, @Nullable TypeDescriptor<T> outputType);
+ default <KeyT> WindowByBuilder<InputT, KeyT> projected(
+ UnaryFunction<InputT, KeyT> transform, SelectionPolicy policy) {
+
+ return projected(transform, policy, null);
+ }
+
+ <KeyT> WindowByBuilder<InputT, KeyT> projected(
+ UnaryFunction<InputT, KeyT> transform,
+ SelectionPolicy policy,
+ @Nullable TypeDescriptor<KeyT> projectedType);
}
/** Builder for the 'windowBy' step. */
- public interface WindowByBuilder<OutputT>
- extends Builders.WindowBy<TriggerByBuilder<OutputT>>,
- OptionalMethodBuilder<WindowByBuilder<OutputT>, Builders.Output<OutputT>>,
- Builders.Output<OutputT> {
+ public interface WindowByBuilder<InputT, KeyT>
+ extends Builders.WindowBy<TriggerByBuilder<InputT>>,
+ OptionalMethodBuilder<WindowByBuilder<InputT, KeyT>, Builders.Output<InputT>>,
+ Builders.Output<InputT> {
@Override
- <T extends BoundedWindow> TriggerByBuilder<OutputT> windowBy(WindowFn<Object, T> windowing);
+ <W extends BoundedWindow> TriggerByBuilder<InputT> windowBy(WindowFn<Object, W> windowing);
@Override
- default Builders.Output<OutputT> applyIf(
- boolean cond, UnaryFunction<WindowByBuilder<OutputT>, Builders.Output<OutputT>> fn) {
+ default Builders.Output<InputT> applyIf(
+ boolean cond, UnaryFunction<WindowByBuilder<InputT, KeyT>, Builders.Output<InputT>> fn) {
+
return cond ? requireNonNull(fn).apply(this) : this;
}
}
/** Builder for the 'triggeredBy' step. */
- public interface TriggerByBuilder<OutputT>
- extends Builders.TriggeredBy<AccumulationModeBuilder<OutputT>> {
+ public interface TriggerByBuilder<T> extends Builders.TriggeredBy<AccumulationModeBuilder<T>> {
@Override
- AccumulationModeBuilder<OutputT> triggeredBy(Trigger trigger);
+ AccumulationModeBuilder<T> triggeredBy(Trigger trigger);
}
/** Builder for the 'accumulationMode' step. */
- public interface AccumulationModeBuilder<OutputT>
- extends Builders.AccumulationMode<WindowedOutputBuilder<OutputT>> {
+ public interface AccumulationModeBuilder<T>
+ extends Builders.AccumulationMode<WindowedOutputBuilder<T>> {
@Override
- WindowedOutputBuilder<OutputT> accumulationMode(
- WindowingStrategy.AccumulationMode accumulationMode);
+ WindowedOutputBuilder<T> accumulationMode(WindowingStrategy.AccumulationMode accumulationMode);
}
/** Builder for 'windowed output' step. */
- public interface WindowedOutputBuilder<OutputT>
- extends Builders.WindowedOutput<WindowedOutputBuilder<OutputT>>, Builders.Output<OutputT> {}
+ public interface WindowedOutputBuilder<T>
+ extends Builders.WindowedOutput<WindowedOutputBuilder<T>>, Builders.Output<T> {}
- private static class Builder<InputT, OutputT>
+ private static class Builder<InputT, KeyT>
implements OfBuilder,
- MappedBuilder<InputT, OutputT>,
- WindowByBuilder<OutputT>,
- TriggerByBuilder<OutputT>,
- AccumulationModeBuilder<OutputT>,
- WindowedOutputBuilder<OutputT>,
- Builders.Output<OutputT> {
+ ProjectedBuilder<InputT, KeyT>,
+ WindowByBuilder<InputT, KeyT>,
+ TriggerByBuilder<InputT>,
+ AccumulationModeBuilder<InputT>,
+ WindowedOutputBuilder<InputT>,
+ Builders.Output<InputT> {
private final WindowBuilder<InputT> windowBuilder = new WindowBuilder<>();
@Nullable private final String name;
private PCollection<InputT> input;
- @Nullable private UnaryFunction<InputT, OutputT> mapper;
- @Nullable private TypeDescriptor<OutputT> outputType;
+
+ @SuppressWarnings("unchecked")
+ private UnaryFunction<InputT, KeyT> transform = (UnaryFunction) e -> e;
+
+ private SelectionPolicy policy = null;
+
+ @Nullable private TypeDescriptor<KeyT> projectedType;
+ @Nullable private TypeDescriptor<InputT> outputType;
+ private boolean projected = false;
Builder(@Nullable String name) {
this.name = name;
}
@Override
- public <T> MappedBuilder<T, T> of(PCollection<T> input) {
+ public <T> ProjectedBuilder<T, T> of(PCollection<T> input) {
@SuppressWarnings("unchecked")
- final Builder<T, T> casted = (Builder) this;
- casted.input = requireNonNull(input);
- return casted;
+ final Builder<T, T> cast = (Builder) this;
+ cast.input = requireNonNull(input);
+ return cast;
}
@Override
- public <T> WindowByBuilder<T> mapped(
- UnaryFunction<InputT, T> mapper, @Nullable TypeDescriptor<T> outputType) {
+ public <K> WindowByBuilder<InputT, K> projected(
+ UnaryFunction<InputT, K> transform,
+ SelectionPolicy policy,
+ @Nullable TypeDescriptor<K> projectedType) {
+
@SuppressWarnings("unchecked")
- final Builder<InputT, T> casted = (Builder) this;
- casted.mapper = requireNonNull(mapper);
- casted.outputType = outputType;
- return casted;
+ final Builder<InputT, K> cast = (Builder) this;
+ cast.transform = requireNonNull(transform);
+ cast.policy = requireNonNull(policy);
+ cast.projectedType = projectedType;
+ cast.projected = true;
+ return cast;
}
@Override
- public <T extends BoundedWindow> TriggerByBuilder<OutputT> windowBy(
- WindowFn<Object, T> windowFn) {
+ public <W extends BoundedWindow> TriggerByBuilder<InputT> windowBy(
+ WindowFn<Object, W> windowFn) {
+
windowBuilder.windowBy(windowFn);
return this;
}
@Override
- public AccumulationModeBuilder<OutputT> triggeredBy(Trigger trigger) {
+ public AccumulationModeBuilder<InputT> triggeredBy(Trigger trigger) {
windowBuilder.triggeredBy(trigger);
return this;
}
@Override
- public WindowedOutputBuilder<OutputT> accumulationMode(
+ public WindowedOutputBuilder<InputT> accumulationMode(
WindowingStrategy.AccumulationMode accumulationMode) {
windowBuilder.accumulationMode(accumulationMode);
return this;
}
@Override
- public WindowedOutputBuilder<OutputT> withAllowedLateness(Duration allowedLateness) {
+ public WindowedOutputBuilder<InputT> withAllowedLateness(Duration allowedLateness) {
windowBuilder.withAllowedLateness(allowedLateness);
return this;
}
@Override
- public WindowedOutputBuilder<OutputT> withAllowedLateness(
+ public WindowedOutputBuilder<InputT> withAllowedLateness(
Duration allowedLateness, Window.ClosingBehavior closingBehavior) {
windowBuilder.withAllowedLateness(allowedLateness, closingBehavior);
return this;
}
@Override
- public WindowedOutputBuilder<OutputT> withTimestampCombiner(
+ public WindowedOutputBuilder<InputT> withTimestampCombiner(
TimestampCombiner timestampCombiner) {
windowBuilder.withTimestampCombiner(timestampCombiner);
return this;
}
@Override
- public WindowedOutputBuilder<OutputT> withOnTimeBehavior(Window.OnTimeBehavior behavior) {
+ public WindowedOutputBuilder<InputT> withOnTimeBehavior(Window.OnTimeBehavior behavior) {
windowBuilder.withOnTimeBehavior(behavior);
return this;
}
@Override
@SuppressWarnings("unchecked")
- public PCollection<OutputT> output(OutputHint... outputHints) {
- if (mapper == null) {
- this.mapper = (UnaryFunction) UnaryFunction.identity();
+ public PCollection<InputT> output(OutputHint... outputHints) {
+ if (transform == null) {
+ this.transform = (UnaryFunction) UnaryFunction.identity();
}
- final Distinct<InputT, OutputT> distinct =
- new Distinct<>(name, mapper, outputType, windowBuilder.getWindow().orElse(null));
+ final Distinct<InputT, KeyT> distinct =
+ new Distinct<>(
+ name,
+ transform,
+ outputType,
+ projectedType,
+ windowBuilder.getWindow().orElse(null),
+ policy,
+ projected);
return OperatorTransform.apply(distinct, PCollectionList.of(input));
}
}
+ private final boolean projected;
+ private final @Nullable SelectionPolicy policy;
+
private Distinct(
@Nullable String name,
- UnaryFunction<InputT, OutputT> mapper,
- @Nullable TypeDescriptor<OutputT> outputType,
- @Nullable Window<InputT> window) {
- super(name, outputType, mapper, outputType, window);
+ UnaryFunction<InputT, KeyT> transform,
+ @Nullable TypeDescriptor<InputT> outputType,
+ @Nullable TypeDescriptor<KeyT> projectedType,
+ @Nullable Window<InputT> window,
+ @Nullable SelectionPolicy policy,
+ boolean projected) {
+
+ super(name, outputType, transform, projectedType, window);
+ this.projected = projected;
+ this.policy = policy;
+
+ Preconditions.checkState(
+ !projected || policy != null,
+ "Please specify selection policy when using projected distinct.");
}
@Override
- public PCollection<OutputT> expand(PCollectionList<InputT> inputs) {
- final PCollection<KV<OutputT, Void>> distinct =
- ReduceByKey.named(getName().orElse(null))
- .of(PCollectionLists.getOnlyElement(inputs))
- .keyBy(getKeyExtractor())
- .valueBy(e -> null, TypeDescriptors.nulls())
- .combineBy(e -> null)
- .applyIf(
- getWindow().isPresent(),
- builder -> {
- @SuppressWarnings("unchecked")
- final ReduceByKey.WindowByInternalBuilder<InputT, OutputT, Void> casted =
- (ReduceByKey.WindowByInternalBuilder) builder;
- return casted.windowBy(
- getWindow()
- .orElseThrow(
- () ->
- new IllegalStateException(
- "Unable to resolve windowing for Distinct expansion.")));
+ public PCollection<InputT> expand(PCollectionList<InputT> inputs) {
+ PCollection<InputT> tmp = PCollectionLists.getOnlyElement(inputs);
+ PCollection<InputT> input =
+ getWindow()
+ .map(
+ w -> {
+ PCollection<InputT> ret = tmp.apply(w);
+ ret.setTypeDescriptor(tmp.getTypeDescriptor());
+ return ret;
})
- .output();
- return MapElements.named(getName().orElse("") + "::extract-keys")
- .of(distinct)
- .using(KV::getKey, getKeyType().orElse(null))
+ .orElse(tmp);
+ if (!projected) {
+ PCollection<KV<InputT, Void>> distinct =
+ ReduceByKey.named(getName().orElse(null))
+ .of(input)
+ .keyBy(e -> e, input.getTypeDescriptor())
+ .valueBy(e -> null, TypeDescriptors.nulls())
+ .combineBy(e -> null, TypeDescriptors.nulls())
+ .output();
+ return MapElements.named(getName().orElse("") + "::extract-keys")
+ .of(distinct)
+ .using(KV::getKey, input.getTypeDescriptor())
+ .output();
+ }
+ UnaryFunction<PCollection<InputT>, PCollection<InputT>> transformFn = getTransformFn();
+ return transformFn.apply(input);
+ }
+
+ private UnaryFunction<PCollection<InputT>, PCollection<InputT>> getTransformFn() {
+ switch (policy) {
+ case NEWEST:
+ case OLDEST:
+ String name = getName().orElse(null);
+ return input -> input.apply(TimestampExtractTransform.of(name, this::reduceTimestamped));
+ case ANY:
+ return this::reduceSelectingAny;
+ default:
+ throw new IllegalArgumentException("Unknown policy " + policy);
+ }
+ }
+
+ private PCollection<InputT> reduceSelectingAny(PCollection<InputT> input) {
+ return ReduceByKey.named(getName().orElse(null))
+ .of(input)
+ .keyBy(getKeyExtractor(), getKeyType().orElse(null))
+ .valueBy(e -> e, getOutputType().orElse(null))
+ .combineBy(values -> nonEmpty(values.findAny()), getOutputType().orElse(null))
+ .outputValues();
+ }
+
+ private PCollection<InputT> reduceTimestamped(PCollection<KV<Long, InputT>> input) {
+ CombinableReduceFunction<KV<Long, InputT>> select = getReduceFn();
+ PCollection<KV<Long, InputT>> outputValues =
+ ReduceByKey.named(getName().orElse(null))
+ .of(input)
+ .keyBy(e -> getKeyExtractor().apply(e.getValue()), getKeyType().orElse(null))
+ .valueBy(e -> e, requireNonNull(input.getTypeDescriptor()))
+ .combineBy(select, requireNonNull(input.getTypeDescriptor()))
+ .outputValues();
+ return MapElements.named(getName().map(n -> n + "::unwrap").orElse(null))
+ .of(outputValues)
+ .using(KV::getValue, getOutputType().orElse(null))
.output();
}
+
+ private CombinableReduceFunction<KV<Long, InputT>> getReduceFn() {
+ return policy == SelectionPolicy.NEWEST
+ ? values ->
+ nonEmpty(
+ values.collect(Collectors.maxBy((a, b) -> Long.compare(a.getKey(), b.getKey()))))
+ : values ->
+ nonEmpty(
+ values.collect(Collectors.minBy((a, b) -> Long.compare(a.getKey(), b.getKey()))));
+ }
+
+ private static <T> T nonEmpty(Optional<T> in) {
+ return in.orElseThrow(() -> new IllegalStateException("Empty reduce values?"));
+ }
}
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Filter.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Filter.java
index 8538dbb..33172a0 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Filter.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Filter.java
@@ -109,9 +109,9 @@ public class Filter<InputT> extends Operator<InputT> implements CompositeOperato
@Override
public <T> ByBuilder<T> of(PCollection<T> input) {
@SuppressWarnings("unchecked")
- final Builder<T> casted = (Builder) this;
- casted.input = requireNonNull(input);
- return casted;
+ final Builder<T> cast = (Builder) this;
+ cast.input = requireNonNull(input);
+ return cast;
}
@Override
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/FlatMap.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/FlatMap.java
index b0d232e..dd3d675 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/FlatMap.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/FlatMap.java
@@ -176,9 +176,9 @@ public class FlatMap<InputT, OutputT> extends Operator<OutputT>
@Override
public <InputLocalT> UsingBuilder<InputLocalT> of(PCollection<InputLocalT> input) {
@SuppressWarnings("unchecked")
- Builder<InputLocalT, ?> casted = (Builder) this;
- casted.input = requireNonNull(input);
- return casted;
+ Builder<InputLocalT, ?> cast = (Builder) this;
+ cast.input = requireNonNull(input);
+ return cast;
}
@Override
@@ -191,10 +191,10 @@ public class FlatMap<InputT, OutputT> extends Operator<OutputT>
public <OutputLocalT> EventTimeBuilder<InputT, OutputLocalT> using(
UnaryFunctor<InputT, OutputLocalT> functor, TypeDescriptor<OutputLocalT> outputType) {
@SuppressWarnings("unchecked")
- Builder<InputT, OutputLocalT> casted = (Builder) this;
- casted.functor = requireNonNull(functor);
- casted.outputType = outputType;
- return casted;
+ Builder<InputT, OutputLocalT> cast = (Builder) this;
+ cast.functor = requireNonNull(functor);
+ cast.outputType = outputType;
+ return cast;
}
@Override
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/FullJoin.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/FullJoin.java
index 5b1c661..63a3755 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/FullJoin.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/FullJoin.java
@@ -132,10 +132,10 @@ public class FullJoin {
public <FirstT, SecondT> ByBuilder<FirstT, SecondT> of(
PCollection<FirstT> left, PCollection<SecondT> right) {
@SuppressWarnings("unchecked")
- final Builder<FirstT, SecondT, ?> casted = (Builder) this;
- casted.left = requireNonNull(left);
- casted.right = requireNonNull(right);
- return casted;
+ final Builder<FirstT, SecondT, ?> cast = (Builder) this;
+ cast.left = requireNonNull(left);
+ cast.right = requireNonNull(right);
+ return cast;
}
@Override
@@ -144,11 +144,11 @@ public class FullJoin {
UnaryFunction<RightT, T> rightKeyExtractor,
@Nullable TypeDescriptor<T> keyType) {
@SuppressWarnings("unchecked")
- final Builder<LeftT, RightT, T> casted = (Builder) this;
- casted.leftKeyExtractor = requireNonNull(leftKeyExtractor);
- casted.rightKeyExtractor = requireNonNull(rightKeyExtractor);
- casted.keyType = keyType;
- return casted;
+ final Builder<LeftT, RightT, T> cast = (Builder) this;
+ cast.leftKeyExtractor = requireNonNull(leftKeyExtractor);
+ cast.rightKeyExtractor = requireNonNull(rightKeyExtractor);
+ cast.keyType = keyType;
+ return cast;
}
@Override
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java
index e90d244..97563a7 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java
@@ -202,10 +202,10 @@ public class Join<LeftT, RightT, KeyT, OutputT>
public <FirstT, SecondT> ByBuilder<FirstT, SecondT> of(
PCollection<FirstT> left, PCollection<SecondT> right) {
@SuppressWarnings("unchecked")
- final Builder<FirstT, SecondT, ?, ?> casted = (Builder) this;
- casted.left = requireNonNull(left);
- casted.right = requireNonNull(right);
- return casted;
+ final Builder<FirstT, SecondT, ?, ?> cast = (Builder) this;
+ cast.left = requireNonNull(left);
+ cast.right = requireNonNull(right);
+ return cast;
}
@Override
@@ -214,21 +214,21 @@ public class Join<LeftT, RightT, KeyT, OutputT>
UnaryFunction<RightT, T> rightKeyExtractor,
@Nullable TypeDescriptor<T> keyType) {
@SuppressWarnings("unchecked")
- final Builder<LeftT, RightT, T, ?> casted = (Builder) this;
- casted.leftKeyExtractor = leftKeyExtractor;
- casted.rightKeyExtractor = rightKeyExtractor;
- casted.keyType = keyType;
- return casted;
+ final Builder<LeftT, RightT, T, ?> cast = (Builder) this;
+ cast.leftKeyExtractor = leftKeyExtractor;
+ cast.rightKeyExtractor = rightKeyExtractor;
+ cast.keyType = keyType;
+ return cast;
}
@Override
public <T> WindowByBuilder<KeyT, T> using(
BinaryFunctor<LeftT, RightT, T> joinFunc, @Nullable TypeDescriptor<T> outputType) {
@SuppressWarnings("unchecked")
- final Builder<LeftT, RightT, KeyT, T> casted = (Builder) this;
- casted.joinFunc = requireNonNull(joinFunc);
- casted.outputType = outputType;
- return casted;
+ final Builder<LeftT, RightT, KeyT, T> cast = (Builder) this;
+ cast.joinFunc = requireNonNull(joinFunc);
+ cast.outputType = outputType;
+ return cast;
}
@Override
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/LeftJoin.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/LeftJoin.java
index c21a14b..fd620ab 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/LeftJoin.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/LeftJoin.java
@@ -131,10 +131,10 @@ public class LeftJoin {
public <FirstT, SecondT> ByBuilder<FirstT, SecondT> of(
PCollection<FirstT> left, PCollection<SecondT> right) {
@SuppressWarnings("unchecked")
- final Builder<FirstT, SecondT, ?> casted = (Builder) this;
- casted.left = requireNonNull(left);
- casted.right = requireNonNull(right);
- return casted;
+ final Builder<FirstT, SecondT, ?> cast = (Builder) this;
+ cast.left = requireNonNull(left);
+ cast.right = requireNonNull(right);
+ return cast;
}
@Override
@@ -143,11 +143,11 @@ public class LeftJoin {
UnaryFunction<RightT, T> rightKeyExtractor,
@Nullable TypeDescriptor<T> keyType) {
@SuppressWarnings("unchecked")
- final Builder<LeftT, RightT, T> casted = (Builder) this;
- casted.leftKeyExtractor = requireNonNull(leftKeyExtractor);
- casted.rightKeyExtractor = requireNonNull(rightKeyExtractor);
- casted.keyType = keyType;
- return casted;
+ final Builder<LeftT, RightT, T> cast = (Builder) this;
+ cast.leftKeyExtractor = requireNonNull(leftKeyExtractor);
+ cast.rightKeyExtractor = requireNonNull(rightKeyExtractor);
+ cast.keyType = keyType;
+ return cast;
}
@Override
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/MapElements.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/MapElements.java
index 29b393a..522cd73 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/MapElements.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/MapElements.java
@@ -134,19 +134,19 @@ public class MapElements<InputT, OutputT> extends Operator<OutputT>
@Override
public <T> UsingBuilder<T> of(PCollection<T> input) {
@SuppressWarnings("unchecked")
- final Builder<T, ?> casted = (Builder) this;
- casted.input = input;
- return casted;
+ final Builder<T, ?> cast = (Builder) this;
+ cast.input = input;
+ return cast;
}
@Override
public <T> Builders.Output<T> using(
UnaryFunctionEnv<InputT, T> mapper, @Nullable TypeDescriptor<T> outputType) {
@SuppressWarnings("unchecked")
- final Builder<InputT, T> casted = (Builder) this;
- casted.mapper = mapper;
- casted.outputType = outputType;
- return casted;
+ final Builder<InputT, T> cast = (Builder) this;
+ cast.mapper = mapper;
+ cast.outputType = outputType;
+ return cast;
}
@Override
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceByKey.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceByKey.java
index 6b1eb42..67769c9 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceByKey.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceByKey.java
@@ -335,29 +335,29 @@ public class ReduceByKey<InputT, KeyT, ValueT, OutputT>
@SuppressWarnings("unchecked")
public <T> KeyByBuilder<T> of(PCollection<T> input) {
@SuppressWarnings("unchecked")
- final Builder<T, ?, ?, ?> casted = (Builder) this;
- casted.input = input;
- return casted;
+ final Builder<T, ?, ?, ?> cast = (Builder) this;
+ cast.input = input;
+ return cast;
}
@Override
public <T> ValueByReduceByBuilder<InputT, T, InputT> keyBy(
UnaryFunction<InputT, T> keyExtractor, @Nullable TypeDescriptor<T> keyType) {
@SuppressWarnings("unchecked")
- final Builder<InputT, T, InputT, ?> casted = (Builder) this;
- casted.keyExtractor = requireNonNull(keyExtractor);
- casted.keyType = keyType;
- return casted;
+ final Builder<InputT, T, InputT, ?> cast = (Builder) this;
+ cast.keyExtractor = requireNonNull(keyExtractor);
+ cast.keyType = keyType;
+ return cast;
}
@Override
public <T> ReduceByBuilder<KeyT, T> valueBy(
UnaryFunction<InputT, T> valueExtractor, @Nullable TypeDescriptor<T> valueType) {
@SuppressWarnings("unchecked")
- final Builder<InputT, KeyT, T, ?> casted = (Builder) this;
- casted.valueExtractor = requireNonNull(valueExtractor);
- casted.valueType = valueType;
- return casted;
+ final Builder<InputT, KeyT, T, ?> cast = (Builder) this;
+ cast.valueExtractor = requireNonNull(valueExtractor);
+ cast.valueType = valueType;
+ return cast;
}
@Override
@@ -369,10 +369,10 @@ public class ReduceByKey<InputT, KeyT, ValueT, OutputT>
valueExtractor = (UnaryFunction) UnaryFunction.identity();
}
@SuppressWarnings("unchecked")
- final Builder<InputT, KeyT, ValueT, T> casted = (Builder) this;
- casted.reducer = requireNonNull(reducer);
- casted.outputType = outputType;
- return casted;
+ final Builder<InputT, KeyT, ValueT, T> cast = (Builder) this;
+ cast.reducer = requireNonNull(reducer);
+ cast.outputType = outputType;
+ return cast;
}
@Override
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceWindow.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceWindow.java
index dd0eb30..4c26518 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceWindow.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceWindow.java
@@ -276,19 +276,19 @@ public class ReduceWindow<InputT, ValueT, OutputT> extends ShuffleOperator<Input
@Override
public <T> ValueByReduceByBuilder<T, T> of(PCollection<T> input) {
@SuppressWarnings("unchecked")
- final Builder<T, T, ?> casted = (Builder) this;
- casted.input = requireNonNull(input);
- return casted;
+ final Builder<T, T, ?> cast = (Builder) this;
+ cast.input = requireNonNull(input);
+ return cast;
}
@Override
public <T> ReduceByBuilder<T> valueBy(
UnaryFunction<InputT, T> valueExtractor, @Nullable TypeDescriptor<T> valueType) {
@SuppressWarnings("unchecked")
- final Builder<InputT, T, ?> casted = (Builder) this;
- casted.valueExtractor = requireNonNull(valueExtractor);
- casted.valueType = valueType;
- return casted;
+ final Builder<InputT, T, ?> cast = (Builder) this;
+ cast.valueExtractor = requireNonNull(valueExtractor);
+ cast.valueType = valueType;
+ return cast;
}
@Override
@@ -300,10 +300,10 @@ public class ReduceWindow<InputT, ValueT, OutputT> extends ShuffleOperator<Input
valueExtractor = (UnaryFunction) UnaryFunction.identity();
}
@SuppressWarnings("unchecked")
- final Builder<InputT, ValueT, T> casted = (Builder) this;
- casted.reducer = requireNonNull(reducer);
- casted.outputType = outputType;
- return casted;
+ final Builder<InputT, ValueT, T> cast = (Builder) this;
+ cast.reducer = requireNonNull(reducer);
+ cast.outputType = outputType;
+ return cast;
}
@Override
@@ -439,9 +439,9 @@ public class ReduceWindow<InputT, ValueT, OutputT> extends ShuffleOperator<Input
getWindow().isPresent(),
builder -> {
@SuppressWarnings("unchecked")
- final ReduceByKey.WindowByInternalBuilder<InputT, Byte, OutputT> casted =
+ final ReduceByKey.WindowByInternalBuilder<InputT, Byte, OutputT> cast =
(ReduceByKey.WindowByInternalBuilder) builder;
- return casted.windowBy(
+ return cast.windowBy(
getWindow()
.orElseThrow(
() ->
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/RightJoin.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/RightJoin.java
index 3a3c0b4..55c292e 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/RightJoin.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/RightJoin.java
@@ -132,10 +132,10 @@ public class RightJoin {
public <FirstT, SecondT> ByBuilder<FirstT, SecondT> of(
PCollection<FirstT> left, PCollection<SecondT> right) {
@SuppressWarnings("unchecked")
- final Builder<FirstT, SecondT, ?> casted = (Builder) this;
- casted.left = requireNonNull(left);
- casted.right = requireNonNull(right);
- return casted;
+ final Builder<FirstT, SecondT, ?> cast = (Builder) this;
+ cast.left = requireNonNull(left);
+ cast.right = requireNonNull(right);
+ return cast;
}
@Override
@@ -144,11 +144,11 @@ public class RightJoin {
UnaryFunction<RightT, T> rightKeyExtractor,
@Nullable TypeDescriptor<T> keyType) {
@SuppressWarnings("unchecked")
- final Builder<LeftT, RightT, T> casted = (Builder) this;
- casted.leftKeyExtractor = requireNonNull(leftKeyExtractor);
- casted.rightKeyExtractor = requireNonNull(rightKeyExtractor);
- casted.keyType = keyType;
- return casted;
+ final Builder<LeftT, RightT, T> cast = (Builder) this;
+ cast.leftKeyExtractor = requireNonNull(leftKeyExtractor);
+ cast.rightKeyExtractor = requireNonNull(rightKeyExtractor);
+ cast.keyType = keyType;
+ return cast;
}
@Override
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/SumByKey.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/SumByKey.java
index 203fcc5..8f89f35 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/SumByKey.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/SumByKey.java
@@ -206,10 +206,10 @@ public class SumByKey<InputT, KeyT> extends ShuffleOperator<InputT, KeyT, KV<Key
public <T> ValueByBuilder<InputT, T> keyBy(
UnaryFunction<InputT, T> keyExtractor, @Nullable TypeDescriptor<T> keyType) {
@SuppressWarnings("unchecked")
- final Builder<InputT, T> casted = (Builder<InputT, T>) this;
- casted.keyExtractor = requireNonNull(keyExtractor);
- casted.keyType = keyType;
- return casted;
+ final Builder<InputT, T> cast = (Builder<InputT, T>) this;
+ cast.keyExtractor = requireNonNull(keyExtractor);
+ cast.keyType = keyType;
+ return cast;
}
@Override
@@ -309,9 +309,9 @@ public class SumByKey<InputT, KeyT> extends ShuffleOperator<InputT, KeyT, KV<Key
getWindow().isPresent(),
builder -> {
@SuppressWarnings("unchecked")
- final ReduceByKey.WindowByInternalBuilder<InputT, KeyT, Long> casted =
+ final ReduceByKey.WindowByInternalBuilder<InputT, KeyT, Long> cast =
(ReduceByKey.WindowByInternalBuilder) builder;
- return casted.windowBy(
+ return cast.windowBy(
getWindow()
.orElseThrow(
() ->
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/TopPerKey.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/TopPerKey.java
index e69f5d7..7ca3ce9 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/TopPerKey.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/TopPerKey.java
@@ -242,30 +242,30 @@ public class TopPerKey<InputT, KeyT, ValueT, ScoreT extends Comparable<ScoreT>>
public <T> ValueByBuilder<InputT, T> keyBy(
UnaryFunction<InputT, T> keyExtractor, @Nullable TypeDescriptor<T> keyType) {
@SuppressWarnings("unchecked")
- final Builder<InputT, T, ?, ?> casted = (Builder) this;
- casted.keyExtractor = requireNonNull(keyExtractor);
- casted.keyType = keyType;
- return casted;
+ final Builder<InputT, T, ?, ?> cast = (Builder) this;
+ cast.keyExtractor = requireNonNull(keyExtractor);
+ cast.keyType = keyType;
+ return cast;
}
@Override
public <T> ScoreBy<InputT, KeyT, T> valueBy(
UnaryFunction<InputT, T> valueExtractor, @Nullable TypeDescriptor<T> valueType) {
@SuppressWarnings("unchecked")
- final Builder<InputT, KeyT, T, ?> casted = (Builder) this;
- casted.valueExtractor = requireNonNull(valueExtractor);
- casted.valueType = valueType;
- return casted;
+ final Builder<InputT, KeyT, T, ?> cast = (Builder) this;
+ cast.valueExtractor = requireNonNull(valueExtractor);
+ cast.valueType = valueType;
+ return cast;
}
@Override
public <T extends Comparable<T>> WindowByBuilder<KeyT, ValueT, T> scoreBy(
UnaryFunction<InputT, T> scoreExtractor, @Nullable TypeDescriptor<T> scoreType) {
@SuppressWarnings("unchecked")
- final Builder<InputT, KeyT, ValueT, T> casted = (Builder) this;
- casted.scoreExtractor = requireNonNull(scoreExtractor);
- casted.scoreType = scoreType;
- return casted;
+ final Builder<InputT, KeyT, ValueT, T> cast = (Builder) this;
+ cast.scoreExtractor = requireNonNull(scoreExtractor);
+ cast.scoreType = scoreType;
+ return cast;
}
@Override
@@ -399,8 +399,8 @@ public class TopPerKey<InputT, KeyT, ValueT, ScoreT extends Comparable<ScoreT>>
builder -> {
@SuppressWarnings("unchecked")
final ReduceByKey.WindowByInternalBuilder<InputT, KeyT, Triple<KeyT, ValueT, ScoreT>>
- casted = (ReduceByKey.WindowByInternalBuilder) builder;
- return casted.windowBy(
+ cast = (ReduceByKey.WindowByInternalBuilder) builder;
+ return cast.windowBy(
getWindow()
.orElseThrow(
() ->
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Union.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Union.java
index dc186f5..9a48bca 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Union.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Union.java
@@ -143,9 +143,9 @@ public class Union<InputT> extends Operator<InputT> {
@Override
public <T> Builders.Output<T> of(List<PCollection<T>> pCollections) {
@SuppressWarnings("unchecked")
- final Builder<T> casted = (Builder) this;
- casted.pCollections = pCollections;
- return casted;
+ final Builder<T> cast = (Builder) this;
+ cast.pCollections = pCollections;
+ return cast;
}
@Override
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/base/ShuffleOperator.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/base/ShuffleOperator.java
index d8896de..99b5f07 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/base/ShuffleOperator.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/base/ShuffleOperator.java
@@ -40,6 +40,7 @@ public abstract class ShuffleOperator<InputT, KeyT, OutputT> extends Operator<Ou
UnaryFunction<InputT, KeyT> keyExtractor,
@Nullable TypeDescriptor<KeyT> keyType,
@Nullable Window<InputT> windowing) {
+
super(name, outputType);
this.keyExtractor = keyExtractor;
this.keyType = keyType;
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BroadcastHashJoinTranslator.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BroadcastHashJoinTranslator.java
index c6ac8bf..501d22b 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BroadcastHashJoinTranslator.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BroadcastHashJoinTranslator.java
@@ -44,8 +44,8 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Table;
* to follow to avoid data to be send to executors repeatedly:
*
* <ul>
- * <li>Input {@link PCollection} of broadcasted side has to be the same instance
- * <li>Key extractor of broadcasted side has to be the same {@link UnaryFunction} instance
+ * <li>Input {@link PCollection} of broadcast side has to be the same instance
+ * <li>Key extractor of broadcast side has to be the same {@link UnaryFunction} instance
* </ul>
*/
public class BroadcastHashJoinTranslator<LeftT, RightT, KeyT, OutputT>
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/OperatorTransform.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/OperatorTransform.java
index 675cfac..54d07fa 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/OperatorTransform.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/OperatorTransform.java
@@ -56,7 +56,7 @@ public class OperatorTransform<InputT, OutputT, OperatorT extends Operator<Outpu
+ operator.getClass()
+ "] with name ["
+ operator.getName().orElse(null)
- + ".");
+ + "]");
}
private final OperatorT operator;
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/ReduceByKeyTranslator.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/ReduceByKeyTranslator.java
index 930ed14..64d5d78 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/ReduceByKeyTranslator.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/ReduceByKeyTranslator.java
@@ -86,8 +86,8 @@ public class ReduceByKeyTranslator<InputT, KeyT, ValueT, OutputT>
"combine",
Combine.perKey(asCombiner(reducer, accumulators, operator.getName().orElse(null))));
@SuppressWarnings("unchecked")
- final PCollection<KV<KeyT, OutputT>> casted = (PCollection) combined;
- return casted.setTypeDescriptor(
+ final PCollection<KV<KeyT, OutputT>> cast = (PCollection) combined;
+ return cast.setTypeDescriptor(
operator
.getOutputType()
.orElseThrow(
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java
new file mode 100644
index 0000000..bc82ecb
--- /dev/null
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.core.translate;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+/**
+ * A transform that pairs each element with its event timestamp (in epoch millis) and applies
+ * a user-defined transform to the resulting timestamp-keyed collection.
+ *
+ * @param <InputT> input type
+ * @param <OutputT> output type
+ */
+public class TimestampExtractTransform<InputT, OutputT>
+ extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
+
+ /**
+ * Create a transform whose user-defined part works on KVs keyed by timestamp (epoch millis).
+ *
+ * @param <InputT> type of input elements
+ * @param <OutputT> type of output elements
+ * @param transform the transform applied to the timestamp-keyed elements
+ * @return the transform
+ */
+ public static <InputT, OutputT> TimestampExtractTransform<InputT, OutputT> of(
+ PCollectionTransform<InputT, OutputT> transform) {
+
+ return new TimestampExtractTransform<>(null, transform);
+ }
+
+ /**
+ * Create a transform whose user-defined part works on KVs keyed by timestamp (epoch millis).
+ *
+ * @param <InputT> type of input elements
+ * @param <OutputT> type of output elements
+ * @param name name of the transform; also used to prefix the internal step name
+ * @param transform the transform applied to the timestamp-keyed elements
+ * @return the transform
+ */
+ public static <InputT, OutputT> TimestampExtractTransform<InputT, OutputT> of(
+ String name, PCollectionTransform<InputT, OutputT> transform) {
+
+ return new TimestampExtractTransform<>(name, transform);
+ }
+
+ /**
+ * Applies user-defined transform(s) to a timestamp-keyed {@link PCollection}.
+ *
+ * @param <InputT> input type
+ * @param <OutputT> output type
+ */
+ @FunctionalInterface
+ public interface PCollectionTransform<InputT, OutputT>
+ extends UnaryFunction<PCollection<KV<Long, InputT>>, PCollection<OutputT>> {}
+
+ private static class Wrap<T> extends DoFn<T, KV<Long, T>> {
+ // Emits each element keyed by its event timestamp in epoch millis.
+ @ProcessElement
+ public void processElement(ProcessContext ctx) {
+ ctx.output(KV.of(ctx.timestamp().getMillis(), ctx.element()));
+ }
+ }
+
+ private static class Unwrap<T> extends DoFn<KV<Long, T>, T> {
+ // Drops the timestamp key. NOTE(review): not referenced anywhere in this class; remove?
+ @ProcessElement
+ public void processElement(ProcessContext ctx) {
+ ctx.output(ctx.element().getValue());
+ }
+ }
+
+ private final PCollectionTransform<InputT, OutputT> timestampedTransform;
+
+ private TimestampExtractTransform(
+ @Nullable String name, PCollectionTransform<InputT, OutputT> timestampedTransform) {
+
+ super(name);
+ this.timestampedTransform = timestampedTransform;
+ }
+ // Keys input by timestamp (keeping its type descriptor, if set), then applies the transform.
+ @Override
+ public PCollection<OutputT> expand(PCollection<InputT> input) {
+ PCollection<KV<Long, InputT>> in;
+ in = input.apply(getName("wrap"), ParDo.of(new Wrap<>()));
+ if (input.getTypeDescriptor() != null) {
+ in =
+ in.setTypeDescriptor(
+ TypeDescriptors.kvs(TypeDescriptors.longs(), input.getTypeDescriptor()));
+ }
+ return timestampedTransform.apply(in);
+ }
+ // Builds a step name, prefixed with this transform's name ("name::suffix") when one was given.
+ private String getName(String suffix) {
+ return name != null ? name + "::" + suffix : suffix;
+ }
+}
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/GenericTranslatorProvider.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/GenericTranslatorProvider.java
index 210f4e2..f9ea163 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/GenericTranslatorProvider.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/GenericTranslatorProvider.java
@@ -164,12 +164,12 @@ public class GenericTranslatorProvider implements TranslatorProvider {
"At least user defined predicate or class of an operator have to be given.");
@SuppressWarnings("unchecked")
- OperatorTranslator<?, ?, OperatorT> castedTranslator =
+ OperatorTranslator<?, ?, OperatorT> castTranslator =
(OperatorTranslator<?, ?, OperatorT>) translator;
this.operatorClass = operatorClass;
this.userDefinedPredicate = userDefinedPredicate;
- this.translator = castedTranslator;
+ this.translator = castTranslator;
}
static <OperatorT extends Operator<?>> TranslationDescriptor<OperatorT> of(
diff --git a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java
index 4b2eb8c..b078572 100644
--- a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java
+++ b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java
@@ -281,9 +281,11 @@ public class DocumentationExamplesTest {
KV.of(1, 100L), KV.of(3, 100_000L), KV.of(42, 10L), KV.of(1, 0L), KV.of(3, 0L)));
// suppose input: [KV(1, 100L), KV(3, 100_000L), KV(42, 10L), KV(1, 0L), KV(3, 0L)]
- PCollection<Integer> uniqueKeys =
- Distinct.named("unique-keys-only").of(keyValueInput).mapped(KV::getKey).output();
+ PCollection<KV<Integer, Long>> distinct =
+ Distinct.named("unique-keys-only").of(keyValueInput).projected(KV::getKey).output();
+
// Output will contain: 1, 3, 42
+ PCollection<Integer> uniqueKeys = MapElements.of(distinct).using(KV::getKey).output();
PAssert.that(uniqueKeys).containsInAnyOrder(1, 3, 42);
diff --git a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/DistinctTest.java b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/DistinctTest.java
index e489d37..a81bc77 100644
--- a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/DistinctTest.java
+++ b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/DistinctTest.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.AssignEventTime;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Distinct;
+import org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.values.KV;
@@ -75,12 +76,14 @@ public class DistinctTest extends AbstractOperatorTest {
@Override
protected PCollection<Integer> getOutput(PCollection<KV<Integer, Long>> input) {
input = AssignEventTime.of(input).using(KV::getValue).output();
- return Distinct.of(input)
- .mapped(KV::getKey)
- .windowBy(FixedWindows.of(org.joda.time.Duration.standardSeconds(1)))
- .triggeredBy(DefaultTrigger.of())
- .discardingFiredPanes()
- .output();
+ PCollection<KV<Integer, Long>> distinct =
+ Distinct.of(input)
+ .projected(KV::getKey)
+ .windowBy(FixedWindows.of(org.joda.time.Duration.standardSeconds(1)))
+ .triggeredBy(DefaultTrigger.of())
+ .discardingFiredPanes()
+ .output();
+ return MapElements.of(distinct).using(KV::getKey).output();
}
@Override
@@ -114,12 +117,14 @@ public class DistinctTest extends AbstractOperatorTest {
@Override
protected PCollection<Integer> getOutput(PCollection<KV<Integer, Long>> input) {
input = AssignEventTime.of(input).using(KV::getValue).output();
- return Distinct.of(input)
- .mapped(KV::getKey)
- .windowBy(FixedWindows.of(org.joda.time.Duration.standardSeconds(1)))
- .triggeredBy(DefaultTrigger.of())
- .discardingFiredPanes()
- .output();
+ PCollection<KV<Integer, Long>> distinct =
+ Distinct.of(input)
+ .projected(KV::getKey)
+ .windowBy(FixedWindows.of(org.joda.time.Duration.standardSeconds(1)))
+ .triggeredBy(DefaultTrigger.of())
+ .discardingFiredPanes()
+ .output();
+ return MapElements.of(distinct).using(KV::getKey).output();
}
@Override
@@ -136,10 +141,87 @@ public class DistinctTest extends AbstractOperatorTest {
});
}
- private List<KV<Integer, Long>> asTimedList(long step, Integer... values) {
- final List<KV<Integer, Long>> ret = new ArrayList<>(values.length);
+ @Test
+ public void testSimpleDuplicatesWithStreamStrategyOldest() {
+ execute(
+ new AbstractTestCase<KV<String, Long>, String>() {
+ // Keyed by first character; OLDEST should keep the earliest-stamped "1", "2", "3".
+ @Override
+ public List<String> getUnorderedOutput() {
+ return Arrays.asList("2", "1", "3");
+ }
+
+ @Override
+ protected PCollection<String> getOutput(PCollection<KV<String, Long>> input) {
+ input = AssignEventTime.of(input).using(KV::getValue).output();
+ PCollection<KV<String, Long>> distinct =
+ Distinct.of(input)
+ .projected(
+ in -> in.getKey().substring(0, 1),
+ Distinct.SelectionPolicy.OLDEST,
+ TypeDescriptors.strings())
+ .windowBy(FixedWindows.of(org.joda.time.Duration.standardSeconds(1)))
+ .triggeredBy(DefaultTrigger.of())
+ .discardingFiredPanes()
+ .output();
+ return MapElements.of(distinct).using(KV::getKey).output();
+ }
+
+ @Override
+ protected List<KV<String, Long>> getInput() {
+ return asTimedList(100, "1", "2", "3", "3.", "2.", "1.");
+ }
+
+ @Override
+ protected TypeDescriptor<KV<String, Long>> getInputType() {
+ return TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs());
+ }
+ });
+ }
+
+ @Test
+ public void testSimpleDuplicatesWithStreamStrategyNewest() {
+ execute(
+ new AbstractTestCase<KV<String, Long>, String>() {
+ // Keyed by first character; NEWEST should keep the latest-stamped "1.", "2.", "3.".
+ @Override
+ public List<String> getUnorderedOutput() {
+ return Arrays.asList("2.", "1.", "3.");
+ }
+
+ @Override
+ protected PCollection<String> getOutput(PCollection<KV<String, Long>> input) {
+ input = AssignEventTime.of(input).using(KV::getValue).output();
+ PCollection<KV<String, Long>> distinct =
+ Distinct.of(input)
+ .projected(
+ in -> in.getKey().substring(0, 1),
+ Distinct.SelectionPolicy.NEWEST,
+ TypeDescriptors.strings())
+ .windowBy(FixedWindows.of(org.joda.time.Duration.standardSeconds(1)))
+ .triggeredBy(DefaultTrigger.of())
+ .discardingFiredPanes()
+ .output();
+ return MapElements.of(distinct).using(KV::getKey).output();
+ }
+
+ @Override
+ protected List<KV<String, Long>> getInput() {
+ return asTimedList(100, "1", "2", "3", "3.", "2.", "1.");
+ }
+
+ @Override
+ protected TypeDescriptor<KV<String, Long>> getInputType() {
+ return TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs());
+ }
+ });
+ }
+
+ @SafeVarargs
+ final <T> List<KV<T, Long>> asTimedList(long step, T... values) {
+ final List<KV<T, Long>> ret = new ArrayList<>(values.length);
long i = step;
- for (Integer v : values) {
+ for (T v : values) {
ret.add(KV.of(v, i));
i += step;
}
diff --git a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransformTest.java b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransformTest.java
new file mode 100644
index 0000000..2d37ae3
--- /dev/null
+++ b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransformTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.core.translate;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.euphoria.core.client.operator.CountByKey;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.junit.Test;
+
+/** Test suite for {@link TimestampExtractTransform}. */
+public class TimestampExtractTransformTest {
+ // Keys elements by timestamp, counts them by original value and expects a count of 1 each.
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 10000)
+ public void testTransform() {
+ Pipeline p = Pipeline.create();
+ PCollection<Integer> input = p.apply(Create.of(1, 2, 3));
+ PCollection<KV<Integer, Long>> result =
+ input.apply(
+ TimestampExtractTransform.of(
+ in -> CountByKey.of(in).keyBy(KV::getValue, TypeDescriptors.integers()).output()));
+ PAssert.that(result).containsInAnyOrder(KV.of(1, 1L), KV.of(2, 1L), KV.of(3, 1L));
+ p.run().waitUntilFinish();
+ }
+}
diff --git a/website/src/documentation/sdks/euphoria.md b/website/src/documentation/sdks/euphoria.md
index 45d09ea..652708b 100644
--- a/website/src/documentation/sdks/euphoria.md
+++ b/website/src/documentation/sdks/euphoria.md
@@ -266,9 +266,9 @@ Distinct.named("unique-integers-only")
// suppose keyValueInput: [KV(1, 100L), KV(3, 100_000L), KV(42, 10L), KV(1, 0L), KV(3, 0L)]
Distinct.named("unique-keys-only")
.of(keyValueInput)
- .mapped(KV::getKey)
+ .projected(KV::getKey)
.output();
-// Output will contain: 1, 3, 42
+// Output will contain KVs with keys 1, 3 and 42, each paired with an arbitrary one of the values associated with that key
```
### `Join`