You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/03/25 17:19:31 UTC

[2/2] flink git commit: [Flink-1780] Rename FlatCombineFunction to GroupCombineFunction

[Flink-1780] Rename FlatCombineFunction to GroupCombineFunction

This closes #530


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/033c69f9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/033c69f9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/033c69f9

Branch: refs/heads/master
Commit: 033c69f9477c6352865e7e0da01296dd778ffe59
Parents: ae04025
Author: Suneel Marthi <su...@gmail.com>
Authored: Tue Mar 24 16:19:32 2015 -0400
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Mar 25 17:13:51 2015 +0100

----------------------------------------------------------------------
 docs/dataset_transformations.md                 |  2 +-
 .../api/common/functions/CombineFunction.java   |  2 +-
 .../common/functions/FlatCombineFunction.java   | 51 --------------------
 .../common/functions/GroupCombineFunction.java  | 51 ++++++++++++++++++++
 .../functions/RichFlatCombineFunction.java      | 42 ----------------
 .../functions/RichGroupCombineFunction.java     | 39 +++++++++++++++
 .../functions/RichGroupReduceFunction.java      |  4 +-
 .../base/GroupCombineOperatorBase.java          |  6 +--
 .../operators/base/GroupReduceOperatorBase.java |  8 +--
 .../java/org/apache/flink/api/java/DataSet.java |  4 +-
 .../flink/api/java/functions/FirstReducer.java  |  4 +-
 .../java/operators/GroupCombineOperator.java    | 22 ++++-----
 .../api/java/operators/GroupReduceOperator.java |  6 +--
 .../api/java/operators/SortedGrouping.java      |  4 +-
 .../api/java/operators/UnsortedGrouping.java    |  4 +-
 .../PlanUnwrappingGroupCombineOperator.java     | 12 ++---
 .../PlanUnwrappingReduceGroupOperator.java      | 10 ++--
 ...lanUnwrappingSortedGroupCombineOperator.java | 12 ++---
 ...PlanUnwrappingSortedReduceGroupOperator.java | 10 ++--
 .../java/record/operators/ReduceOperator.java   |  4 +-
 .../flink/api/java/typeutils/TypeExtractor.java | 24 ++++-----
 .../java/record/ReduceWrappingFunctionTest.java | 16 +++---
 .../operators/AllGroupCombineDriver.java        | 16 +++---
 .../runtime/operators/AllGroupReduceDriver.java | 10 ++--
 .../operators/GroupReduceCombineDriver.java     | 20 ++++----
 .../runtime/operators/RegularPactTask.java      | 14 ++++--
 .../SynchronousChainedCombineDriver.java        | 12 ++---
 .../sort/CombiningUnilateralSortMerger.java     | 16 +++---
 .../runtime/blob/BlobCacheSuccessTest.java      | 18 +++----
 .../org/apache/flink/api/scala/DataSet.scala    |  4 +-
 .../apache/flink/api/scala/GroupedDataSet.scala |  6 +--
 .../CustomRankCombiner.java                     |  4 +-
 .../javaApiOperators/GroupCombineITCase.java    | 20 ++++----
 .../scala/operators/GroupCombineITCase.scala    |  6 +--
 34 files changed, 242 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/docs/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/dataset_transformations.md b/docs/dataset_transformations.md
index 2bec61b..3bb3cec 100644
--- a/docs/dataset_transformations.md
+++ b/docs/dataset_transformations.md
@@ -554,7 +554,7 @@ an alternative WordCount implementation. In the implementation,
 DataSet<String> input = [..] // The words received as input
 DataSet<String> groupedInput = input.groupBy(0); // group identical words
 
-DataSet<Tuple2<String, Integer>> combinedWords = groupedInput.combineGroup(new FlatCombineFunction<String, Tuple2<String, Integer>() {
+DataSet<Tuple2<String, Integer>> combinedWords = groupedInput.combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>() {
 
     public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>>) { // combine
         int count = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
index ef52b32..af115b0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
@@ -29,7 +29,7 @@ import java.io.Serializable;
  * reduce the data volume earlier, before the entire groups have been collected.
  * <p>
  * This special variant of the combine function reduces the group of elements into a single element. A variant
- * that can return multiple values per group is defined in {@link FlatCombineFunction}.
+ * that can return multiple values per group is defined in {@link GroupCombineFunction}.
  * 
  * @param <IN> The data type processed by the combine function.
  * @param <OUT> The data type emitted by the combine function.

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
deleted file mode 100644
index b90b3ce..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.functions;
-
-import java.io.Serializable;
-
-import org.apache.flink.util.Collector;
-
-/**
- * Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction}
- * and "pre-reduce" the data. The combine functions typically do not see the entire group of elements, but
- * only a sub-group.
- * <p>
- * Combine functions are frequently helpful in increasing the program efficiency, because they allow the system to
- * reduce the data volume earlier, before the entire groups have been collected.
- * <p>
- * This special variant of the combine function supports to return more than one element per group.
- * It is frequently less efficient to use than the {@link CombineFunction}.
- * 
- * @param <IN> The data type processed by the combine function.
- * @param <OUT> The data type emitted by the combine function.
- */
-public interface FlatCombineFunction<IN, OUT> extends Function, Serializable {
-
-	/**
-	 * The combine method, called (potentially multiple timed) with subgroups of elements.
-	 * 
-	 * @param values The elements to be combined.
-	 * @param out The collector to use to return values from the function.
-	 * 
-	 * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
-	 *                   and may trigger the recovery logic.
-	 */
-	void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java
new file mode 100644
index 0000000..c0b153b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+import org.apache.flink.util.Collector;
+
+/**
+ * Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction}
+ * and "pre-reduce" the data. The combine functions typically do not see the entire group of elements, but
+ * only a sub-group.
+ * <p>
+ * Combine functions are frequently helpful in increasing the program efficiency, because they allow the system to
+ * reduce the data volume earlier, before the entire groups have been collected.
+ * <p>
+ * This special variant of the combine function supports to return more than one element per group.
+ * It is frequently less efficient to use than the {@link CombineFunction}.
+ * 
+ * @param <IN> The data type processed by the combine function.
+ * @param <OUT> The data type emitted by the combine function.
+ */
+public interface GroupCombineFunction<IN, OUT> extends Function, Serializable {
+
+	/**
+	 * The combine method, called (potentially multiple timed) with subgroups of elements.
+	 *
+	 * @param values The elements to be combined.
+	 * @param out The collector to use to return values from the function.
+	 *
+	 * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
+	 *                   and may trigger the recovery logic.
+	 */
+	void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
deleted file mode 100644
index 17aca88..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.functions;
-
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.util.Collector;
-
-/**
- * Rich variant of the {@link FlatCombineFunction}. As a {@link RichFunction}, it gives access to the
- * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
- * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
- * {@link RichFunction#close()}.
- *
- * @param <IN> The data type of the elements to be combined.
- * @param <OUT> The resulting data type of the elements to be combined.
- */
-public abstract class RichFlatCombineFunction<IN, OUT> extends AbstractRichFunction implements FlatCombineFunction<IN, OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public abstract void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java
new file mode 100644
index 0000000..55df232
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+
+import org.apache.flink.util.Collector;
+
+/**
+ * Rich variant of the {@link GroupCombineFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN> The data type of the elements to be combined.
+ * @param <OUT> The resulting data type of the elements to be combined.
+ */
+public abstract class RichGroupCombineFunction<IN, OUT> extends AbstractRichFunction implements GroupCombineFunction<IN, OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public abstract void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
index b6c92c2..48e27d3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
@@ -34,7 +34,7 @@ import org.apache.flink.util.Collector;
  * @param <IN> Type of the elements that this function processes.
  * @param <OUT> The type of the elements returned by the user-defined function.
  */
-public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT>, FlatCombineFunction<IN, IN> {
+public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT>, GroupCombineFunction<IN, IN> {
 	
 	private static final long serialVersionUID = 1L;
 
@@ -83,5 +83,5 @@ public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunct
 	 */
 	@Retention(RetentionPolicy.RUNTIME)
 	@Target(ElementType.TYPE)
-	public static @interface Combinable {};
+	public static @interface Combinable {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
index 2a47c45..27fbc1c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.CopyingListCollector;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -46,7 +46,7 @@ import java.util.List;
  * This class is later processed by the compiler to generate the plan.
  * @see org.apache.flink.api.common.functions.CombineFunction
  */
-public class GroupCombineOperatorBase<IN, OUT, FT extends FlatCombineFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
+public class GroupCombineOperatorBase<IN, OUT, FT extends GroupCombineFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
 
 
 	/** The ordering for the order inside a reduce group. */
@@ -81,7 +81,7 @@ public class GroupCombineOperatorBase<IN, OUT, FT extends FlatCombineFunction<IN
 
 	@Override
 	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
-		FlatCombineFunction<IN, OUT> function = this.userFunction.getUserCodeObject();
+		GroupCombineFunction<IN, OUT> function = this.userFunction.getUserCodeObject();
 
 		UnaryOperatorInformation<IN, OUT> operatorInfo = getOperatorInfo();
 		TypeInformation<IN> inputType = operatorInfo.getInputType();

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
index 6d7db89..57f07f3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
@@ -21,7 +21,7 @@ package org.apache.flink.api.common.operators.base;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -107,15 +107,15 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
 	/**
 	 * Marks the group reduce operation as combinable. Combinable operations may pre-reduce the
 	 * data before the actual group reduce operations. Combinable user-defined functions
-	 * must implement the interface {@link org.apache.flink.api.common.functions.FlatCombineFunction}.
+	 * must implement the interface {@link GroupCombineFunction}.
 	 * 
 	 * @param combinable Flag to mark the group reduce operation as combinable.
 	 */
 	public void setCombinable(boolean combinable) {
 		// sanity check
-		if (combinable && !FlatCombineFunction.class.isAssignableFrom(this.userFunction.getUserCodeClass())) {
+		if (combinable && !GroupCombineFunction.class.isAssignableFrom(this.userFunction.getUserCodeClass())) {
 			throw new IllegalArgumentException("Cannot set a UDF as combinable if it does not implement the interface " +
-					FlatCombineFunction.class.getName());
+					GroupCombineFunction.class.getName());
 		} else {
 			this.combinable = combinable;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index ed8d1ca..1e91eeb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -24,7 +24,7 @@ import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.InvalidTypesException;
@@ -473,7 +473,7 @@ public abstract class DataSet<T> {
 	 * @param combiner The CombineFunction that is applied on the DataSet.
 	 * @return A GroupCombineOperator which represents the combined DataSet.
 	 */
-	public <R> GroupCombineOperator<T, R> combineGroup(FlatCombineFunction<T, R> combiner) {
+	public <R> GroupCombineOperator<T, R> combineGroup(GroupCombineFunction<T, R> combiner) {
 		if (combiner == null) {
 			throw new NullPointerException("GroupReduce function must not be null.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
index fbb7029..a604cc0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
@@ -17,13 +17,13 @@
  */
 
 package org.apache.flink.api.java.functions;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.util.Collector;
 
 @Combinable
-public class FirstReducer<T> implements GroupReduceFunction<T, T>, FlatCombineFunction<T, T> {
+public class FirstReducer<T> implements GroupReduceFunction<T, T>, GroupCombineFunction<T, T> {
 	private static final long serialVersionUID = 1L;
 
 	private final int count;

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
index 3c1d47c..911c608 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.operators;
 
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
@@ -46,7 +46,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
  */
 public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, GroupCombineOperator<IN, OUT>> {
 
-	private final FlatCombineFunction<IN, OUT> function;
+	private final GroupCombineFunction<IN, OUT> function;
 
 	private final Grouping<IN> grouper;
 
@@ -60,7 +60,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 	 * @param function The user-defined GroupReduce function.
 	 * @param defaultName The operator's name.
 	 */
-	public GroupCombineOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatCombineFunction<IN, OUT> function, String defaultName) {
+	public GroupCombineOperator(DataSet<IN> input, TypeInformation<OUT> resultType, GroupCombineFunction<IN, OUT> function, String defaultName) {
 		super(input, resultType);
 		this.function = function;
 		this.grouper = null;
@@ -73,7 +73,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 	 * @param input The grouped input to be processed group-wise by the groupReduce function.
 	 * @param function The user-defined GroupReduce function.
 	 */
-	public GroupCombineOperator(Grouping<IN> input, TypeInformation<OUT> resultType, FlatCombineFunction<IN, OUT> function, String defaultName) {
+	public GroupCombineOperator(Grouping<IN> input, TypeInformation<OUT> resultType, GroupCombineFunction<IN, OUT> function, String defaultName) {
 		super(input != null ? input.getDataSet() : null, resultType);
 
 		this.function = function;
@@ -82,7 +82,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 	}
 
 	@Override
-	protected FlatCombineFunction<IN, OUT> getFunction() {
+	protected GroupCombineFunction<IN, OUT> getFunction() {
 		return function;
 	}
 
@@ -99,8 +99,8 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 		if (grouper == null) {
 			// non grouped reduce
 			UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
-			GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>> po =
-					new GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>>(function, operatorInfo, new int[0], name);
+			GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>> po =
+					new GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>>(function, operatorInfo, new int[0], name);
 
 			po.setInput(input);
 			// the parallelism for a non grouped reduce can only be 1
@@ -144,8 +144,8 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 
 			int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
 			UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
-			GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>> po =
-					new GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
+			GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>> po =
+					new GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
 
 			po.setInput(input);
 			po.setParallelism(getParallelism());
@@ -175,7 +175,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 	// --------------------------------------------------------------------------------------------
 
 	private static <IN, OUT, K> PlanUnwrappingGroupCombineOperator<IN, OUT, K> translateSelectorFunctionReducer(
-			Keys.SelectorFunctionKeys<IN, ?> rawKeys, FlatCombineFunction<IN, OUT> function,
+			Keys.SelectorFunctionKeys<IN, ?> rawKeys, GroupCombineFunction<IN, OUT> function,
 			TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input)
 	{
 		@SuppressWarnings("unchecked")
@@ -199,7 +199,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 	}
 
 	private static <IN, OUT, K1, K2> PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> translateSelectorFunctionSortedReducer(
-			Keys.SelectorFunctionKeys<IN, ?> rawGroupingKey, Keys.SelectorFunctionKeys<IN, ?> rawSortingKey, FlatCombineFunction<IN, OUT> function,
+			Keys.SelectorFunctionKeys<IN, ?> rawGroupingKey, Keys.SelectorFunctionKeys<IN, ?> rawSortingKey, GroupCombineFunction<IN, OUT> function,
 			TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input)
 	{
 		@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index e809623..30f2cc4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.operators;
 
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
@@ -88,7 +88,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 	}
 
 	private void checkCombinability() {
-		if (function instanceof FlatCombineFunction &&
+		if (function instanceof GroupCombineFunction &&
 				function.getClass().getAnnotation(RichGroupReduceFunction.Combinable.class) != null) {
 			this.combinable = true;
 		}
@@ -111,7 +111,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 	
 	public GroupReduceOperator<IN, OUT> setCombinable(boolean combinable) {
 		// sanity check that the function is a subclass of the combine interface
-		if (combinable && !(function instanceof FlatCombineFunction)) {
+		if (combinable && !(function instanceof GroupCombineFunction)) {
 			throw new IllegalArgumentException("The function does not implement the combine interface.");
 		}
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index b2054bf..287bf82 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.operators;
 
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.Utils;
@@ -169,7 +169,7 @@ public class SortedGrouping<T> extends Grouping<T> {
 	 * @param combiner The CombineFunction that is applied on the DataSet.
 	 * @return A GroupCombineOperator which represents the combined DataSet.
 	 */
-	public <R> GroupCombineOperator<T, R> combineGroup(FlatCombineFunction<T, R> combiner) {
+	public <R> GroupCombineOperator<T, R> combineGroup(GroupCombineFunction<T, R> combiner) {
 		if (combiner == null) {
 			throw new NullPointerException("GroupReduce function must not be null.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 0f3faa0..319a599 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -173,7 +173,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 	 * @param combiner The CombineFunction that is applied on the DataSet.
 	 * @return A GroupCombineOperator which represents the combined DataSet.
 	 */
-	public <R> GroupCombineOperator<T, R> combineGroup(FlatCombineFunction<T, R> combiner) {
+	public <R> GroupCombineOperator<T, R> combineGroup(GroupCombineFunction<T, R> combiner) {
 		if (combiner == null) {
 			throw new NullPointerException("GroupReduce function must not be null.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
index ae4ba11..c8e40ce 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -30,9 +30,9 @@ import org.apache.flink.util.Collector;
  * A group combine operator that takes 2-tuples (key-value pairs), and applies the group combine operation only
  * on the unwrapped values.
  */
-public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> extends GroupCombineOperatorBase<Tuple2<K, IN>, OUT, FlatCombineFunction<Tuple2<K, IN>, OUT>> {
+public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> extends GroupCombineOperatorBase<Tuple2<K, IN>, OUT, GroupCombineFunction<Tuple2<K, IN>, OUT>> {
 
-	public PlanUnwrappingGroupCombineOperator(FlatCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
+	public PlanUnwrappingGroupCombineOperator(GroupCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
 												TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey)
 	{
 		super(new TupleUnwrappingGroupCombiner<IN, OUT, K>(udf),
@@ -42,15 +42,15 @@ public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> extends GroupCombine
 	
 	// --------------------------------------------------------------------------------------------
 	
-	public static final class TupleUnwrappingGroupCombiner<IN, OUT, K> extends WrappingFunction<FlatCombineFunction<IN, OUT>>
-		implements FlatCombineFunction<Tuple2<K, IN>, OUT>
+	public static final class TupleUnwrappingGroupCombiner<IN, OUT, K> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
+		implements GroupCombineFunction<Tuple2<K, IN>, OUT>
 	{
 	
 		private static final long serialVersionUID = 1L;
 		
 		private final TupleUnwrappingIterator<IN, K> iter; 
 		
-		private TupleUnwrappingGroupCombiner(FlatCombineFunction<IN, OUT> wrapped) {
+		private TupleUnwrappingGroupCombiner(GroupCombineFunction<IN, OUT> wrapped) {
 			super(wrapped);
 			this.iter = new TupleUnwrappingIterator<IN, K>();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index 1d59a21..e01af50 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
@@ -37,7 +37,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
 	public PlanUnwrappingReduceGroupOperator(GroupReduceFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
 			TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey, boolean combinable)
 	{
-		super(combinable ? new TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K>((RichGroupReduceFunction<IN, OUT>) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
+		super(combinable ? new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K>((RichGroupReduceFunction<IN, OUT>) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
 				new UnaryOperatorInformation<Tuple2<K, IN>, OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);
 		
 		super.setCombinable(combinable);
@@ -46,8 +46,8 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
 	// --------------------------------------------------------------------------------------------
 	
 	@RichGroupReduceFunction.Combinable
-	public static final class TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>>
-		implements GroupReduceFunction<Tuple2<K, IN>, OUT>, FlatCombineFunction<Tuple2<K, IN>, Tuple2<K, IN>>
+	public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>>
+		implements GroupReduceFunction<Tuple2<K, IN>, OUT>, GroupCombineFunction<Tuple2<K, IN>, Tuple2<K, IN>>
 	{
 
 		private static final long serialVersionUID = 1L;
@@ -55,7 +55,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
 		private TupleUnwrappingIterator<IN, K> iter;
 		private TupleWrappingCollector<IN, K> coll; 
 		
-		private TupleUnwrappingFlatCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> wrapped) {
+		private TupleUnwrappingGroupCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> wrapped) {
 			super(wrapped);
 			this.iter = new TupleUnwrappingIterator<IN, K>();
 			this.coll = new TupleWrappingCollector<IN, K>(this.iter);

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
index b3d8470..e52a5c4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -30,9 +30,9 @@ import org.apache.flink.util.Collector;
  * A reduce operator that takes 3-tuples (groupKey, sortKey, value), and applies the sorted partial group reduce
  * operation only on the unwrapped values.
  */
-public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends GroupCombineOperatorBase<Tuple3<K1, K2, IN>, OUT, FlatCombineFunction<Tuple3<K1, K2, IN>,OUT>> {
+public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends GroupCombineOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupCombineFunction<Tuple3<K1, K2, IN>,OUT>> {
 
-	public PlanUnwrappingSortedGroupCombineOperator(FlatCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name,
+	public PlanUnwrappingSortedGroupCombineOperator(GroupCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name,
 													TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey)
 	{
 		super(new TupleUnwrappingGroupReducer<IN, OUT, K1, K2>(udf),
@@ -42,15 +42,15 @@ public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends G
 
 	}
 
-	public static final class TupleUnwrappingGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<FlatCombineFunction<IN, OUT>>
-			implements FlatCombineFunction<Tuple3<K1, K2, IN>, OUT>
+	public static final class TupleUnwrappingGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
+			implements GroupCombineFunction<Tuple3<K1, K2, IN>, OUT>
 	{
 
 		private static final long serialVersionUID = 1L;
 
 		private final Tuple3UnwrappingIterator<IN, K1, K2> iter;
 
-		private TupleUnwrappingGroupReducer(FlatCombineFunction<IN, OUT> wrapped) {
+		private TupleUnwrappingGroupReducer(GroupCombineFunction<IN, OUT> wrapped) {
 			super(wrapped);
 			this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
index 757ff56..63ebfa4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
@@ -37,7 +37,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 	public PlanUnwrappingSortedReduceGroupOperator(GroupReduceFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name,
 											TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey, boolean combinable)
 	{
-		super(combinable ? new TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K1, K2>((RichGroupReduceFunction<IN, OUT>) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2>(udf),
+		super(combinable ? new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2>((RichGroupReduceFunction<IN, OUT>) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2>(udf),
 			new UnaryOperatorInformation<Tuple3<K1, K2, IN>, OUT>(typeInfoWithKey, outType), groupingKey.computeLogicalKeyPositions(), name);
 
 		super.setCombinable(combinable);
@@ -46,8 +46,8 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 	// --------------------------------------------------------------------------------------------
 
 	@RichGroupReduceFunction.Combinable
-	public static final class TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>>
-		implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, FlatCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>>
+	public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>>
+		implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, GroupCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>>
 	{
 
 		private static final long serialVersionUID = 1L;
@@ -55,7 +55,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 		private Tuple3UnwrappingIterator<IN, K1, K2> iter;
 		private Tuple3WrappingCollector<IN, K1, K2> coll;
 
-		private TupleUnwrappingFlatCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> wrapped) {
+		private TupleUnwrappingGroupCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> wrapped) {
 			super(wrapped);
 			this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>();
 			this.coll = new Tuple3WrappingCollector<IN, K1, K2>(this.iter);

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
index 875e9c1..1866fea 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
@@ -32,7 +32,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang3.Validate;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Ordering;
@@ -367,7 +367,7 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Grou
 	
 	// ============================================================================================
 	
-	public static class WrappingReduceFunction extends WrappingFunction<ReduceFunction> implements GroupReduceFunction<Record, Record>, FlatCombineFunction<Record, Record> {
+	public static class WrappingReduceFunction extends WrappingFunction<ReduceFunction> implements GroupReduceFunction<Record, Record>, GroupCombineFunction<Record, Record> {
 		
 		private static final long serialVersionUID = 1L;
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 4527aa0..ae6063a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -34,7 +34,7 @@ import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
@@ -135,14 +135,14 @@ public class TypeExtractor {
 		return getUnaryOperatorReturnType((Function) groupReduceInterface, GroupReduceFunction.class, true, true, inType, functionName, allowMissing);
 	}
 
-	public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(FlatCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType) {
+	public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(GroupCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType) {
 		return getGroupCombineReturnTypes(combineInterface, inType, null, false);
 	}
 
-	public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(FlatCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType,
+	public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(GroupCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType,
 																			String functionName, boolean allowMissing)
 	{
-		return getUnaryOperatorReturnType((Function) combineInterface, FlatCombineFunction.class, true, true, inType, functionName, allowMissing);
+		return getUnaryOperatorReturnType((Function) combineInterface, GroupCombineFunction.class, true, true, inType, functionName, allowMissing);
 	}
 	
 	
@@ -600,7 +600,7 @@ public class TypeExtractor {
 		// the input is a tuple
 		else if (inTypeInfo instanceof TupleTypeInfo && isClassType(inType) 
 				&& Tuple.class.isAssignableFrom(typeToClass(inType))) {
-			ParameterizedType tupleBaseClass = null;
+			ParameterizedType tupleBaseClass;
 			
 			// get tuple from possible tuple subclass
 			while (!(isClassType(inType) && typeToClass(inType).getSuperclass().equals(Tuple.class))) {
@@ -737,7 +737,7 @@ public class TypeExtractor {
 			// check for basic type
 			if (typeInfo.isBasicType()) {
 				
-				TypeInformation<?> actual = null;
+				TypeInformation<?> actual;
 				// check if basic type at all
 				if (!(type instanceof Class<?>) || (actual = BasicTypeInfo.getInfoFor((Class<?>) type)) == null) {
 					throw new InvalidTypesException("Basic type expected.");
@@ -792,7 +792,7 @@ public class TypeExtractor {
 				}
 				
 				// check writable type contents
-				Class<?> clazz = null;
+				Class<?> clazz;
 				if (((WritableTypeInfo<?>) typeInfo).getTypeClass() != (clazz = (Class<?>) type)) {
 					throw new InvalidTypesException("Writable type '"
 							+ ((WritableTypeInfo<?>) typeInfo).getTypeClass().getCanonicalName() + "' expected but was '"
@@ -801,7 +801,7 @@ public class TypeExtractor {
 			}
 			// check for primitive array
 			else if (typeInfo instanceof PrimitiveArrayTypeInfo) {
-				Type component = null;
+				Type component;
 				// check if array at all
 				if (!(type instanceof Class<?> && ((Class<?>) type).isArray() && (component = ((Class<?>) type).getComponentType()) != null)
 						&& !(type instanceof GenericArrayType && (component = ((GenericArrayType) type).getGenericComponentType()) != null)) {
@@ -819,7 +819,7 @@ public class TypeExtractor {
 			}
 			// check for basic array
 			else if (typeInfo instanceof BasicArrayTypeInfo<?, ?>) {
-				Type component = null;
+				Type component;
 				// check if array at all
 				if (!(type instanceof Class<?> && ((Class<?>) type).isArray() && (component = ((Class<?>) type).getComponentType()) != null)
 						&& !(type instanceof GenericArrayType && (component = ((GenericArrayType) type).getGenericComponentType()) != null)) {
@@ -844,7 +844,7 @@ public class TypeExtractor {
 				}
 				
 				// check component
-				Type component = null;
+				Type component;
 				if (type instanceof Class<?>) {
 					component = ((Class<?>) type).getComponentType();
 				} else {
@@ -1428,8 +1428,8 @@ public class TypeExtractor {
 		if (!(t1 instanceof TypeVariable) || !(t2 instanceof TypeVariable)) {
 			return false;
 		}
-		return ((TypeVariable<?>) t1).getName().equals(((TypeVariable<?>)t2).getName())
-				&& ((TypeVariable<?>) t1).getGenericDeclaration().equals(((TypeVariable<?>)t2).getGenericDeclaration());
+		return ((TypeVariable<?>) t1).getName().equals(((TypeVariable<?>) t2).getName())
+				&& ((TypeVariable<?>) t1).getGenericDeclaration().equals(((TypeVariable<?>) t2).getGenericDeclaration());
 	}
 	
 	private static TypeInformation<?> getTypeOfPojoField(TypeInformation<?> pojoInfo, Field field) {

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
index 2216217..89baa98 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
@@ -18,15 +18,12 @@
 
 package org.apache.flink.api.java.record;
 
-import static org.junit.Assert.*;
-
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
@@ -43,6 +40,11 @@ import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 @SuppressWarnings({ "serial", "deprecation" })
 public class ReduceWrappingFunctionTest {
 
@@ -86,7 +88,7 @@ public class ReduceWrappingFunctionTest {
 			target.clear();
 			
 			// test combine
-			((FlatCombineFunction<Record, Record>) reducer).combine(source, collector);
+			((GroupCombineFunction<Record, Record>) reducer).combine(source, collector);
 			assertEquals(2, target.size());
 			assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
 			assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
@@ -138,7 +140,7 @@ public class ReduceWrappingFunctionTest {
 			target.clear();
 			
 			// test combine
-			((FlatCombineFunction<Record, Record>) reducer).combine(source, collector);
+			((GroupCombineFunction<Record, Record>) reducer).combine(source, collector);
 			assertEquals(2, target.size());
 			assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
 			assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
@@ -227,5 +229,5 @@ public class ReduceWrappingFunctionTest {
 			methodCounter.incrementAndGet();
 			super.open(parameters);
 		}
-	};
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
index 7d87a6b..7b279ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
@@ -20,7 +20,7 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper;
@@ -35,20 +35,20 @@ import org.slf4j.LoggerFactory;
 * Like @org.apache.flink.runtime.operators.GroupCombineDriver but without grouping and sorting. May emit partially
 * reduced results.
 *
-* @see org.apache.flink.api.common.functions.FlatCombineFunction
+* @see GroupCombineFunction
 */
-public class AllGroupCombineDriver<IN, OUT> implements PactDriver<FlatCombineFunction<IN, OUT>, OUT> {
+public class AllGroupCombineDriver<IN, OUT> implements PactDriver<GroupCombineFunction<IN, OUT>, OUT> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(AllGroupCombineDriver.class);
 
-	private PactTaskContext<FlatCombineFunction<IN, OUT>, OUT> taskContext;
+	private PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext;
 
 	private boolean objectReuseEnabled = false;
 
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void setup(PactTaskContext<FlatCombineFunction<IN, OUT>, OUT> context) {
+	public void setup(PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> context) {
 		this.taskContext = context;
 	}
 
@@ -58,9 +58,9 @@ public class AllGroupCombineDriver<IN, OUT> implements PactDriver<FlatCombineFun
 	}
 
 	@Override
-	public Class<FlatCombineFunction<IN, OUT>> getStubType() {
+	public Class<GroupCombineFunction<IN, OUT>> getStubType() {
 		@SuppressWarnings("unchecked")
-		final Class<FlatCombineFunction<IN, OUT>> clazz = (Class<FlatCombineFunction<IN, OUT>>) (Class<?>) FlatCombineFunction.class;
+		final Class<GroupCombineFunction<IN, OUT>> clazz = (Class<GroupCombineFunction<IN, OUT>>) (Class<?>) GroupCombineFunction.class;
 		return clazz;
 	}
 
@@ -95,7 +95,7 @@ public class AllGroupCombineDriver<IN, OUT> implements PactDriver<FlatCombineFun
 		TypeSerializer<IN> serializer = serializerFactory.getSerializer();
 
 		final MutableObjectIterator<IN> in = this.taskContext.getInput(0);
-		final FlatCombineFunction<IN, OUT> reducer = this.taskContext.getStub();
+		final GroupCombineFunction<IN, OUT> reducer = this.taskContext.getStub();
 		final Collector<OUT> output = this.taskContext.getOutputCollector();
 
 		if (objectReuseEnabled) {

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
index ad1afdb..a20fddf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.operators.util.TaskConfig;
@@ -92,8 +92,8 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct
 
 		switch (this.strategy) {
 			case ALL_GROUP_REDUCE_COMBINE:
-				if (!(this.taskContext.getStub() instanceof FlatCombineFunction)) {
-					throw new Exception("Using combiner on a UDF that does not implement the combiner interface " + FlatCombineFunction.class.getName());
+				if (!(this.taskContext.getStub() instanceof GroupCombineFunction)) {
+					throw new Exception("Using combiner on a UDF that does not implement the combiner interface " + GroupCombineFunction.class.getName());
 				}
 			case ALL_GROUP_REDUCE:
 			case ALL_GROUP_COMBINE:
@@ -129,7 +129,7 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct
 					final Collector<OT> output = this.taskContext.getOutputCollector();
 					reducer.reduce(inIter, output);
 				} else if (strategy == DriverStrategy.ALL_GROUP_REDUCE_COMBINE || strategy == DriverStrategy.ALL_GROUP_COMBINE) {
-					@SuppressWarnings("unchecked") final FlatCombineFunction<IT, OT> combiner = (FlatCombineFunction<IT, OT>) this.taskContext.getStub();
+					@SuppressWarnings("unchecked") final GroupCombineFunction<IT, OT> combiner = (GroupCombineFunction<IT, OT>) this.taskContext.getStub();
 					final Collector<OT> output = this.taskContext.getOutputCollector();
 					combiner.combine(inIter, output);
 				} else {
@@ -147,7 +147,7 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct
 					final Collector<OT> output = this.taskContext.getOutputCollector();
 					reducer.reduce(inIter, output);
 				} else if (strategy == DriverStrategy.ALL_GROUP_REDUCE_COMBINE || strategy == DriverStrategy.ALL_GROUP_COMBINE) {
-					@SuppressWarnings("unchecked") final FlatCombineFunction<IT, OT> combiner = (FlatCombineFunction<IT, OT>) this.taskContext.getStub();
+					@SuppressWarnings("unchecked") final GroupCombineFunction<IT, OT> combiner = (GroupCombineFunction<IT, OT>) this.taskContext.getStub();
 					final Collector<OT> output = this.taskContext.getOutputCollector();
 					combiner.combine(inIter, output);
 				} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index dacd436..493eb4f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -45,7 +45,7 @@ import java.util.List;
  * the user supplied a RichGroupReduceFunction with a combine method. The combining is performed in memory with a
  * lazy approach which only combines elements which currently fit in the sorter. This may lead to a partial solution.
  * In the case of the RichGroupReduceFunction this partial result is transformed into a proper deterministic result.
- * The CombineGroup uses the FlatCombineFunction interface which allows to combine values of type <IN> to any type
+ * The CombineGroup uses the GroupCombineFunction interface which allows to combine values of type <IN> to any type
  * of type <OUT>. In contrast, the RichGroupReduceFunction requires the combine method to have the same input and
  * output type to be able to reduce the elements after the combine from <IN> to <OUT>.
  *
@@ -54,18 +54,18 @@ import java.util.List;
  * @param <IN> The data type consumed by the combiner.
  * @param <OUT> The data type produced by the combiner.
  */
-public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<FlatCombineFunction<IN, OUT>, OUT> {
+public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombineFunction<IN, OUT>, OUT> {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(GroupReduceCombineDriver.class);
 
 	/** Fix length records with a length below this threshold will be in-place sorted, if possible. */
 	private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
 
-	private PactTaskContext<FlatCombineFunction<IN, OUT>, OUT> taskContext;
+	private PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext;
 
 	private InMemorySorter<IN> sorter;
 
-	private FlatCombineFunction<IN, OUT> combiner;
+	private GroupCombineFunction<IN, OUT> combiner;
 
 	private TypeSerializer<IN> serializer;
 
@@ -86,7 +86,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<FlatCombine
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void setup(PactTaskContext<FlatCombineFunction<IN, OUT>, OUT> context) {
+	public void setup(PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
@@ -97,9 +97,9 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<FlatCombine
 	}
 	
 	@Override
-	public Class<FlatCombineFunction<IN, OUT>> getStubType() {
+	public Class<GroupCombineFunction<IN, OUT>> getStubType() {
 		@SuppressWarnings("unchecked")
-		final Class<FlatCombineFunction<IN, OUT>> clazz = (Class<FlatCombineFunction<IN, OUT>>) (Class<?>) FlatCombineFunction.class;
+		final Class<GroupCombineFunction<IN, OUT>> clazz = (Class<GroupCombineFunction<IN, OUT>>) (Class<?>) GroupCombineFunction.class;
 		return clazz;
 	}
 
@@ -188,7 +188,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<FlatCombine
 				final ReusingKeyGroupedIterator<IN> keyIter = 
 						new ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, this.groupingComparator);
 
-				final FlatCombineFunction<IN, OUT> combiner = this.combiner;
+				final GroupCombineFunction<IN, OUT> combiner = this.combiner;
 				final Collector<OUT> output = this.output;
 
 				// iterate over key groups
@@ -203,7 +203,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<FlatCombine
 				final NonReusingKeyGroupedIterator<IN> keyIter = 
 						new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
 
-				final FlatCombineFunction<IN, OUT> combiner = this.combiner;
+				final GroupCombineFunction<IN, OUT> combiner = this.combiner;
 				final Collector<OUT> output = this.output;
 
 				// iterate over key groups

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index ca110c2..71b8afc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -525,7 +525,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 				try {
 					FunctionUtils.closeFunction(this.stub);
 				}
-				catch (Throwable t) {}
+				catch (Throwable t) {
+					// do nothing
+				}
 			}
 			
 			// if resettable driver invoke teardown
@@ -1006,13 +1008,13 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 							(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
 				}
 				
-				if (!(localStub instanceof FlatCombineFunction)) {
+				if (!(localStub instanceof GroupCombineFunction)) {
 					throw new IllegalStateException("Performing combining sort outside a reduce task!");
 				}
 
 				@SuppressWarnings({ "rawtypes", "unchecked" })
 				CombiningUnilateralSortMerger<?> cSorter = new CombiningUnilateralSortMerger(
-					(FlatCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
+					(GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
 					this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
 					this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
 					this.config.getSpillingThresholdInput(inputNum));
@@ -1467,7 +1469,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		for (int i = 0; i < tasks.size(); i++) {
 			try {
 				tasks.get(i).cancelTask();
-			} catch (Throwable t) {}
+			} catch (Throwable t) {
+				// do nothing
+			}
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
index 7e36b49..4116145 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators.chaining;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -67,7 +67,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
 
 	private InMemorySorter<IN> sorter;
 
-	private FlatCombineFunction<IN, OUT> combiner;
+	private GroupCombineFunction<IN, OUT> combiner;
 
 	private TypeSerializer<IN> serializer;
 
@@ -90,8 +90,8 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
 		this.parent = parent;
 
 		@SuppressWarnings("unchecked")
-		final FlatCombineFunction<IN, OUT> combiner =
-			RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, FlatCombineFunction.class);
+		final GroupCombineFunction<IN, OUT> combiner =
+			RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GroupCombineFunction.class);
 		this.combiner = combiner;
 		FunctionUtils.setFunctionRuntimeContext(combiner, getUdfRuntimeContext());
 	}
@@ -210,7 +210,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
 
 
 				// cache references on the stack
-				final FlatCombineFunction<IN, OUT> stub = this.combiner;
+				final GroupCombineFunction<IN, OUT> stub = this.combiner;
 				final Collector<OUT> output = this.outputCollector;
 
 				// run stub implementation
@@ -226,7 +226,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
 
 
 				// cache references on the stack
-				final FlatCombineFunction<IN, OUT> stub = this.combiner;
+				final GroupCombineFunction<IN, OUT> stub = this.combiner;
 				final Collector<OUT> output = this.outputCollector;
 
 				// run stub implementation

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index 9282fd4..8da9413 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -70,7 +70,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 	 */
 	private static final Logger LOG = LoggerFactory.getLogger(CombiningUnilateralSortMerger.class);
 
-	private final FlatCombineFunction<E, E> combineStub;	// the user code stub that does the combining
+	private final GroupCombineFunction<E, E> combineStub;	// the user code stub that does the combining
 	
 	private Configuration udfConfig;
 	
@@ -100,7 +100,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 	 * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
 	 *                                   perform the sort.
 	 */
-	public CombiningUnilateralSortMerger(FlatCombineFunction<E, E> combineStub, MemoryManager memoryManager, IOManager ioManager,
+	public CombiningUnilateralSortMerger(GroupCombineFunction<E, E> combineStub, MemoryManager memoryManager, IOManager ioManager,
 			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
 			double memoryFraction, int maxNumFileHandles, float startSpillingFraction)
@@ -132,7 +132,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 	 * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
 	 *                                   perform the sort.
 	 */
-	public CombiningUnilateralSortMerger(FlatCombineFunction<E, E> combineStub, MemoryManager memoryManager, IOManager ioManager,
+	public CombiningUnilateralSortMerger(GroupCombineFunction<E, E> combineStub, MemoryManager memoryManager, IOManager ioManager,
 			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
 			double memoryFraction, int numSortBuffers, int maxNumFileHandles,
@@ -189,7 +189,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 			// ------------------- In-Memory Cache ------------------------
 			
 			final Queue<CircularElement<E>> cache = new ArrayDeque<CircularElement<E>>();
-			CircularElement<E> element = null;
+			CircularElement<E> element;
 			boolean cacheOnly = false;
 			
 			// fill cache
@@ -253,7 +253,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 			
 			// ------------------- Spilling Phase ------------------------
 			
-			final FlatCombineFunction<E, E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
+			final GroupCombineFunction<E, E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
 			
 			// now that we are actually spilling, take the combiner, and open it
 			try {
@@ -463,7 +463,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 																			this.memManager.getPageSize());
 			
 			final WriterCollector<E> collector = new WriterCollector<E>(output, this.serializer);
-			final FlatCombineFunction<E, E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
+			final GroupCombineFunction<E, E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
 
 			// combine and write to disk
 			try {
@@ -573,7 +573,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 				throw new TraversableOnceException();
 			}
 		}
-	};
+	}
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
index 4b92b71..5c3ecf3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.runtime.blob;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.io.File;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
@@ -33,6 +28,11 @@ import java.util.List;
 import org.apache.flink.configuration.Configuration;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 /**
  * This class contains unit tests for the {@link BlobCache}.
  */
@@ -70,8 +70,8 @@ public class BlobCacheSuccessTest {
 
 			blobCache = new BlobCache(serverAddress, new Configuration());
 
-			for(int i = 0; i < blobKeys.size(); i++){
-				blobCache.getURL(blobKeys.get(i));
+			for (BlobKey blobKey : blobKeys) {
+				blobCache.getURL(blobKey);
 			}
 
 			// Now, shut down the BLOB server, the BLOBs must still be accessible through the cache.
@@ -87,9 +87,7 @@ public class BlobCacheSuccessTest {
 			// Verify the result
 			assertEquals(blobKeys.size(), urls.length);
 
-			for (int i = 0; i < urls.length; ++i) {
-
-				final URL url = urls[i];
+			for (final URL url : urls) {
 
 				assertNotNull(url);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 1291181..2732112 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -641,7 +641,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    *  arbitrary output type.
    */
   def combineGroup[R: TypeInformation: ClassTag](
-      combiner: FlatCombineFunction[T, R]): DataSet[R] = {
+      combiner: GroupCombineFunction[T, R]): DataSet[R] = {
     if (combiner == null) {
       throw new NullPointerException("Combine function must not be null.")
     }
@@ -670,7 +670,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     if (fun == null) {
       throw new NullPointerException("Combine function must not be null.")
     }
-    val combiner = new FlatCombineFunction[T, R] {
+    val combiner = new GroupCombineFunction[T, R] {
       val cleanFun = clean(fun)
       def combine(in: java.lang.Iterable[T], out: Collector[R]) {
         cleanFun(in.iterator().asScala, out)

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
index eca4563..d547ea4 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.functions.{KeySelector, FirstReducer}
 import org.apache.flink.api.scala.operators.ScalaAggregateOperator
 import scala.collection.JavaConverters._
 import org.apache.commons.lang3.Validate
-import org.apache.flink.api.common.functions.{FlatCombineFunction, GroupReduceFunction, ReduceFunction, Partitioner}
+import org.apache.flink.api.common.functions.{GroupCombineFunction, GroupReduceFunction, ReduceFunction, Partitioner}
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.java.aggregation.Aggregations
 import org.apache.flink.api.java.operators._
@@ -370,7 +370,7 @@ class GroupedDataSet[T: ClassTag](
   def combineGroup[R: TypeInformation: ClassTag](
                                           fun: (Iterator[T], Collector[R]) => Unit): DataSet[R] = {
     Validate.notNull(fun, "GroupCombine function must not be null.")
-    val combiner = new FlatCombineFunction[T, R] {
+    val combiner = new GroupCombineFunction[T, R] {
       val cleanFun = set.clean(fun)
       def combine(in: java.lang.Iterable[T], out: Collector[R]) {
         cleanFun(in.iterator().asScala, out)
@@ -396,7 +396,7 @@ class GroupedDataSet[T: ClassTag](
    *  arbitrary output type.
    */
   def combineGroup[R: TypeInformation: ClassTag](
-      combiner: FlatCombineFunction[T, R]): DataSet[R] = {
+      combiner: GroupCombineFunction[T, R]): DataSet[R] = {
     Validate.notNull(combiner, "GroupCombine function must not be null.")
     wrap(
       new GroupCombineOperator[T, R](maybeCreateSortedGrouping(),

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
index 6631f07..e2a160d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
@@ -21,14 +21,14 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
 import java.util.Iterator;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
 import org.apache.flink.util.Collector;
 
 
 public class CustomRankCombiner extends AbstractRichFunction implements GroupReduceFunction<VertexWithRank, VertexWithRank>,
-		FlatCombineFunction<VertexWithRank, VertexWithRank>
+		GroupCombineFunction<VertexWithRank, VertexWithRank>
 {
 	private static final long serialVersionUID = 1L;