You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this text.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/07 22:33:57 UTC
[8/8] flink git commit: [FLINK-2550] Add Window Aggregations to new Windowing API
[FLINK-2550] Add Window Aggregations to new Windowing API
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28a38bb7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28a38bb7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28a38bb7
Branch: refs/heads/master
Commit: 28a38bb7dedbc10ceab9d4ae1dbcc15789e33211
Parents: 0bac272
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Oct 6 17:37:39 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 7 22:08:25 2015 +0200
----------------------------------------------------------------------
.../api/datastream/AllWindowedStream.java | 205 +++++++++++++++++++
.../api/datastream/WindowedStream.java | 205 +++++++++++++++++++
.../aggregation/ComparableAggregator.java | 11 +-
.../functions/aggregation/SumAggregator.java | 2 +-
.../windowing/util/SessionWindowingData.java | 2 +-
.../streaming/api/scala/AllWindowedStream.scala | 96 +++++++++
.../streaming/api/scala/WindowedStream.scala | 96 +++++++++
.../StreamingScalaAPICompletenessTest.scala | 3 +
8 files changed, 612 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 0cc1854..89c4857 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -25,6 +25,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -222,6 +225,204 @@ public class AllWindowedStream<T, W extends Window> {
}
// ------------------------------------------------------------------------
+ // Aggregations on the windows
+ // ------------------------------------------------------------------------
+
+ /**
+ * Applies an aggregation that sums every window of the data stream at the
+ * given position.
+ *
+ * @param positionToSum The position in the tuple/array to sum
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> sum(int positionToSum) {
+ return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that sums every window of the pojo data stream at
+ * the given field expression.
+ *
+ * <p>
+ * A field expression is either
+ * the name of a public field or a getter method with parentheses of the
+ * stream's underlying type. A dot can be used to drill down into objects,
+ * as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field The field expression to sum
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> sum(String field) {
+ return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the minimum value of every window
+ * of the data stream at the given position.
+ *
+ * @param positionToMin The position to minimize
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> min(int positionToMin) {
+ return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the minimum value of the pojo data
+ * stream at the given field expression for every window.
+ *
+ * <p>
+ * A field
+ * expression is either the name of a public field or a getter method with
+ * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used
+ * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field The field expression based on which the aggregation will be applied.
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> min(String field) {
+ return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the minimum element of every window of
+ * the data stream by the given position. If more elements have the same
+ * minimum value the operator returns the first element by default.
+ *
+ * @param positionToMinBy
+ * The position to minimize by
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> minBy(int positionToMinBy) {
+ return this.minBy(positionToMinBy, true);
+ }
+
+ /**
+ * Applies an aggregation that gives the minimum element of every window of
+ * the data stream by the given field expression. If more elements have the same
+ * minimum value the operator returns the first element by default.
+ *
+ * @param positionToMinBy The field expression to minimize by
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> minBy(String positionToMinBy) {
+ return this.minBy(positionToMinBy, true);
+ }
+
+ /**
+ * Applies an aggregation that gives the minimum element of every window of
+ * the data stream by the given position. If more elements have the same
+ * minimum value the operator returns either the first or last one depending
+ * on the parameter setting.
+ *
+ * @param positionToMinBy The position to minimize by
+ * @param first If true, then the operator returns the first element with the minimum value, otherwise returns the last
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> minBy(int positionToMinBy, boolean first) {
+ return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the minimum element of the pojo
+ * data stream by the given field expression for every window. A field
+ * expression is either the name of a public field or a getter method with
+ * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used
+ * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field The field expression based on which the aggregation will be applied.
+ * @param first If true, then in case of field equality the first element is returned, otherwise the last
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> minBy(String field, boolean first) {
+ return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the maximum value of every window of
+ * the data stream at the given position.
+ *
+ * @param positionToMax The position to maximize
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> max(int positionToMax) {
+ return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the maximum value of the pojo data
+ * stream at the given field expression for every window. A field expression
+ * is either the name of a public field or a getter method with parentheses
+ * of the {@link DataStream DataStream's} underlying type. A dot can be used to drill
+ * down into objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field The field expression based on which the aggregation will be applied.
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> max(String field) {
+ return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the maximum element of every window of
+ * the data stream by the given position. If more elements have the same
+ * maximum value the operator returns the first element by default.
+ *
+ * @param positionToMaxBy
+ * The position to maximize by
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> maxBy(int positionToMaxBy) {
+ return this.maxBy(positionToMaxBy, true);
+ }
+
+ /**
+ * Applies an aggregation that gives the maximum element of every window of
+ * the data stream by the given field expression. If more elements have the same
+ * maximum value the operator returns the first element by default.
+ *
+ * @param positionToMaxBy
+ * The field expression to maximize by
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> maxBy(String positionToMaxBy) {
+ return this.maxBy(positionToMaxBy, true);
+ }
+
+ /**
+ * Applies an aggregation that gives the maximum element of every window of
+ * the data stream by the given position. If more elements have the same
+ * maximum value the operator returns either the first or last one depending
+ * on the parameter setting.
+ *
+ * @param positionToMaxBy The position to maximize by
+ * @param first If true, then the operator returns the first element with the maximum value, otherwise returns the last
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> maxBy(int positionToMaxBy, boolean first) {
+ return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the maximum element of the pojo
+ * data stream by the given field expression for every window. A field
+ * expression is either the name of a public field or a getter method with
+ * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used
+ * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field The field expression based on which the aggregation will be applied.
+ * @param first If true, then in case of field equality the first element is returned, otherwise the last
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> maxBy(String field, boolean first) {
+ return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
+ }
+
+ private DataStream<T> aggregate(AggregationFunction<T> aggregator) { // every aggregation is applied as a reduce over the window contents
+ return reduce(aggregator);
+ }
+
+ // ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
@@ -238,4 +439,8 @@ public class AllWindowedStream<T, W extends Window> {
public StreamExecutionEnvironment getExecutionEnvironment() {
return input.getExecutionEnvironment();
}
+
+ public TypeInformation<T> getInputType() { // type of the elements of the input stream
+ return input.getType();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 0ea9cad..1273b42 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -26,6 +26,9 @@ import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -241,6 +244,204 @@ public class WindowedStream<T, K, W extends Window> {
}
// ------------------------------------------------------------------------
+ // Aggregations on the keyed windows
+ // ------------------------------------------------------------------------
+
+ /**
+ * Applies an aggregation that sums every window of the data stream at the
+ * given position.
+ *
+ * @param positionToSum The position in the tuple/array to sum
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> sum(int positionToSum) {
+ return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that sums every window of the pojo data stream at
+ * the given field expression.
+ *
+ * <p>
+ * A field expression is either
+ * the name of a public field or a getter method with parentheses of the
+ * stream's underlying type. A dot can be used to drill down into objects,
+ * as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field The field expression to sum
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> sum(String field) {
+ return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the minimum value of every window
+ * of the data stream at the given position.
+ *
+ * @param positionToMin The position to minimize
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> min(int positionToMin) {
+ return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the minimum value of the pojo data
+ * stream at the given field expression for every window.
+ *
+ * <p>
+ * A field
+ * expression is either the name of a public field or a getter method with
+ * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used
+ * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field The field expression based on which the aggregation will be applied.
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> min(String field) {
+ return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the minimum element of every window of
+ * the data stream by the given position. If more elements have the same
+ * minimum value the operator returns the first element by default.
+ *
+ * @param positionToMinBy
+ * The position to minimize by
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> minBy(int positionToMinBy) {
+ return this.minBy(positionToMinBy, true);
+ }
+
+ /**
+ * Applies an aggregation that gives the minimum element of every window of
+ * the data stream by the given field expression. If more elements have the same
+ * minimum value the operator returns the first element by default.
+ *
+ * @param positionToMinBy The field expression to minimize by
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> minBy(String positionToMinBy) {
+ return this.minBy(positionToMinBy, true);
+ }
+
+ /**
+ * Applies an aggregation that gives the minimum element of every window of
+ * the data stream by the given position. If more elements have the same
+ * minimum value the operator returns either the first or last one depending
+ * on the parameter setting.
+ *
+ * @param positionToMinBy The position to minimize by
+ * @param first If true, then the operator returns the first element with the minimum value, otherwise returns the last
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> minBy(int positionToMinBy, boolean first) {
+ return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the minimum element of the pojo
+ * data stream by the given field expression for every window. A field
+ * expression is either the name of a public field or a getter method with
+ * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used
+ * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field The field expression based on which the aggregation will be applied.
+ * @param first If true, then in case of field equality the first element is returned, otherwise the last
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> minBy(String field, boolean first) {
+ return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the maximum value of every window of
+ * the data stream at the given position.
+ *
+ * @param positionToMax The position to maximize
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> max(int positionToMax) {
+ return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the maximum value of the pojo data
+ * stream at the given field expression for every window. A field expression
+ * is either the name of a public field or a getter method with parentheses
+ * of the {@link DataStream DataStream's} underlying type. A dot can be used to drill
+ * down into objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field The field expression based on which the aggregation will be applied.
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> max(String field) {
+ return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the maximum element of every window of
+ * the data stream by the given position. If more elements have the same
+ * maximum value the operator returns the first element by default.
+ *
+ * @param positionToMaxBy
+ * The position to maximize by
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> maxBy(int positionToMaxBy) {
+ return this.maxBy(positionToMaxBy, true);
+ }
+
+ /**
+ * Applies an aggregation that gives the maximum element of every window of
+ * the data stream by the given field expression. If more elements have the same
+ * maximum value the operator returns the first element by default.
+ *
+ * @param positionToMaxBy
+ * The field expression to maximize by
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> maxBy(String positionToMaxBy) {
+ return this.maxBy(positionToMaxBy, true);
+ }
+
+ /**
+ * Applies an aggregation that gives the maximum element of every window of
+ * the data stream by the given position. If more elements have the same
+ * maximum value the operator returns either the first or last one depending
+ * on the parameter setting.
+ *
+ * @param positionToMaxBy The position to maximize by
+ * @param first If true, then the operator returns the first element with the maximum value, otherwise returns the last
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> maxBy(int positionToMaxBy, boolean first) {
+ return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the maximum element of the pojo
+ * data stream by the given field expression for every window. A field
+ * expression is either the name of a public field or a getter method with
+ * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used
+ * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field The field expression based on which the aggregation will be applied.
+ * @param first If true, then in case of field equality the first element is returned, otherwise the last
+ * @return The transformed DataStream.
+ */
+ public DataStream<T> maxBy(String field, boolean first) {
+ return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
+ }
+
+ private DataStream<T> aggregate(AggregationFunction<T> aggregator) { // every aggregation is applied as a reduce over each keyed window
+ return reduce(aggregator);
+ }
+
+ // ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
@@ -307,4 +508,8 @@ public class WindowedStream<T, K, W extends Window> {
public StreamExecutionEnvironment getExecutionEnvironment() {
return input.getExecutionEnvironment();
}
+
+ public TypeInformation<T> getInputType() { // type of the elements of the input stream
+ return input.getType();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
index 766a59e..e5501a0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
@@ -33,8 +33,7 @@ public class ComparableAggregator<T> extends AggregationFunction<T> {
private ComparableAggregator(int pos, AggregationType aggregationType, boolean first) {
super(pos);
this.comparator = Comparator.getForAggregation(aggregationType);
- this.byAggregate = (aggregationType == AggregationType.MAXBY)
- || (aggregationType == AggregationType.MINBY);
+ this.byAggregate = (aggregationType == AggregationType.MAXBY) || (aggregationType == AggregationType.MINBY);
this.first = first;
}
@@ -61,7 +60,7 @@ public class ComparableAggregator<T> extends AggregationFunction<T> {
@SuppressWarnings("unchecked")
@Override
public T reduce(T value1, T value2) throws Exception {
- Comparable<Object> o1 = (Comparable<Object>)fieldAccessor.get(value1);
+ Comparable<Object> o1 = (Comparable<Object>) fieldAccessor.get(value1);
Object o2 = fieldAccessor.get(value2);
int c = comparator.isExtremal(o1, o2);
@@ -79,10 +78,10 @@ public class ComparableAggregator<T> extends AggregationFunction<T> {
return value2;
} else {
- if (c == 1) {
- value2 = fieldAccessor.set(value2, o1);
+ if (c == 0) { // NOTE(review): presumably c == 0 means o1 is not the extremal value, so o2 is copied into value1 -- verify against Comparator.isExtremal
+ value1 = fieldAccessor.set(value1, o2);
}
- return value2;
+ return value1; // in this (non-BY) branch the other fields of the result are taken from value1
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
index c23695e..b045233 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
@@ -43,6 +43,6 @@ public class SumAggregator<T> extends AggregationFunction<T> {
@SuppressWarnings("unchecked")
@Override
public T reduce(T value1, T value2) throws Exception {
- return fieldAccessor.set(value2, adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2)));
+ return fieldAccessor.set(value1, adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2))); // the sum is written into value1, so the other fields are taken from value1
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
index bb4a123..c1a99a8 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.examples.windowing.util;
public class SessionWindowingData {
- public static final String EXPECTED = "(a,1,1)\n" + "(c,6,1)\n" + "(c,11,1)\n" + "(b,5,3)\n" +
+ public static final String EXPECTED = "(a,1,1)\n" + "(c,6,1)\n" + "(c,11,1)\n" + "(b,1,3)\n" + // NOTE(review): (b,5,3) -> (b,1,3), presumably because aggregations now keep value1's non-aggregated fields -- confirm
"(a,10,1)";
private SessionWindowingData() {
http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 9054d95..d2d0a1d 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream}
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
+import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.triggers.Trigger
@@ -134,6 +136,96 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
javaStream.apply(clean(function), implicitly[TypeInformation[R]])
}
+ // ------------------------------------------------------------------------
+ // Aggregations on the keyed windows
+ // ------------------------------------------------------------------------
+
+ /**
+ * Applies an aggregation that that gives the maximum of the elements in the window at
+ * the given position.
+ */
+ def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
+
+ /**
+ * Applies an aggregation that that gives the maximum of the elements in the window at
+ * the given field.
+ */
+ def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
+
+ /**
+ * Applies an aggregation that that gives the minimum of the elements in the window at
+ * the given position.
+ */
+ def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
+
+ /**
+ * Applies an aggregation that that gives the minimum of the elements in the window at
+ * the given field.
+ */
+ def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
+
+ /**
+ * Applies an aggregation that sums the elements in the window at the given position.
+ */
+ def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
+
+ /**
+ * Applies an aggregation that sums the elements in the window at the given field.
+ */
+ def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
+
+ /**
+ * Applies an aggregation that that gives the maximum element of the window by
+ * the given position. When equality, returns the first.
+ */
+ def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY,
+ position)
+
+ /**
+ * Applies an aggregation that that gives the maximum element of the window by
+ * the given field. When equality, returns the first.
+ */
+ def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY,
+ field)
+
+ /**
+ * Applies an aggregation that that gives the minimum element of the window by
+ * the given position. When equality, returns the first.
+ */
+ def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY,
+ position)
+
+ /**
+ * Applies an aggregation that that gives the minimum element of the window by
+ * the given field. When equality, returns the first.
+ */
+ def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY,
+ field)
+
+ private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
+ val position = fieldNames2Indices(getInputType(), Array(field))(0)
+ aggregate(aggregationType, position)
+ }
+
+ def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = {
+
+ val jStream = javaStream.asInstanceOf[JavaAllWStream[Product, W]]
+
+ val reducer = aggregationType match {
+ case AggregationType.SUM =>
+ new SumAggregator(position, jStream.getInputType, jStream.getExecutionEnvironment.getConfig)
+
+ case _ =>
+ new ComparableAggregator(
+ position,
+ jStream.getInputType,
+ aggregationType,
+ true,
+ jStream.getExecutionEnvironment.getConfig)
+ }
+
+ new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]
+ }
// ------------------------------------------------------------------------
// Utilities
@@ -147,4 +239,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
}
+ /**
+ * Gets the type information of the elements of the windowed input stream.
+ */
+ private def getInputType(): TypeInformation[T] = javaStream.getInputType
}
http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 2d6806d..3963765 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream}
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
+import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.triggers.Trigger
@@ -137,6 +139,96 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
javaStream.apply(clean(function), implicitly[TypeInformation[R]])
}
+ // ------------------------------------------------------------------------
+ // Aggregations on the keyed windows
+ // ------------------------------------------------------------------------
+
+ /**
+ * Applies an aggregation that gives the maximum of the elements in the window at
+ * the given position.
+ */
+ def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
+
+ /**
+ * Applies an aggregation that gives the maximum of the elements in the window at
+ * the given field.
+ */
+ def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
+
+ /**
+ * Applies an aggregation that gives the minimum of the elements in the window at
+ * the given position.
+ */
+ def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
+
+ /**
+ * Applies an aggregation that gives the minimum of the elements in the window at
+ * the given field.
+ */
+ def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
+
+ /**
+ * Applies an aggregation that sums, over all elements in the window, the given position.
+ */
+ def sum(position: Int): DataStream[T] = aggregate(aggregationType = AggregationType.SUM, position = position)
+
+ /**
+ * Applies an aggregation that sums, over all elements in the window, the given field.
+ */
+ def sum(field: String): DataStream[T] = aggregate(aggregationType = AggregationType.SUM, field = field)
+
+ /**
+ * Applies an aggregation that gives the element of the window with the maximum value at
+ * the given position. If several elements share that maximum, the first one is returned.
+ */
+ def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY,
+ position)
+
+ /**
+ * Applies an aggregation that gives the element of the window with the maximum value in
+ * the given field. If several elements share that maximum, the first one is returned.
+ */
+ def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY,
+ field)
+
+ /**
+ * Applies an aggregation that gives the element of the window with the minimum value at
+ * the given position. If several elements share that minimum, the first one is returned.
+ */
+ def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY,
+ position)
+
+ /**
+ * Applies an aggregation that gives the element of the window with the minimum value in
+ * the given field. If several elements share that minimum, the first one is returned.
+ */
+ def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY,
+ field)
+
+ private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
+ // Resolve the field expression to its positional index, then delegate to the Int overload.
+ aggregate(aggregationType, fieldNames2Indices(getInputType(), Array(field))(0))
+ }
+
+ def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = {
+ // View the window as holding Products so the Product-typed aggregators can be built.
+ val productStream = javaStream.asInstanceOf[JavaWStream[Product, K, W]]
+ val inputType = productStream.getInputType
+ val config = productStream.getExecutionEnvironment.getConfig
+
+ // SUM uses SumAggregator; every other aggregation type uses ComparableAggregator.
+ val aggregator =
+ if (aggregationType == AggregationType.SUM) {
+ new SumAggregator(position, inputType, config)
+ } else {
+ // `true`: on ties keep the first element, as documented for maxBy/minBy.
+ new ComparableAggregator(position, inputType, aggregationType, true, config)
+ }
+
+ val reduced = productStream.reduce(aggregator)
+ // Wrap the Java result, then cast back to the caller-visible element type T.
+ new DataStream[Product](reduced).asInstanceOf[DataStream[T]]
+ }
// ------------------------------------------------------------------------
// Utilities
@@ -150,4 +242,8 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
}
+ /**
+ * Gets the type information of the elements of the windowed input stream.
+ */
+ private def getInputType(): TypeInformation[T] = javaStream.getInputType
}
http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index c6bd87a..53aa1e2 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -51,11 +51,14 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
"org.apache.flink.streaming.api.datastream.ConnectedStreams.getType2",
"org.apache.flink.streaming.api.datastream.ConnectedStreams.addGeneralWindowCombine",
"org.apache.flink.streaming.api.datastream.ConnectedStreams.transform",
+
"org.apache.flink.streaming.api.datastream.WindowedDataStream.getType",
"org.apache.flink.streaming.api.datastream.WindowedDataStream.getExecutionConfig",
"org.apache.flink.streaming.api.datastream.WindowedStream.getExecutionEnvironment",
+ "org.apache.flink.streaming.api.datastream.WindowedStream.getInputType",
"org.apache.flink.streaming.api.datastream.AllWindowedStream.getExecutionEnvironment",
+ "org.apache.flink.streaming.api.datastream.AllWindowedStream.getInputType",
"org.apache.flink.streaming.api.datastream.KeyedStream.transform",
"org.apache.flink.streaming.api.datastream.KeyedStream.getKeySelector",