You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/07 22:33:57 UTC

[8/8] flink git commit: [FLINK-2550] Add Window Aggregations to new Windowing API

[FLINK-2550] Add Window Aggregations to new Windowing API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28a38bb7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28a38bb7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28a38bb7

Branch: refs/heads/master
Commit: 28a38bb7dedbc10ceab9d4ae1dbcc15789e33211
Parents: 0bac272
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Oct 6 17:37:39 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 7 22:08:25 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       | 205 +++++++++++++++++++
 .../api/datastream/WindowedStream.java          | 205 +++++++++++++++++++
 .../aggregation/ComparableAggregator.java       |  11 +-
 .../functions/aggregation/SumAggregator.java    |   2 +-
 .../windowing/util/SessionWindowingData.java    |   2 +-
 .../streaming/api/scala/AllWindowedStream.scala |  96 +++++++++
 .../streaming/api/scala/WindowedStream.scala    |  96 +++++++++
 .../StreamingScalaAPICompletenessTest.scala     |   3 +
 8 files changed, 612 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 0cc1854..89c4857 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -25,6 +25,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -222,6 +225,204 @@ public class AllWindowedStream<T, W extends Window> {
 	}
 
 	// ------------------------------------------------------------------------
+	//  Aggregations on the  windows
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Applies an aggregation that sums every window of the data stream at the
+	 * given position.
+	 *
+	 * @param positionToSum The position in the tuple/array to sum
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> sum(int positionToSum) {
+		return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that sums every window of the pojo data stream at
+	 * the given field for every window.
+	 *
+	 * <p>
+	 * A field expression is either
+	 * the name of a public field or a getter method with parentheses of the
+	 * stream's underlying type. A dot can be used to drill down into objects,
+	 * as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field The field to sum
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> sum(String field) {
+		return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the minimum value of every window
+	 * of the data stream at the given position.
+	 *
+	 * @param positionToMin The position to minimize
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> min(int positionToMin) {
+		return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the minimum value of the pojo data
+	 * stream at the given field expression for every window.
+	 *
+	 * <p>
+	 * A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream}S underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field The field expression based on which the aggregation will be applied.
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> min(String field) {
+		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * minimum value the operator returns the first element by default.
+	 *
+	 * @param positionToMinBy
+	 *            The position to minimize by
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> minBy(int positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * minimum value the operator returns the first element by default.
+	 *
+	 * @param positionToMinBy The position to minimize by
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> minBy(String positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * minimum value the operator returns either the first or last one depending
+	 * on the parameter setting.
+	 *
+	 * @param positionToMinBy The position to minimize
+	 * @param first If true, then the operator return the first element with the minimum value, otherwise returns the last
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> minBy(int positionToMinBy, boolean first) {
+		return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the minimum element of the pojo
+	 * data stream by the given field expression for every window. A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream DataStreams} underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field The field expression based on which the aggregation will be applied.
+	 * @param first If True then in case of field equality the first object will be returned
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> minBy(String field, boolean first) {
+		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum value of every window of
+	 * the data stream at the given position.
+	 *
+	 * @param positionToMax The position to maximize
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> max(int positionToMax) {
+		return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the maximum value of the pojo data
+	 * stream at the given field expression for every window. A field expression
+	 * is either the name of a public field or a getter method with parentheses
+	 * of the {@link DataStream DataStreams} underlying type. A dot can be used to drill
+	 * down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field The field expression based on which the aggregation will be applied.
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> max(String field) {
+		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * maximum value the operator returns the first by default.
+	 *
+	 * @param positionToMaxBy
+	 *            The position to maximize by
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> maxBy(int positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * maximum value the operator returns the first by default.
+	 *
+	 * @param positionToMaxBy
+	 *            The position to maximize by
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> maxBy(String positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * maximum value the operator returns either the first or last one depending
+	 * on the parameter setting.
+	 *
+	 * @param positionToMaxBy The position to maximize by
+	 * @param first If true, then the operator return the first element with the maximum value, otherwise returns the last
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> maxBy(int positionToMaxBy, boolean first) {
+		return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the maximum element of the pojo
+	 * data stream by the given field expression for every window. A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream}S underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field The field expression based on which the aggregation will be applied.
+	 * @param first If True then in case of field equality the first object will be returned
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> maxBy(String field, boolean first) {
+		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
+	}
+
+	private DataStream<T> aggregate(AggregationFunction<T> aggregator) {
+		return reduce(aggregator);
+	}
+
+	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
 
@@ -238,4 +439,8 @@ public class AllWindowedStream<T, W extends Window> {
 	public StreamExecutionEnvironment getExecutionEnvironment() {
 		return input.getExecutionEnvironment();
 	}
+
+	public TypeInformation<T> getInputType() {
+		return input.getType();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 0ea9cad..1273b42 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -26,6 +26,9 @@ import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -241,6 +244,204 @@ public class WindowedStream<T, K, W extends Window> {
 	}
 
 	// ------------------------------------------------------------------------
+	//  Aggregations on the keyed windows
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Applies an aggregation that sums every window of the data stream at the
+	 * given position.
+	 *
+	 * @param positionToSum The position in the tuple/array to sum
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> sum(int positionToSum) {
+		return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that sums every window of the pojo data stream at
+	 * the given field for every window.
+	 *
+	 * <p>
+	 * A field expression is either
+	 * the name of a public field or a getter method with parentheses of the
+	 * stream's underlying type. A dot can be used to drill down into objects,
+	 * as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field The field to sum
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> sum(String field) {
+		return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the minimum value of every window
+	 * of the data stream at the given position.
+	 *
+	 * @param positionToMin The position to minimize
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> min(int positionToMin) {
+		return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the minimum value of the pojo data
+	 * stream at the given field expression for every window.
+	 *
+	 * <p>
+	 * A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream}S underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field The field expression based on which the aggregation will be applied.
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> min(String field) {
+		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * minimum value the operator returns the first element by default.
+	 *
+	 * @param positionToMinBy
+	 *            The position to minimize by
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> minBy(int positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * minimum value the operator returns the first element by default.
+	 *
+	 * @param positionToMinBy The position to minimize by
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> minBy(String positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * minimum value the operator returns either the first or last one depending
+	 * on the parameter setting.
+	 *
+	 * @param positionToMinBy The position to minimize
+	 * @param first If true, then the operator return the first element with the minimum value, otherwise returns the last
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> minBy(int positionToMinBy, boolean first) {
+		return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the minimum element of the pojo
+	 * data stream by the given field expression for every window. A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream DataStreams} underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field The field expression based on which the aggregation will be applied.
+	 * @param first If True then in case of field equality the first object will be returned
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> minBy(String field, boolean first) {
+		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum value of every window of
+	 * the data stream at the given position.
+	 *
+	 * @param positionToMax The position to maximize
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> max(int positionToMax) {
+		return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the maximum value of the pojo data
+	 * stream at the given field expression for every window. A field expression
+	 * is either the name of a public field or a getter method with parentheses
+	 * of the {@link DataStream DataStreams} underlying type. A dot can be used to drill
+	 * down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field The field expression based on which the aggregation will be applied.
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> max(String field) {
+		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * maximum value the operator returns the first by default.
+	 *
+	 * @param positionToMaxBy
+	 *            The position to maximize by
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> maxBy(int positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * maximum value the operator returns the first by default.
+	 *
+	 * @param positionToMaxBy
+	 *            The position to maximize by
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> maxBy(String positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * maximum value the operator returns either the first or last one depending
+	 * on the parameter setting.
+	 *
+	 * @param positionToMaxBy The position to maximize by
+	 * @param first If true, then the operator return the first element with the maximum value, otherwise returns the last
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> maxBy(int positionToMaxBy, boolean first) {
+		return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the maximum element of the pojo
+	 * data stream by the given field expression for every window. A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream}S underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field The field expression based on which the aggregation will be applied.
+	 * @param first If True then in case of field equality the first object will be returned
+	 * @return The transformed DataStream.
+	 */
+	public DataStream<T> maxBy(String field, boolean first) {
+		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
+	}
+
+	private DataStream<T> aggregate(AggregationFunction<T> aggregator) {
+		return reduce(aggregator);
+	}
+
+	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
 
@@ -307,4 +508,8 @@ public class WindowedStream<T, K, W extends Window> {
 	public StreamExecutionEnvironment getExecutionEnvironment() {
 		return input.getExecutionEnvironment();
 	}
+
+	public TypeInformation<T> getInputType() {
+		return input.getType();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
index 766a59e..e5501a0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
@@ -33,8 +33,7 @@ public class ComparableAggregator<T> extends AggregationFunction<T> {
 	private ComparableAggregator(int pos, AggregationType aggregationType, boolean first) {
 		super(pos);
 		this.comparator = Comparator.getForAggregation(aggregationType);
-		this.byAggregate = (aggregationType == AggregationType.MAXBY)
-				|| (aggregationType == AggregationType.MINBY);
+		this.byAggregate = (aggregationType == AggregationType.MAXBY) || (aggregationType == AggregationType.MINBY);
 		this.first = first;
 	}
 
@@ -61,7 +60,7 @@ public class ComparableAggregator<T> extends AggregationFunction<T> {
 	@SuppressWarnings("unchecked")
 	@Override
 	public T reduce(T value1, T value2) throws Exception {
-		Comparable<Object> o1 = (Comparable<Object>)fieldAccessor.get(value1);
+		Comparable<Object> o1 = (Comparable<Object>) fieldAccessor.get(value1);
 		Object o2 = fieldAccessor.get(value2);
 
 		int c = comparator.isExtremal(o1, o2);
@@ -79,10 +78,10 @@ public class ComparableAggregator<T> extends AggregationFunction<T> {
 			return value2;
 
 		} else {
-			if (c == 1) {
-				value2 = fieldAccessor.set(value2, o1);
+			if (c == 0) {
+				value1 = fieldAccessor.set(value1, o2);
 			}
-			return value2;
+			return value1;
 		}
 
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
index c23695e..b045233 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
@@ -43,6 +43,6 @@ public class SumAggregator<T> extends AggregationFunction<T> {
 	@SuppressWarnings("unchecked")
 	@Override
 	public T reduce(T value1, T value2) throws Exception {
-		return fieldAccessor.set(value2, adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2)));
+		return fieldAccessor.set(value1, adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2)));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
index bb4a123..c1a99a8 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.examples.windowing.util;
 
 public class SessionWindowingData {
 
-	public static final String EXPECTED = "(a,1,1)\n" + "(c,6,1)\n" + "(c,11,1)\n" + "(b,5,3)\n" +
+	public static final String EXPECTED = "(a,1,1)\n" + "(c,6,1)\n" + "(c,11,1)\n" + "(b,1,3)\n" +
 			"(a,10,1)";
 
 	private SessionWindowingData() {

http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 9054d95..d2d0a1d 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.scala
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream}
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
+import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
 import org.apache.flink.streaming.api.windowing.triggers.Trigger
@@ -134,6 +136,96 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     javaStream.apply(clean(function), implicitly[TypeInformation[R]])
   }
 
+  // ------------------------------------------------------------------------
+  //  Aggregations on the keyed windows
+  // ------------------------------------------------------------------------
+
+  /**
+   * Applies an aggregation that that gives the maximum of the elements in the window at
+   * the given position.
+   */
+  def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
+
+  /**
+   * Applies an aggregation that that gives the maximum of the elements in the window at
+   * the given field.
+   */
+  def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
+
+  /**
+   * Applies an aggregation that that gives the minimum of the elements in the window at
+   * the given position.
+   */
+  def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
+
+  /**
+   * Applies an aggregation that that gives the minimum of the elements in the window at
+   * the given field.
+   */
+  def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
+
+  /**
+   * Applies an aggregation that sums the elements in the window at the given position.
+   */
+  def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
+
+  /**
+   * Applies an aggregation that sums the elements in the window at the given field.
+   */
+  def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
+
+  /**
+   * Applies an aggregation that that gives the maximum element of the window by
+   * the given position. When equality, returns the first.
+   */
+  def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY,
+    position)
+
+  /**
+   * Applies an aggregation that that gives the maximum element of the window by
+   * the given field. When equality, returns the first.
+   */
+  def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY,
+    field)
+
+  /**
+   * Applies an aggregation that that gives the minimum element of the window by
+   * the given position. When equality, returns the first.
+   */
+  def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY,
+    position)
+
+  /**
+   * Applies an aggregation that that gives the minimum element of the window by
+   * the given field. When equality, returns the first.
+   */
+  def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY,
+    field)
+
+  private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
+    val position = fieldNames2Indices(getInputType(), Array(field))(0)
+    aggregate(aggregationType, position)
+  }
+
+  def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = {
+
+    val jStream = javaStream.asInstanceOf[JavaAllWStream[Product, W]]
+
+    val reducer = aggregationType match {
+      case AggregationType.SUM =>
+        new SumAggregator(position, jStream.getInputType, jStream.getExecutionEnvironment.getConfig)
+
+      case _ =>
+        new ComparableAggregator(
+          position,
+          jStream.getInputType,
+          aggregationType,
+          true,
+          jStream.getExecutionEnvironment.getConfig)
+    }
+
+    new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]
+  }
 
   // ------------------------------------------------------------------------
   //  Utilities
@@ -147,4 +239,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
   }
 
+  /**
+   * Gets the output type.
+   */
+  private def getInputType(): TypeInformation[T] = javaStream.getInputType
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 2d6806d..3963765 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.scala
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream}
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
+import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
 import org.apache.flink.streaming.api.windowing.triggers.Trigger
@@ -137,6 +139,96 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     javaStream.apply(clean(function), implicitly[TypeInformation[R]])
   }
 
+  // ------------------------------------------------------------------------
+  //  Aggregations on the keyed windows
+  // ------------------------------------------------------------------------
+
+  /**
+   * Applies an aggregation that that gives the maximum of the elements in the window at
+   * the given position.
+   */
+  def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
+
+  /**
+   * Applies an aggregation that that gives the maximum of the elements in the window at
+   * the given field.
+   */
+  def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
+
+  /**
+   * Applies an aggregation that that gives the minimum of the elements in the window at
+   * the given position.
+   */
+  def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
+
+  /**
+   * Applies an aggregation that that gives the minimum of the elements in the window at
+   * the given field.
+   */
+  def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
+
+  /**
+   * Applies an aggregation that sums the elements in the window at the given position.
+   */
+  def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
+
+  /**
+   * Applies an aggregation that sums the elements in the window at the given field.
+   */
+  def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
+
+  /**
+   * Applies an aggregation that that gives the maximum element of the window by
+   * the given position. When equality, returns the first.
+   */
+  def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY,
+    position)
+
+  /**
+   * Applies an aggregation that that gives the maximum element of the window by
+   * the given field. When equality, returns the first.
+   */
+  def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY,
+    field)
+
+  /**
+   * Applies an aggregation that that gives the minimum element of the window by
+   * the given position. When equality, returns the first.
+   */
+  def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY,
+    position)
+
+  /**
+   * Applies an aggregation that that gives the minimum element of the window by
+   * the given field. When equality, returns the first.
+   */
+  def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY,
+    field)
+
+  private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
+    val position = fieldNames2Indices(getInputType(), Array(field))(0)
+    aggregate(aggregationType, position)
+  }
+
+  def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = {
+
+    val jStream = javaStream.asInstanceOf[JavaWStream[Product, K, W]]
+
+    val reducer = aggregationType match {
+      case AggregationType.SUM =>
+        new SumAggregator(position, jStream.getInputType, jStream.getExecutionEnvironment.getConfig)
+
+      case _ =>
+        new ComparableAggregator(
+          position,
+          jStream.getInputType,
+          aggregationType,
+          true,
+          jStream.getExecutionEnvironment.getConfig)
+    }
+
+    new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]
+  }
 
   // ------------------------------------------------------------------------
   //  Utilities
@@ -150,4 +242,8 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
   }
 
+  /**
+   * Gets the output type.
+   */
+  private def getInputType(): TypeInformation[T] = javaStream.getInputType
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index c6bd87a..53aa1e2 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -51,11 +51,14 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
       "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType2",
       "org.apache.flink.streaming.api.datastream.ConnectedStreams.addGeneralWindowCombine",
       "org.apache.flink.streaming.api.datastream.ConnectedStreams.transform",
+
       "org.apache.flink.streaming.api.datastream.WindowedDataStream.getType",
       "org.apache.flink.streaming.api.datastream.WindowedDataStream.getExecutionConfig",
 
       "org.apache.flink.streaming.api.datastream.WindowedStream.getExecutionEnvironment",
+      "org.apache.flink.streaming.api.datastream.WindowedStream.getInputType",
       "org.apache.flink.streaming.api.datastream.AllWindowedStream.getExecutionEnvironment",
+      "org.apache.flink.streaming.api.datastream.AllWindowedStream.getInputType",
 
       "org.apache.flink.streaming.api.datastream.KeyedStream.transform",
       "org.apache.flink.streaming.api.datastream.KeyedStream.getKeySelector",