You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/20 15:10:51 UTC
[08/18] git commit: [FLINK-1080] [streaming] Streaming aggregation update and refactor
[FLINK-1080] [streaming] Streaming aggregation update and refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/bcbebed0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/bcbebed0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/bcbebed0
Branch: refs/heads/master
Commit: bcbebed01537ee7a414cd51428f4a3b8e8fc23e1
Parents: 4d73f51
Author: gyfora <gy...@gmail.com>
Authored: Fri Sep 5 16:07:59 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Sat Sep 20 13:42:04 2014 +0200
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 116 +++++++++++---
.../api/datastream/GroupedDataStream.java | 71 +++++++--
.../aggregation/AggregationFunction.java | 34 +++++
.../ComparableAggregationFunction.java | 12 +-
.../aggregation/MaxAggregationFunction.java | 32 ++++
.../aggregation/MinAggregationFunction.java | 32 ++++
.../StreamingAggregationFunction.java | 45 ------
.../StreamingMaxAggregationFunction.java | 32 ----
.../StreamingMinAggregationFunction.java | 32 ----
.../StreamingSumAggregationFunction.java | 64 --------
.../aggregation/SumAggregationFunction.java | 150 +++++++++++++++++++
.../operator/BatchGroupReduceInvokable.java | 1 -
.../api/invokable/operator/BatchIterator.java | 3 +-
.../operator/WindowGroupReduceInvokable.java | 1 -
.../streaming/api/AggregationFunctionTest.java | 92 +++++++++---
.../api/invokable/operator/CoFlatMapTest.java | 6 -
16 files changed, 479 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 28d07d6..d78ceae 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -34,12 +34,13 @@ import org.apache.flink.api.java.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.aggregation.StreamingMaxAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.StreamingMinAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.StreamingAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.StreamingSumAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
@@ -171,7 +172,43 @@ public abstract class DataStream<OUT> {
public TypeInformation<OUT> getOutputType() {
return this.outTypeWrapper.getTypeInfo();
}
-
+
+ /**
+ * Gets the class of the field at the given position
+ *
+ * @param pos
+ * Position of the field
+ * @return The class of the field
+ */
+ @SuppressWarnings("rawtypes")
+ protected Class<?> getClassAtPos(int pos) {
+ Class<?> type;
+ TypeInformation<OUT> outTypeInfo = outTypeWrapper.getTypeInfo();
+ if (outTypeInfo.isTupleType()) {
+ type = ((TupleTypeInfo) outTypeInfo).getTypeAt(pos).getTypeClass();
+ } else if (pos == 0) {
+ type = outTypeInfo.getTypeClass();
+ } else {
+ throw new IndexOutOfBoundsException("Position is out of range");
+ }
+ return type;
+ }
+
+ /**
+ * Checks if the given field position is allowed for the output type
+ *
+ * @param pos
+ * Position to check
+ */
+ protected void checkFieldRange(int pos) {
+ try {
+ getClassAtPos(pos);
+ } catch (IndexOutOfBoundsException e) {
+ throw new RuntimeException("Selected field is out of range");
+
+ }
+ }
+
/**
* Creates a new {@link MergedDataStream} by merging {@link DataStream}
* outputs of the same type with each other. The DataStreams merged using
@@ -483,50 +520,82 @@ public abstract class DataStream<OUT> {
}
/**
- * Applies an aggregation that sums the data stream at the given
- * position.
+ * Applies an aggregation that sums the data stream at the given position.
*
* @param positionToSum
* The position in the data point to sum
* @return The transformed DataStream.
*/
+ @SuppressWarnings("unchecked")
public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
- return aggregateAll(new StreamingSumAggregationFunction<OUT>(positionToSum));
+ checkFieldRange(positionToSum);
+ return aggregateAll((AggregationFunction<OUT>) SumAggregationFunction
+ .getSumFunction(positionToSum, getClassAtPos(positionToSum)));
+ }
+
+ /**
+ * Applies an aggregation that sums the data stream at the first position.
+ *
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> sum() {
+ return sum(0);
}
-
+
/**
- * Applies an aggregation that that gives the minimum of the data stream at the given
- * position.
+ * Applies an aggregation that gives the minimum of the data stream at
+ * the given position.
*
* @param positionToMin
* The position in the data point to minimize
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
- return aggregateAll(new StreamingMinAggregationFunction<OUT>(positionToMin));
+ checkFieldRange(positionToMin);
+ return aggregateAll(new MinAggregationFunction<OUT>(positionToMin));
}
-
+
/**
- * Applies an aggregation that gives the maximum of the data stream at the given
- * position.
+ * Applies an aggregation that gives the minimum of the data stream at
+ * the first position.
+ *
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> min() {
+ return min(0);
+ }
+
+ /**
+ * Applies an aggregation that gives the maximum of the data stream at the
+ * given position.
*
* @param positionToMax
* The position in the data point to maximize
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
- return aggregateAll(new StreamingMaxAggregationFunction<OUT>(positionToMax));
+ checkFieldRange(positionToMax);
+ return aggregateAll(new MaxAggregationFunction<OUT>(positionToMax));
}
- private SingleOutputStreamOperator<OUT, ?> aggregateAll(StreamingAggregationFunction<OUT> aggregate) {
+ /**
+ * Applies an aggregation that gives the maximum of the data stream at the
+ * first position.
+ *
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> max() {
+ return max(0);
+ }
+
+ private SingleOutputStreamOperator<OUT, ?> aggregateAll(
+ AggregationFunction<OUT> aggregate) {
return aggregate(aggregate, new StreamReduceInvokable<OUT>(aggregate), "reduce");
}
-
- SingleOutputStreamOperator<OUT, ?> aggregate(StreamingAggregationFunction<OUT> aggregate, StreamReduceInvokable<OUT> invokable, String functionName) {
- DataStream<OUT> inputStream = this.copy();
- TypeInformation<?> info = this.jobGraphBuilder.getOutTypeInfo(inputStream.getId());
- aggregate.setType(info);
+ SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate,
+ StreamReduceInvokable<OUT> invokable, String functionName) {
+ DataStream<OUT> inputStream = this.copy();
SingleOutputStreamOperator<OUT, ?> returnStream = inputStream.addFunction(functionName,
aggregate, null, null, invokable);
@@ -1014,7 +1083,8 @@ public abstract class DataStream<OUT> {
private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream,
SinkFunction<OUT> sinkFunction, TypeSerializerWrapper<OUT> typeWrapper) {
- DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", outTypeWrapper);
+ DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink",
+ outTypeWrapper);
try {
jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<OUT>(sinkFunction),
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 2e1ed57..06bec0a 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -21,10 +21,10 @@ import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.functions.RichReduceFunction;
-import org.apache.flink.streaming.api.function.aggregation.StreamingAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.StreamingMaxAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.StreamingMinAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.StreamingSumAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
@@ -52,14 +52,19 @@ public class GroupedDataStream<OUT> {
}
/**
- * Gets the output type.
+ * Gets the output type of this grouped data stream.
+ *
+ * <p>
+ * The type is not stored in this class; the call is forwarded to the
+ * wrapped {@code dataStream}, so the grouped stream reports the same output
+ * type as the data stream it was created from.
*
* @return The output type.
*/
public TypeInformation<OUT> getOutputType() {
return dataStream.getOutputType();
}
-
+
/**
* Applies a reduce transformation on the grouped data stream grouped on by
* the given key position. The {@link ReduceFunction} will receive input
@@ -78,7 +83,7 @@ public class GroupedDataStream<OUT> {
ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
ReduceFunction.class, 0), new GroupReduceInvokable<OUT>(reducer, keyPosition));
}
-
+
/**
* Applies a group reduce transformation on preset chunks of the grouped
* data stream. The {@link GroupReduceFunction} will receive input values
@@ -214,8 +219,22 @@ public class GroupedDataStream<OUT> {
* The position in the data point to sum
* @return The transformed DataStream.
*/
+ @SuppressWarnings("unchecked")
public SingleOutputStreamOperator<OUT, ?> sum(final int positionToSum) {
- return aggregateGroup(new StreamingSumAggregationFunction<OUT>(positionToSum));
+ dataStream.checkFieldRange(positionToSum);
+ return aggregateGroup((AggregationFunction<OUT>) SumAggregationFunction
+ .getSumFunction(positionToSum, dataStream.getClassAtPos(positionToSum)));
+ }
+
+ /**
+ * Applies an aggregation that sums the grouped data stream at the first
+ * position, grouped by the given key position. Input values with the same
+ * key will be summed.
+ *
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> sum() {
+ return sum(0);
}
/**
@@ -228,7 +247,21 @@ public class GroupedDataStream<OUT> {
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> min(final int positionToMin) {
- return aggregateGroup(new StreamingMinAggregationFunction<OUT>(positionToMin));
+ dataStream.checkFieldRange(positionToMin);
+ return aggregateGroup(new MinAggregationFunction<OUT>(positionToMin));
+ }
+
+ /**
+ * Applies an aggregation that gives the minimum of the grouped data stream
+ * at the first position, grouped by the given key position. Input values
+ * with the same key will be minimized.
+ *
+ * Equivalent to calling {@link #min(int)} with position 0.
+ *
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> min() {
+ return min(0);
}
/**
@@ -241,10 +274,24 @@ public class GroupedDataStream<OUT> {
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> max(final int positionToMax) {
- return aggregateGroup(new StreamingMaxAggregationFunction<OUT>(positionToMax));
+ dataStream.checkFieldRange(positionToMax);
+ return aggregateGroup(new MaxAggregationFunction<OUT>(positionToMax));
+ }
+
+ /**
+ * Applies an aggregation that gives the maximum of the grouped data stream
+ * at the first position, grouped by the given key position. Input values
+ * with the same key will be maximized.
+ *
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> max() {
+ return max(0);
}
- private SingleOutputStreamOperator<OUT, ?> aggregateGroup(StreamingAggregationFunction<OUT> aggregate) {
- return this.dataStream.aggregate(aggregate, new GroupReduceInvokable<OUT>(aggregate, keyPosition), "groupReduce");
+ private SingleOutputStreamOperator<OUT, ?> aggregateGroup(
+ AggregationFunction<OUT> aggregate) {
+ return this.dataStream.aggregate(aggregate, new GroupReduceInvokable<OUT>(aggregate,
+ keyPosition), "groupReduce");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
new file mode 100644
index 0000000..daae0b8
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+
+public abstract class AggregationFunction<T> implements ReduceFunction<T> {
+ private static final long serialVersionUID = 1L;
+
+ public int position;
+ protected Tuple returnTuple;
+
+ public AggregationFunction(int pos) {
+ this.position = pos;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java
index dc74715..0819340 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.function.aggregation;
import org.apache.flink.api.java.tuple.Tuple;
-public abstract class ComparableAggregationFunction<T> extends StreamingAggregationFunction<T> {
+public abstract class ComparableAggregationFunction<T> extends AggregationFunction<T> {
private static final long serialVersionUID = 1L;
@@ -39,25 +39,25 @@ public abstract class ComparableAggregationFunction<T> extends StreamingAggregat
return (T) returnTuple;
} else if (value1 instanceof Comparable) {
if (isExtremal((Comparable<Object>) value1, value2)) {
- value2 = value1;
+ return value1;
+ }else{
+ return value2;
}
} else {
throw new RuntimeException("The values " + value1 + " and "+ value2 + " cannot be compared.");
}
-
- return null;
}
public <R> void compare(Tuple tuple1, Tuple tuple2) throws InstantiationException,
IllegalAccessException {
- copyTuple(tuple2);
Comparable<R> o1 = tuple1.getField(position);
R o2 = tuple2.getField(position);
if (isExtremal(o1, o2)) {
- returnTuple.setField(o1, position);
+ tuple2.setField(o1, position);
}
+ returnTuple = tuple2;
}
public abstract <R> boolean isExtremal(Comparable<R> o1, R o2);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxAggregationFunction.java
new file mode 100644
index 0000000..521fff6
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxAggregationFunction.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+public class MaxAggregationFunction<T> extends ComparableAggregationFunction<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ public MaxAggregationFunction(int pos) {
+ super(pos);
+ }
+
+ @Override
+ public <R> boolean isExtremal(Comparable<R> o1, R o2) {
+ return o1.compareTo(o2) > 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinAggregationFunction.java
new file mode 100644
index 0000000..a01d6c0
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinAggregationFunction.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+public class MinAggregationFunction<T> extends ComparableAggregationFunction<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ public MinAggregationFunction(int pos) {
+ super(pos);
+ }
+
+ @Override
+ public <R> boolean isExtremal(Comparable<R> o1, R o2) {
+ return o1.compareTo(o2) < 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingAggregationFunction.java
deleted file mode 100644
index 42c1053..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingAggregationFunction.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.aggregation;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.types.TypeInformation;
-
-public abstract class StreamingAggregationFunction<T> implements ReduceFunction<T> {
- private static final long serialVersionUID = 1L;
-
- protected int position;
- private TypeSerializer<Tuple> typeSerializer;
- protected Tuple returnTuple;
-
- public StreamingAggregationFunction(int pos) {
- this.position = pos;
- }
-
- @SuppressWarnings("unchecked")
- public void setType(TypeInformation<?> type) {
- this.typeSerializer = (TypeSerializer<Tuple>) type.createSerializer();
- }
-
- protected void copyTuple(Tuple tuple) throws InstantiationException, IllegalAccessException {
- returnTuple = (Tuple) typeSerializer.createInstance();
- typeSerializer.copy(tuple, returnTuple);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMaxAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMaxAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMaxAggregationFunction.java
deleted file mode 100644
index bae0043..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMaxAggregationFunction.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.aggregation;
-
-public class StreamingMaxAggregationFunction<T> extends ComparableAggregationFunction<T> {
-
- private static final long serialVersionUID = 1L;
-
- public StreamingMaxAggregationFunction(int pos) {
- super(pos);
- }
-
- @Override
- public <R> boolean isExtremal(Comparable<R> o1, R o2) {
- return o1.compareTo(o2) > 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMinAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMinAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMinAggregationFunction.java
deleted file mode 100644
index eb349c6..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMinAggregationFunction.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.aggregation;
-
-public class StreamingMinAggregationFunction<T> extends ComparableAggregationFunction<T> {
-
- private static final long serialVersionUID = 1L;
-
- public StreamingMinAggregationFunction(int pos) {
- super(pos);
- }
-
- @Override
- public <R> boolean isExtremal(Comparable<R> o1, R o2) {
- return o1.compareTo(o2) < 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingSumAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingSumAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingSumAggregationFunction.java
deleted file mode 100644
index 1a043c1..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingSumAggregationFunction.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.aggregation;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-public class StreamingSumAggregationFunction<T> extends StreamingAggregationFunction<T> {
-
- private static final long serialVersionUID = 1L;
-
- public StreamingSumAggregationFunction(int pos) {
- super(pos);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public T reduce(T value1, T value2) throws Exception {
- if (value1 instanceof Tuple) {
- Tuple tuple1 = (Tuple) value1;
- Tuple tuple2 = (Tuple) value2;
-
- copyTuple(tuple2);
- returnTuple.setField(add(tuple1.getField(position), tuple2.getField(position)), position);
-
- return (T) returnTuple;
- } else {
- return (T) add(value1, value2);
- }
- }
-
- private Object add(Object value1, Object value2) {
- if (value1 instanceof Integer) {
- return (Integer) value1 + (Integer) value2;
- } else if (value1 instanceof Double) {
- return (Double) value1 + (Double) value2;
- } else if (value1 instanceof Float) {
- return (Float) value1 + (Float) value2;
- } else if (value1 instanceof Long) {
- return (Long) value1 + (Long) value2;
- } else if (value1 instanceof Short) {
- return (short) ((Short) value1 + (Short) value2);
- } else if (value1 instanceof Byte) {
- return (byte) ((Byte) value1 + (Byte) value2);
- } else {
- throw new RuntimeException("DataStream cannot be summed because the class "
- + value1.getClass().getSimpleName() + " does not support the + operator.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregationFunction.java
new file mode 100644
index 0000000..3f54590
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregationFunction.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
+ // Sums plain values or, for Tuple inputs, the field at 'position'; the type-specific add() lives in the nested subclasses.
+ private static final long serialVersionUID = 1L;
+
+ public SumAggregationFunction(int pos) {
+ super(pos);
+ }
+ /** Sums value1 and value2: for tuples only the field at {@code position} is added, otherwise the two values themselves. */
+ @SuppressWarnings("unchecked")
+ @Override
+ public T reduce(T value1, T value2) throws Exception {
+ if (value1 instanceof Tuple) {
+ Tuple tuple1 = (Tuple) value1;
+ Tuple tuple2 = (Tuple) value2;
+
+ returnTuple = tuple2; // NOTE(review): aliases value2, so setField below mutates the caller's tuple in place - confirm intended
+ returnTuple.setField(add(tuple1.getField(position), tuple2.getField(position)),
+ position);
+
+ return (T) returnTuple;
+ } else {
+ return (T) add(value1, value2);
+ }
+ }
+ /** Type-specific addition; each subclass casts both arguments to the boxed type it handles and returns that same boxed type. */
+ protected abstract Object add(Object value1, Object value2);
+ /** Returns the summing function matching the given boxed numeric class; throws RuntimeException for any other class. */
+ @SuppressWarnings("rawtypes")
+ public static <T> SumAggregationFunction getSumFunction(int pos, Class<T> type) {
+
+ if (type == Integer.class) {
+ return new IntSum<T>(pos);
+ } else if (type == Long.class) {
+ return new LongSum<T>(pos);
+ } else if (type == Short.class) {
+ return new ShortSum<T>(pos);
+ } else if (type == Double.class) {
+ return new DoubleSum<T>(pos);
+ } else if (type == Float.class) {
+ return new FloatSum<T>(pos);
+ } else if (type == Byte.class) {
+ return new ByteSum<T>(pos);
+ } else {
+ throw new RuntimeException("DataStream cannot be summed because the class "
+ + type.getSimpleName() + " does not support the + operator.");
+ }
+
+ }
+
+ private static class IntSum<T> extends SumAggregationFunction<T> {
+ private static final long serialVersionUID = 1L;
+
+ public IntSum(int pos) {
+ super(pos);
+ }
+
+ @Override
+ protected Object add(Object value1, Object value2) {
+ return (Integer) value1 + (Integer) value2;
+ }
+ }
+
+ private static class LongSum<T> extends SumAggregationFunction<T> {
+ private static final long serialVersionUID = 1L;
+
+ public LongSum(int pos) {
+ super(pos);
+ }
+
+ @Override
+ protected Object add(Object value1, Object value2) {
+ return (Long) value1 + (Long) value2;
+ }
+ }
+
+ private static class DoubleSum<T> extends SumAggregationFunction<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ public DoubleSum(int pos) {
+ super(pos);
+ }
+
+ @Override
+ protected Object add(Object value1, Object value2) {
+ return (Double) value1 + (Double) value2;
+ }
+ }
+
+ private static class ShortSum<T> extends SumAggregationFunction<T> {
+ private static final long serialVersionUID = 1L;
+
+ public ShortSum(int pos) {
+ super(pos);
+ }
+
+ @Override
+ protected Object add(Object value1, Object value2) {
+ return (short) ((Short) value1 + (Short) value2); // Short + Short promotes to int; narrow so a Short (not an Integer) is boxed
+ }
+ }
+
+ private static class FloatSum<T> extends SumAggregationFunction<T> {
+ private static final long serialVersionUID = 1L;
+
+ public FloatSum(int pos) {
+ super(pos);
+ }
+
+ @Override
+ protected Object add(Object value1, Object value2) {
+ return (Float) value1 + (Float) value2;
+ }
+ }
+
+ private static class ByteSum<T> extends SumAggregationFunction<T> {
+ private static final long serialVersionUID = 1L;
+
+ public ByteSum(int pos) {
+ super(pos);
+ }
+
+ @Override
+ protected Object add(Object value1, Object value2) {
+ return (byte) ((Byte) value1 + (Byte) value2); // Byte + Byte promotes to int; narrow so a Byte (not an Integer) is boxed
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
index 95b3249..be6392e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
@@ -30,7 +30,6 @@ public class BatchGroupReduceInvokable<IN, OUT> extends BatchReduceInvokable<IN,
private static final long serialVersionUID = 1L;
int keyPosition;
- protected GroupReduceFunction<IN, OUT> reducer;
private Iterator<StreamRecord<IN>> iterator;
private MutableTableState<Object, List<IN>> values;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
index dc27da4..05f888f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
@@ -17,8 +17,9 @@
package org.apache.flink.streaming.api.invokable.operator;
+import java.io.Serializable;
import java.util.Iterator;
-public interface BatchIterator<IN> extends Iterator<IN> {
+public interface BatchIterator<IN> extends Iterator<IN>, Serializable {
public void reset();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
index 8b658f3..87b00f9 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
@@ -29,7 +29,6 @@ import org.apache.flink.streaming.state.MutableTableState;
public class WindowGroupReduceInvokable<IN, OUT> extends WindowReduceInvokable<IN, OUT> {
int keyPosition;
- protected GroupReduceFunction<IN, OUT> reducer;
private Iterator<StreamRecord<IN>> iterator;
private MutableTableState<Object, List<IN>> values;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index 7a502aa..3861aab 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -18,15 +18,16 @@
package org.apache.flink.streaming.api;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.function.aggregation.StreamingMaxAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.StreamingMinAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.StreamingSumAggregationFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
import org.apache.flink.streaming.util.MockInvokable;
@@ -36,26 +37,33 @@ public class AggregationFunctionTest {
@Test
public void groupSumIntegerTest() {
- ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<Tuple2<Integer, Integer>>();
List<Tuple2<Integer, Integer>> expectedSumList = new ArrayList<Tuple2<Integer, Integer>>();
List<Tuple2<Integer, Integer>> expectedMinList = new ArrayList<Tuple2<Integer, Integer>>();
List<Tuple2<Integer, Integer>> expectedMaxList = new ArrayList<Tuple2<Integer, Integer>>();
+ List<Integer> expectedSumList0 = new ArrayList<Integer>();
+ List<Integer> expectedMinList0 = new ArrayList<Integer>();
+ List<Integer> expectedMaxList0 = new ArrayList<Integer>();
List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<Tuple2<Integer, Integer>>();
List<Tuple2<Integer, Integer>> expectedGroupMinList = new ArrayList<Tuple2<Integer, Integer>>();
List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<Tuple2<Integer, Integer>>();
+ List<Integer> simpleInput = new ArrayList<Integer>();
+
int groupedSum0 = 0;
int groupedSum1 = 0;
int groupedSum2 = 0;
for (int i = 0; i < 9; i++) {
- inputList.add(new Tuple2<Integer, Integer>(i % 3, i));
-
+ simpleInput.add(i);
expectedSumList.add(new Tuple2<Integer, Integer>(i % 3, (i + 1) * i / 2));
expectedMinList.add(new Tuple2<Integer, Integer>(i % 3, 0));
expectedMaxList.add(new Tuple2<Integer, Integer>(i % 3, i));
+ expectedSumList0.add((i + 1) * i / 2);
+ expectedMaxList0.add(i);
+ expectedMinList0.add(0);
+
int groupedSum;
switch (i % 3) {
case 0:
@@ -74,30 +82,38 @@ public class AggregationFunctionTest {
expectedGroupMaxList.add(new Tuple2<Integer, Integer>(i % 3, i));
}
- StreamingSumAggregationFunction<Tuple2<Integer, Integer>> sumFunction = new StreamingSumAggregationFunction<Tuple2<Integer, Integer>>(
- 1);
- StreamingMinAggregationFunction<Tuple2<Integer, Integer>> minFunction = new StreamingMinAggregationFunction<Tuple2<Integer, Integer>>(
+ @SuppressWarnings("unchecked")
+ SumAggregationFunction<Tuple2<Integer, Integer>> sumFunction = SumAggregationFunction
+ .getSumFunction(1, Integer.class);
+ @SuppressWarnings("unchecked")
+ SumAggregationFunction<Integer> sumFunction0 = SumAggregationFunction
+ .getSumFunction(0, Integer.class);
+ MinAggregationFunction<Tuple2<Integer, Integer>> minFunction = new MinAggregationFunction<Tuple2<Integer, Integer>>(
1);
- StreamingMaxAggregationFunction<Tuple2<Integer, Integer>> maxFunction = new StreamingMaxAggregationFunction<Tuple2<Integer, Integer>>(
+ MinAggregationFunction<Integer> minFunction0 = new MinAggregationFunction<Integer>(
+ 0);
+ MaxAggregationFunction<Tuple2<Integer, Integer>> maxFunction = new MaxAggregationFunction<Tuple2<Integer, Integer>>(
1);
-
- sumFunction.setType(TypeExtractor.getForObject(new Tuple2<Integer, Integer>(0, 0)));
- minFunction.setType(TypeExtractor.getForObject(new Tuple2<Integer, Integer>(0, 0)));
- maxFunction.setType(TypeExtractor.getForObject(new Tuple2<Integer, Integer>(0, 0)));
+ MaxAggregationFunction<Integer> maxFunction0 = new MaxAggregationFunction<Integer>(
+ 0);
List<Tuple2<Integer, Integer>> sumList = MockInvokable.createAndExecute(
- new StreamReduceInvokable<Tuple2<Integer, Integer>>(sumFunction), inputList);
+ new StreamReduceInvokable<Tuple2<Integer, Integer>>(sumFunction), getInputList());
+
List<Tuple2<Integer, Integer>> minList = MockInvokable.createAndExecute(
- new StreamReduceInvokable<Tuple2<Integer, Integer>>(minFunction), inputList);
+ new StreamReduceInvokable<Tuple2<Integer, Integer>>(minFunction), getInputList());
+
List<Tuple2<Integer, Integer>> maxList = MockInvokable.createAndExecute(
- new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxFunction), inputList);
+ new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxFunction), getInputList());
List<Tuple2<Integer, Integer>> groupedSumList = MockInvokable.createAndExecute(
- new GroupReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, 0), inputList);
+ new GroupReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, 0), getInputList());
+
List<Tuple2<Integer, Integer>> groupedMinList = MockInvokable.createAndExecute(
- new GroupReduceInvokable<Tuple2<Integer, Integer>>(minFunction, 0), inputList);
+ new GroupReduceInvokable<Tuple2<Integer, Integer>>(minFunction, 0), getInputList());
+
List<Tuple2<Integer, Integer>> groupedMaxList = MockInvokable.createAndExecute(
- new GroupReduceInvokable<Tuple2<Integer, Integer>>(maxFunction, 0), inputList);
+ new GroupReduceInvokable<Tuple2<Integer, Integer>>(maxFunction, 0), getInputList());
assertEquals(expectedSumList, sumList);
assertEquals(expectedMinList, minList);
@@ -105,5 +121,39 @@ public class AggregationFunctionTest {
assertEquals(expectedGroupSumList, groupedSumList);
assertEquals(expectedGroupMinList, groupedMinList);
assertEquals(expectedGroupMaxList, groupedMaxList);
+ assertEquals(expectedSumList0, MockInvokable.createAndExecute(new StreamReduceInvokable<Integer>(sumFunction0),simpleInput ));
+ assertEquals(expectedMinList0, MockInvokable.createAndExecute(new StreamReduceInvokable<Integer>(minFunction0),simpleInput ));
+ assertEquals(expectedMaxList0, MockInvokable.createAndExecute(new StreamReduceInvokable<Integer>(maxFunction0),simpleInput ));
+
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+ try {
+ env.generateSequence(1, 100).min(1);
+ fail();
+ } catch (Exception e) {
+ //Nothing to do here
+ }
+ try {
+ env.generateSequence(1, 100).min(2);
+ fail();
+ } catch (Exception e) {
+ //Nothing to do here
+ }
+ try {
+ env.generateSequence(1, 100).min(3);
+ fail();
+ } catch (Exception e) {
+ //Nothing to do here
+ }
+
+ }
+
+ private List<Tuple2<Integer, Integer>> getInputList() {
+ ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<Tuple2<Integer, Integer>>();
+ for (int i = 0; i < 9; i++) {
+ inputList.add(new Tuple2<Integer, Integer>(i % 3, i));
+ }
+ return inputList;
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
index 0c4bad1..acf2f28 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
@@ -80,11 +80,5 @@ public class CoFlatMapTest implements Serializable {
// expected
}
- try {
- env.fromElements(10, 11).connect(ds2);
- fail();
- } catch (RuntimeException e) {
- // expected
- }
}
}