You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/03/18 18:17:25 UTC

[1/2] flink git commit: [FLINK-1622][java-api][scala-api] add a GroupCombine operator

Repository: flink
Updated Branches:
  refs/heads/master 4a49a73a7 -> e93e0cb86


http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
index f413b81..2216217 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
@@ -86,7 +86,7 @@ public class ReduceWrappingFunctionTest {
 			target.clear();
 			
 			// test combine
-			((FlatCombineFunction<Record>) reducer).combine(source, collector);
+			((FlatCombineFunction<Record, Record>) reducer).combine(source, collector);
 			assertEquals(2, target.size());
 			assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
 			assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
@@ -138,7 +138,7 @@ public class ReduceWrappingFunctionTest {
 			target.clear();
 			
 			// test combine
-			((FlatCombineFunction<Record>) reducer).combine(source, collector);
+			((FlatCombineFunction<Record, Record>) reducer).combine(source, collector);
 			assertEquals(2, target.size());
 			assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
 			assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
new file mode 100644
index 0000000..7d87a6b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
@@ -0,0 +1,127 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper;
+import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+* Non-chained driver for the partial group reduce operator that acts like a combiner with a custom output type OUT.
+* Like {@code org.apache.flink.runtime.operators.GroupCombineDriver} but without grouping and sorting. May emit partially
+* reduced results.
+*
+* @see org.apache.flink.api.common.functions.FlatCombineFunction
+*/
+public class AllGroupCombineDriver<IN, OUT> implements PactDriver<FlatCombineFunction<IN, OUT>, OUT> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AllGroupCombineDriver.class);
+
+	private PactTaskContext<FlatCombineFunction<IN, OUT>, OUT> taskContext;
+
+	private boolean objectReuseEnabled = false;
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void setup(PactTaskContext<FlatCombineFunction<IN, OUT>, OUT> context) {
+		this.taskContext = context;
+	}
+
+	@Override
+	public int getNumberOfInputs() {
+		return 1;
+	}
+
+	@Override
+	public Class<FlatCombineFunction<IN, OUT>> getStubType() {
+		@SuppressWarnings("unchecked")
+		final Class<FlatCombineFunction<IN, OUT>> clazz = (Class<FlatCombineFunction<IN, OUT>>) (Class<?>) FlatCombineFunction.class;
+		return clazz;
+	}
+
+	@Override
+	public int getNumberOfDriverComparators() {
+		return 0;
+	}
+
+	@Override
+	public void prepare() throws Exception {
+		final DriverStrategy driverStrategy = this.taskContext.getTaskConfig().getDriverStrategy();
+		if(driverStrategy != DriverStrategy.ALL_GROUP_COMBINE){
+			throw new Exception("Invalid strategy " + driverStrategy + " for " +
+					"GroupCombine.");
+		}
+
+		ExecutionConfig executionConfig = taskContext.getExecutionConfig();
+		this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("GroupCombineDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
+		}
+	}
+
+	@Override
+	public void run() throws Exception {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("AllGroupCombine starting.");
+		}
+
+		final TypeSerializerFactory<IN> serializerFactory = this.taskContext.getInputSerializer(0);
+		TypeSerializer<IN> serializer = serializerFactory.getSerializer();
+
+		final MutableObjectIterator<IN> in = this.taskContext.getInput(0);
+		final FlatCombineFunction<IN, OUT> reducer = this.taskContext.getStub();
+		final Collector<OUT> output = this.taskContext.getOutputCollector();
+
+		if (objectReuseEnabled) {
+			final ReusingMutableToRegularIteratorWrapper<IN> inIter = new ReusingMutableToRegularIteratorWrapper<IN>(in, serializer);
+
+			if (inIter.hasNext()) {
+					reducer.combine(inIter, output);
+
+			}
+
+		} else {
+			final NonReusingMutableToRegularIteratorWrapper<IN> inIter = new NonReusingMutableToRegularIteratorWrapper<IN>(in, serializer);
+
+			if (inIter.hasNext()) {
+					reducer.combine(inIter, output);
+			}
+		}
+
+	}
+
+	@Override
+	public void cleanup() throws Exception {
+	}
+
+	@Override
+	public void cancel() {
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
index 854dbd5..9b9b5f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
@@ -32,9 +32,13 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
 /**
- * GroupReduce task which is executed by a Nephele task manager. The task has a
+ * GroupReduceDriver task which is executed by a Nephele task manager. The task has a
  * single input and one or multiple outputs. It is provided with a GroupReduceFunction
- * implementation.
+ * implementation or a RichGroupFunction. This Driver performs
+ * multiple tasks depending on the DriverStrategy. In case of an ALL_GROUP_REDUCE_COMBINE
+ * it uses the combine function of the supplied user function. In case
+ * of the ALL_GROUP_REDUCE, it uses the reduce function of the supplied user function to
+ * process all elements. In either case, the function is executed on all elements.
  * <p>
  * The GroupReduceTask creates a iterator over all records from its input. The iterator returns all records grouped by their
  * key. The iterator is handed to the <code>reduce()</code> method of the GroupReduceFunction.
@@ -85,15 +89,19 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct
 	public void prepare() throws Exception {
 		final TaskConfig config = this.taskContext.getTaskConfig();
 		this.strategy = config.getDriverStrategy();
-		
-		if (strategy == DriverStrategy.ALL_GROUP_COMBINE) {
-			if (!(this.taskContext.getStub() instanceof FlatCombineFunction)) {
-				throw new Exception("Using combiner on a UDF that does not implement the combiner interface " + FlatCombineFunction.class.getName());
-			}
-		}
-		else if (strategy != DriverStrategy.ALL_GROUP_REDUCE) {
-			throw new Exception("Unrecognized driver strategy for AllGroupReduce driver: " + config.getDriverStrategy().name());
+
+		switch (this.strategy) {
+			case ALL_GROUP_REDUCE_COMBINE:
+				if (!(this.taskContext.getStub() instanceof FlatCombineFunction)) {
+					throw new Exception("Using combiner on a UDF that does not implement the combiner interface " + FlatCombineFunction.class.getName());
+				}
+			case ALL_GROUP_REDUCE:
+			case ALL_GROUP_COMBINE:
+				break;
+			default:
+				throw new Exception("Unrecognized driver strategy for AllGroupReduce driver: " + this.strategy.name());
 		}
+
 		this.serializer = this.taskContext.<IT>getInputSerializer(0).getSerializer();
 		this.input = this.taskContext.getInput(0);
 
@@ -108,7 +116,7 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct
 	@Override
 	public void run() throws Exception {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug(this.taskContext.formatLogString("AllGroupReduce preprocessing done. Running Reducer code."));
+			LOG.debug(this.taskContext.formatLogString("AllGroupReduceDriver preprocessing done. Running Reducer code."));
 		}
 
 		if (objectReuseEnabled) {
@@ -120,10 +128,12 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct
 					final GroupReduceFunction<IT, OT> reducer = this.taskContext.getStub();
 					final Collector<OT> output = this.taskContext.getOutputCollector();
 					reducer.reduce(inIter, output);
-				} else {
-					@SuppressWarnings("unchecked") final FlatCombineFunction<IT> combiner = (FlatCombineFunction<IT>) this.taskContext.getStub();
-					@SuppressWarnings("unchecked") final Collector<IT> output = (Collector<IT>) this.taskContext.getOutputCollector();
+				} else if (strategy == DriverStrategy.ALL_GROUP_REDUCE_COMBINE || strategy == DriverStrategy.ALL_GROUP_COMBINE) {
+					@SuppressWarnings("unchecked") final FlatCombineFunction<IT, OT> combiner = (FlatCombineFunction<IT, OT>) this.taskContext.getStub();
+					final Collector<OT> output = this.taskContext.getOutputCollector();
 					combiner.combine(inIter, output);
+				} else {
+					throw new Exception("The strategy " + strategy + " is unknown to this driver.");
 				}
 			}
 
@@ -136,10 +146,12 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct
 					final GroupReduceFunction<IT, OT> reducer = this.taskContext.getStub();
 					final Collector<OT> output = this.taskContext.getOutputCollector();
 					reducer.reduce(inIter, output);
-				} else {
-					@SuppressWarnings("unchecked") final FlatCombineFunction<IT> combiner = (FlatCombineFunction<IT>) this.taskContext.getStub();
-					@SuppressWarnings("unchecked") final Collector<IT> output = (Collector<IT>) this.taskContext.getOutputCollector();
+				} else if (strategy == DriverStrategy.ALL_GROUP_REDUCE_COMBINE || strategy == DriverStrategy.ALL_GROUP_COMBINE) {
+					@SuppressWarnings("unchecked") final FlatCombineFunction<IT, OT> combiner = (FlatCombineFunction<IT, OT>) this.taskContext.getStub();
+					final Collector<OT> output = this.taskContext.getOutputCollector();
 					combiner.combine(inIter, output);
+				} else {
+					throw new Exception("The strategy " + strategy + " is unknown to this driver.");
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index ae9b474..d5b131e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -55,7 +55,7 @@ public enum DriverStrategy {
 	// group everything together into one group and apply the GroupReduce function
 	ALL_GROUP_REDUCE(AllGroupReduceDriver.class, null, PIPELINED, 0),
 	// group everything together into one group and apply the GroupReduce's combine function
-	ALL_GROUP_COMBINE(AllGroupReduceDriver.class, null, PIPELINED, 0),
+	ALL_GROUP_REDUCE_COMBINE(AllGroupReduceDriver.class, null, PIPELINED, 0),
 
 	// grouping the inputs and apply the Reduce Function
 	SORTED_REDUCE(ReduceDriver.class, null, PIPELINED, 1),
@@ -67,6 +67,9 @@ public enum DriverStrategy {
 	// partially grouping inputs (best effort resulting possibly in duplicates --> combiner)
 	SORTED_GROUP_COMBINE(GroupReduceCombineDriver.class, SynchronousChainedCombineDriver.class, MATERIALIZING, 2),
 
+	// group combine on all inputs within a partition (without grouping)
+	ALL_GROUP_COMBINE(AllGroupCombineDriver.class, null, PIPELINED, 0),
+
 	// both inputs are merged, but materialized to the side for block-nested-loop-join among values with equal key
 	MERGE(MatchDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index be0c9c4..dacd436 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -41,36 +41,43 @@ import java.io.IOException;
 import java.util.List;
 
 /**
- * Combine operator, standalone (not chained)
- * <p>
+ * Non-chained combine driver which is used for a CombineGroup transformation or a GroupReduce transformation where
+ * the user supplied a RichGroupReduceFunction with a combine method. The combining is performed in memory with a
+ * lazy approach which only combines elements which currently fit in the sorter. This may lead to a partial solution.
+ * In the case of the RichGroupReduceFunction this partial result is transformed into a proper deterministic result.
+ * The CombineGroup uses the FlatCombineFunction interface, which allows combining values of type {@code IN} into
+ * values of any type {@code OUT}. In contrast, the RichGroupReduceFunction requires the combine method to have the same
+ * input and output type to be able to reduce the elements after the combine from {@code IN} to {@code OUT}.
+ *
  * The CombineTask uses a combining iterator over its input. The output of the iterator is emitted.
  * 
- * @param <T> The data type consumed and produced by the combiner.
+ * @param <IN> The data type consumed by the combiner.
+ * @param <OUT> The data type produced by the combiner.
  */
-public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFunction<T>, T> {
+public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<FlatCombineFunction<IN, OUT>, OUT> {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(GroupReduceCombineDriver.class);
 
 	/** Fix length records with a length below this threshold will be in-place sorted, if possible. */
 	private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
 
-	private PactTaskContext<FlatCombineFunction<T>, T> taskContext;
+	private PactTaskContext<FlatCombineFunction<IN, OUT>, OUT> taskContext;
 
-	private InMemorySorter<T> sorter;
+	private InMemorySorter<IN> sorter;
 
-	private FlatCombineFunction<T> combiner;
+	private FlatCombineFunction<IN, OUT> combiner;
 
-	private TypeSerializer<T> serializer;
+	private TypeSerializer<IN> serializer;
 
-	private TypeComparator<T> sortingComparator;
+	private TypeComparator<IN> sortingComparator;
 	
-	private TypeComparator<T> groupingComparator;
+	private TypeComparator<IN> groupingComparator;
 
 	private QuickSort sortAlgo = new QuickSort();
 
 	private MemoryManager memManager;
 
-	private Collector<T> output;
+	private Collector<OUT> output;
 
 	private volatile boolean running = true;
 
@@ -79,7 +86,7 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void setup(PactTaskContext<FlatCombineFunction<T>, T> context) {
+	public void setup(PactTaskContext<FlatCombineFunction<IN, OUT>, OUT> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
@@ -90,9 +97,9 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 	}
 	
 	@Override
-	public Class<FlatCombineFunction<T>> getStubType() {
+	public Class<FlatCombineFunction<IN, OUT>> getStubType() {
 		@SuppressWarnings("unchecked")
-		final Class<FlatCombineFunction<T>> clazz = (Class<FlatCombineFunction<T>>) (Class<?>) FlatCombineFunction.class;
+		final Class<FlatCombineFunction<IN, OUT>> clazz = (Class<FlatCombineFunction<IN, OUT>>) (Class<?>) FlatCombineFunction.class;
 		return clazz;
 	}
 
@@ -103,15 +110,16 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 
 	@Override
 	public void prepare() throws Exception {
-		if(this.taskContext.getTaskConfig().getDriverStrategy() != DriverStrategy.SORTED_GROUP_COMBINE){
-			throw new Exception("Invalid strategy " + this.taskContext.getTaskConfig().getDriverStrategy() + " for " +
+		final DriverStrategy driverStrategy = this.taskContext.getTaskConfig().getDriverStrategy();
+		if(driverStrategy != DriverStrategy.SORTED_GROUP_COMBINE){
+			throw new Exception("Invalid strategy " + driverStrategy + " for " +
 					"group reduce combinder.");
 		}
 
 		this.memManager = this.taskContext.getMemoryManager();
 		final int numMemoryPages = memManager.computeNumberOfPages(this.taskContext.getTaskConfig().getRelativeMemoryDriver());
 
-		final TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0);
+		final TypeSerializerFactory<IN> serializerFactory = this.taskContext.getInputSerializer(0);
 		this.serializer = serializerFactory.getSerializer();
 		this.sortingComparator = this.taskContext.getDriverComparator(0);
 		this.groupingComparator = this.taskContext.getDriverComparator(1);
@@ -125,9 +133,9 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 		if (this.sortingComparator.supportsSerializationWithKeyNormalization() &&
 				this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
 		{
-			this.sorter = new FixedLengthRecordSorter<T>(this.serializer, this.sortingComparator, memory);
+			this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, this.sortingComparator, memory);
 		} else {
-			this.sorter = new NormalizedKeySorter<T>(this.serializer, this.sortingComparator.duplicate(), memory);
+			this.sorter = new NormalizedKeySorter<IN>(this.serializer, this.sortingComparator.duplicate(), memory);
 		}
 
 		ExecutionConfig executionConfig = taskContext.getExecutionConfig();
@@ -144,10 +152,10 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 			LOG.debug("Combiner starting.");
 		}
 
-		final MutableObjectIterator<T> in = this.taskContext.getInput(0);
-		final TypeSerializer<T> serializer = this.serializer;
+		final MutableObjectIterator<IN> in = this.taskContext.getInput(0);
+		final TypeSerializer<IN> serializer = this.serializer;
 
-		T value = serializer.createInstance();
+		IN value = serializer.createInstance();
 
 		while (running && (value = in.next(value)) != null) {
 
@@ -171,17 +179,17 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 	}
 
 	private void sortAndCombine() throws Exception {
-		final InMemorySorter<T> sorter = this.sorter;
+		final InMemorySorter<IN> sorter = this.sorter;
 
 		if (objectReuseEnabled) {
 			if (!sorter.isEmpty()) {
 				this.sortAlgo.sort(sorter);
 
-				final ReusingKeyGroupedIterator<T> keyIter = 
-						new ReusingKeyGroupedIterator<T>(sorter.getIterator(), this.serializer, this.groupingComparator);
+				final ReusingKeyGroupedIterator<IN> keyIter = 
+						new ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, this.groupingComparator);
 
-				final FlatCombineFunction<T> combiner = this.combiner;
-				final Collector<T> output = this.output;
+				final FlatCombineFunction<IN, OUT> combiner = this.combiner;
+				final Collector<OUT> output = this.output;
 
 				// iterate over key groups
 				while (this.running && keyIter.nextKey()) {
@@ -192,11 +200,11 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 			if (!sorter.isEmpty()) {
 				this.sortAlgo.sort(sorter);
 
-				final NonReusingKeyGroupedIterator<T> keyIter = 
-						new NonReusingKeyGroupedIterator<T>(sorter.getIterator(), this.groupingComparator);
+				final NonReusingKeyGroupedIterator<IN> keyIter = 
+						new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
 
-				final FlatCombineFunction<T> combiner = this.combiner;
-				final Collector<T> output = this.output;
+				final FlatCombineFunction<IN, OUT> combiner = this.combiner;
+				final Collector<OUT> output = this.output;
 
 				// iterate over key groups
 				while (this.running && keyIter.nextKey()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
index ffe09cb..b296506 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
@@ -33,7 +33,7 @@ import org.apache.flink.util.MutableObjectIterator;
 /**
  * A runtime task is the task that is executed by the flink engine inside a task vertex.
  * It typically has a {@link PactDriver}, and optionally multiple chained drivers. In addition, it
- * deals with the runtime setup and teardown and the control-flow logic. The later appears especially
+ * deals with the runtime setup and teardown and the control-flow logic. The latter appears especially
  * in the case of iterations.
  *
  * @param <S> The UDF type.

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
new file mode 100644
index 0000000..ff397c2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
+import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Chained variant of the GroupCombineDriver
+ * 
+ * Acts like a combiner with a custom output type OUT.
+ *
+ * Sorting and reducing of the elements is performed individually for each partition without data exchange. This may
+ * lead to a partial group reduce.
+ *  
+ * @param <IN> The data type consumed
+ * @param <OUT> The data type produced
+ */
+public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(GroupCombineChainedDriver.class);
+
+	/**
+	 * Fix length records with a length below this threshold will be in-place sorted, if possible.
+	 */
+	private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+	// --------------------------------------------------------------------------------------------
+
+	private InMemorySorter<IN> sorter;
+
+	private GroupReduceFunction<IN, OUT> reducer;
+
+	private TypeSerializer<IN> serializer;
+
+	private TypeComparator<IN> sortingComparator;
+
+	private TypeComparator<IN> groupingComparator;
+
+	private AbstractInvokable parent;
+
+	private QuickSort sortAlgo = new QuickSort();
+
+	private MemoryManager memManager;
+
+	private volatile boolean running = true;
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void setup(AbstractInvokable parent) {
+		this.parent = parent;
+
+		@SuppressWarnings("unchecked")
+		final GroupReduceFunction<IN, OUT> combiner =
+			RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GroupReduceFunction.class);
+		this.reducer = combiner;
+		FunctionUtils.setFunctionRuntimeContext(combiner, getUdfRuntimeContext());
+	}
+
+	@Override
+	public void openTask() throws Exception {
+		// open the stub first
+		final Configuration stubConfig = this.config.getStubParameters();
+		RegularPactTask.openUserCode(this.reducer, stubConfig);
+
+		// ----------------- Set up the asynchronous sorter -------------------------
+
+		this.memManager = this.parent.getEnvironment().getMemoryManager();
+		final int numMemoryPages = memManager.computeNumberOfPages(this.config.getRelativeMemoryDriver());
+
+		// instantiate the serializer / comparator
+		final TypeSerializerFactory<IN> serializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
+		final TypeComparatorFactory<IN> sortingComparatorFactory = this.config.getDriverComparator(0, this.userCodeClassLoader);
+		final TypeComparatorFactory<IN> groupingComparatorFactory = this.config.getDriverComparator(1, this.userCodeClassLoader);
+		this.serializer = serializerFactory.getSerializer();
+		this.sortingComparator = sortingComparatorFactory.createComparator();
+		this.groupingComparator = groupingComparatorFactory.createComparator();
+
+		final List<MemorySegment> memory = this.memManager.allocatePages(this.parent, numMemoryPages);
+
+		// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
+		if (this.sortingComparator.supportsSerializationWithKeyNormalization() &&
+			this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
+		{
+			this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, this.sortingComparator, memory);
+		} else {
+			this.sorter = new NormalizedKeySorter<IN>(this.serializer, this.sortingComparator.duplicate(), memory);
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("SynchronousChainedCombineDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
+		}
+	}
+
+	@Override
+	public void closeTask() throws Exception {
+		this.memManager.release(this.sorter.dispose());
+
+		if (!this.running) {
+			return;
+		}
+
+		RegularPactTask.closeUserCode(this.reducer);
+	}
+
+	@Override
+	public void cancelTask() {
+		this.running = false;
+		this.memManager.release(this.sorter.dispose());
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	public Function getStub() {
+		return this.reducer;
+	}
+
+	public String getTaskName() {
+		return this.taskName;
+	}
+
+	@Override
+	public void collect(IN record) {
+		// try writing to the sorter first
+		try {
+			if (this.sorter.write(record)) {
+				return;
+			}
+		} catch (IOException e) {
+			throw new ExceptionInChainedStubException(this.taskName, e);
+		}
+
+		// do the actual sorting
+		try {
+			sortAndReduce();
+		} catch (Exception e) {
+			throw new ExceptionInChainedStubException(this.taskName, e);
+		}
+		this.sorter.reset();
+
+		try {
+			if (!this.sorter.write(record)) {
+				throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
+			}
+		} catch (IOException e) {
+			throw new ExceptionInChainedStubException(this.taskName, e);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void close() {
+		try {
+			sortAndReduce();
+		} catch (Exception e) {
+			throw new ExceptionInChainedStubException(this.taskName, e);
+		}
+
+		this.outputCollector.close();
+	}
+
+	private void sortAndReduce() throws Exception {
+		final InMemorySorter<IN> sorter = this.sorter;
+
+		if (objectReuseEnabled) {
+			if (!sorter.isEmpty()) {
+				this.sortAlgo.sort(sorter);
+				// run the reducer
+				final ReusingKeyGroupedIterator<IN> keyIter = new ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, this.groupingComparator);
+
+
+				// cache references on the stack
+				final GroupReduceFunction<IN, OUT> stub = this.reducer;
+				final Collector<OUT> output = this.outputCollector;
+
+				// run stub implementation
+				while (this.running && keyIter.nextKey()) {
+					stub.reduce(keyIter.getValues(), output);
+				}
+			}
+		} else {
+			if (!sorter.isEmpty()) {
+				this.sortAlgo.sort(sorter);
+				// run the reducer
+				final NonReusingKeyGroupedIterator<IN> keyIter = new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
+
+
+				// cache references on the stack
+				final GroupReduceFunction<IN, OUT> stub = this.reducer;
+				final Collector<OUT> output = this.outputCollector;
+
+				// run stub implementation
+				while (this.running && keyIter.nextKey()) {
+					stub.reduce(keyIter.getValues(), output);
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
index dde6fe6..7e36b49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
@@ -43,7 +43,17 @@ import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
+/**
+ * The chained variant of the combine driver; the regular variant is implemented in GroupReduceCombineDriver. In
+ * contrast to the GroupReduceCombineDriver, this driver's purpose is only to combine the values received in the
+ * chain. It is used by the GroupReduce and the CombineGroup transformations.
+ *
+ * @see org.apache.flink.runtime.operators.GroupReduceCombineDriver
+ * @param <IN> The data type consumed by the combiner.
+ * @param <OUT> The data type produced by the combiner.
+ */
+
+public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(SynchronousChainedCombineDriver.class);
 
@@ -55,15 +65,15 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
 
 	// --------------------------------------------------------------------------------------------
 
-	private InMemorySorter<T> sorter;
+	private InMemorySorter<IN> sorter;
 
-	private FlatCombineFunction<T> combiner;
+	private FlatCombineFunction<IN, OUT> combiner;
 
-	private TypeSerializer<T> serializer;
+	private TypeSerializer<IN> serializer;
 
-	private TypeComparator<T> sortingComparator;
+	private TypeComparator<IN> sortingComparator;
 
-	private TypeComparator<T> groupingComparator;
+	private TypeComparator<IN> groupingComparator;
 
 	private AbstractInvokable parent;
 
@@ -80,7 +90,7 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
 		this.parent = parent;
 
 		@SuppressWarnings("unchecked")
-		final FlatCombineFunction<T> combiner =
+		final FlatCombineFunction<IN, OUT> combiner =
 			RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, FlatCombineFunction.class);
 		this.combiner = combiner;
 		FunctionUtils.setFunctionRuntimeContext(combiner, getUdfRuntimeContext());
@@ -98,9 +108,9 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
 		final int numMemoryPages = memManager.computeNumberOfPages(this.config.getRelativeMemoryDriver());
 
 		// instantiate the serializer / comparator
-		final TypeSerializerFactory<T> serializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
-		final TypeComparatorFactory<T> sortingComparatorFactory = this.config.getDriverComparator(0, this.userCodeClassLoader);
-		final TypeComparatorFactory<T> groupingComparatorFactory = this.config.getDriverComparator(1, this.userCodeClassLoader);
+		final TypeSerializerFactory<IN> serializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
+		final TypeComparatorFactory<IN> sortingComparatorFactory = this.config.getDriverComparator(0, this.userCodeClassLoader);
+		final TypeComparatorFactory<IN> groupingComparatorFactory = this.config.getDriverComparator(1, this.userCodeClassLoader);
 		this.serializer = serializerFactory.getSerializer();
 		this.sortingComparator = sortingComparatorFactory.createComparator();
 		this.groupingComparator = groupingComparatorFactory.createComparator();
@@ -111,9 +121,9 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
 		if (this.sortingComparator.supportsSerializationWithKeyNormalization() &&
 			this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
 		{
-			this.sorter = new FixedLengthRecordSorter<T>(this.serializer, this.sortingComparator, memory);
+			this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, this.sortingComparator, memory);
 		} else {
-			this.sorter = new NormalizedKeySorter<T>(this.serializer, this.sortingComparator.duplicate(), memory);
+			this.sorter = new NormalizedKeySorter<IN>(this.serializer, this.sortingComparator.duplicate(), memory);
 		}
 
 		if (LOG.isDebugEnabled()) {
@@ -149,7 +159,7 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
 	}
 
 	@Override
-	public void collect(T record) {
+	public void collect(IN record) {
 		// try writing to the sorter first
 		try {
 			if (this.sorter.write(record)) {
@@ -190,18 +200,18 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
 	}
 
 	private void sortAndCombine() throws Exception {
-		final InMemorySorter<T> sorter = this.sorter;
+		final InMemorySorter<IN> sorter = this.sorter;
 
 		if (objectReuseEnabled) {
 			if (!sorter.isEmpty()) {
 				this.sortAlgo.sort(sorter);
 				// run the combiner
-				final ReusingKeyGroupedIterator<T> keyIter = new ReusingKeyGroupedIterator<T>(sorter.getIterator(), this.serializer, this.groupingComparator);
+				final ReusingKeyGroupedIterator<IN> keyIter = new ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, this.groupingComparator);
 
 
 				// cache references on the stack
-				final FlatCombineFunction<T> stub = this.combiner;
-				final Collector<T> output = this.outputCollector;
+				final FlatCombineFunction<IN, OUT> stub = this.combiner;
+				final Collector<OUT> output = this.outputCollector;
 
 				// run stub implementation
 				while (this.running && keyIter.nextKey()) {
@@ -212,12 +222,12 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
 			if (!sorter.isEmpty()) {
 				this.sortAlgo.sort(sorter);
 				// run the combiner
-				final NonReusingKeyGroupedIterator<T> keyIter = new NonReusingKeyGroupedIterator<T>(sorter.getIterator(), this.groupingComparator);
+				final NonReusingKeyGroupedIterator<IN> keyIter = new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
 
 
 				// cache references on the stack
-				final FlatCombineFunction<T> stub = this.combiner;
-				final Collector<T> output = this.outputCollector;
+				final FlatCombineFunction<IN, OUT> stub = this.combiner;
+				final Collector<OUT> output = this.outputCollector;
 
 				// run stub implementation
 				while (this.running && keyIter.nextKey()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index d05bd9a..9282fd4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -70,7 +70,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 	 */
 	private static final Logger LOG = LoggerFactory.getLogger(CombiningUnilateralSortMerger.class);
 
-	private final FlatCombineFunction<E> combineStub;	// the user code stub that does the combining
+	private final FlatCombineFunction<E, E> combineStub;	// the user code stub that does the combining
 	
 	private Configuration udfConfig;
 	
@@ -100,7 +100,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 	 * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
 	 *                                   perform the sort.
 	 */
-	public CombiningUnilateralSortMerger(FlatCombineFunction<E> combineStub, MemoryManager memoryManager, IOManager ioManager,
+	public CombiningUnilateralSortMerger(FlatCombineFunction<E, E> combineStub, MemoryManager memoryManager, IOManager ioManager,
 			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
 			double memoryFraction, int maxNumFileHandles, float startSpillingFraction)
@@ -132,7 +132,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 	 * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
 	 *                                   perform the sort.
 	 */
-	public CombiningUnilateralSortMerger(FlatCombineFunction<E> combineStub, MemoryManager memoryManager, IOManager ioManager,
+	public CombiningUnilateralSortMerger(FlatCombineFunction<E, E> combineStub, MemoryManager memoryManager, IOManager ioManager,
 			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
 			double memoryFraction, int numSortBuffers, int maxNumFileHandles,
@@ -253,7 +253,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 			
 			// ------------------- Spilling Phase ------------------------
 			
-			final FlatCombineFunction<E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
+			final FlatCombineFunction<E, E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
 			
 			// now that we are actually spilling, take the combiner, and open it
 			try {
@@ -463,7 +463,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 																			this.memManager.getPageSize());
 			
 			final WriterCollector<E> collector = new WriterCollector<E>(output, this.serializer);
-			final FlatCombineFunction<E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
+			final FlatCombineFunction<E, E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
 
 			// combine and write to disk
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
index 5e2f3ca..d957fa1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
@@ -68,7 +68,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 		getTaskConfig().setRelativeMemoryDriver(combine_frac);
 		getTaskConfig().setFilehandlesDriver(2);
 		
-		final GroupReduceCombineDriver<Record> testTask = new GroupReduceCombineDriver<Record>();
+		final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
 		
 		try {
 			testDriver(testTask, MockCombiningReduceStub.class);
@@ -122,7 +122,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 		getTaskConfig().setRelativeMemoryDriver(combine_frac);
 		getTaskConfig().setFilehandlesDriver(2);
 		
-		final GroupReduceCombineDriver<Record> testTask = new GroupReduceCombineDriver<Record>();
+		final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
 
 		try {
 			testDriver(testTask, MockCombiningReduceStub.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
index 515ce76..3d9e991 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
@@ -70,7 +70,7 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
 		getTaskConfig().setRelativeMemoryDriver(combine_frac);
 		getTaskConfig().setFilehandlesDriver(2);
 		
-		final GroupReduceCombineDriver<Record> testTask = new GroupReduceCombineDriver<Record>();
+		final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
 		
 		try {
 			testDriver(testTask, MockCombiningReduceStub.class);
@@ -107,7 +107,7 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
 		getTaskConfig().setRelativeMemoryDriver(combine_frac);
 		getTaskConfig().setFilehandlesDriver(2);
 		
-		final GroupReduceCombineDriver<Record> testTask = new GroupReduceCombineDriver<Record>();
+		final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
 		
 		try {
 			testDriver(testTask, MockFailingCombiningReduceStub.class);
@@ -132,7 +132,7 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
 		getTaskConfig().setRelativeMemoryDriver(combine_frac);
 		getTaskConfig().setFilehandlesDriver(2);
 		
-		final GroupReduceCombineDriver<Record> testTask = new GroupReduceCombineDriver<Record>();
+		final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
 		
 		final AtomicBoolean success = new AtomicBoolean(false);
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 61bfbfa..00761ec 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -627,6 +627,62 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
+   *  Applies a CombineFunction on a non-grouped [[DataSet]].  A
+   *  CombineFunction is similar to a GroupReduceFunction but does not
+   *  perform a full data exchange. Instead, the CombineFunction calls
+   *  the combine method once per partition for combining a group of
+   *  results. This operator is suitable for combining values into an
+   *  intermediate format before doing a proper groupReduce where the
+   *  data is shuffled across the nodes for further reduction. The
+   *  GroupReduce operator can also be supplied with a combiner by
+   *  implementing the RichGroupReduce function. The combine method of
+   *  the RichGroupReduce function requires input and output type to be
+   *  the same. The CombineFunction, on the other hand, can have an
+   *  arbitrary output type.
+   */
+  def combineGroup[R: TypeInformation: ClassTag](
+      combiner: FlatCombineFunction[T, R]): DataSet[R] = {
+    if (combiner == null) {
+      throw new NullPointerException("Combine function must not be null.")
+    }
+    wrap(new GroupCombineOperator[T, R](javaSet,
+      implicitly[TypeInformation[R]],
+      combiner,
+      getCallLocationName()))
+  }
+
+  /**
+   *  Applies a CombineFunction on a non-grouped [[DataSet]].  A
+   *  CombineFunction is similar to a GroupReduceFunction but does not
+   *  perform a full data exchange. Instead, the CombineFunction calls
+   *  the combine method once per partition for combining a group of
+   *  results. This operator is suitable for combining values into an
+   *  intermediate format before doing a proper groupReduce where the
+   *  data is shuffled across the nodes for further reduction. The
+   *  GroupReduce operator can also be supplied with a combiner by
+   *  implementing the RichGroupReduce function. The combine method of
+   *  the RichGroupReduce function requires input and output type to be
+   *  the same. The CombineFunction, on the other hand, can have an
+   *  arbitrary output type.
+   */
+  def combineGroup[R: TypeInformation: ClassTag](
+      fun: (Iterator[T], Collector[R]) => Unit): DataSet[R] = {
+    if (fun == null) {
+      throw new NullPointerException("Combine function must not be null.")
+    }
+    val combiner = new FlatCombineFunction[T, R] {
+      val cleanFun = clean(fun)
+      def combine(in: java.lang.Iterable[T], out: Collector[R]) {
+        cleanFun(in.iterator().asScala, out)
+      }
+    }
+    wrap(new GroupCombineOperator[T, R](javaSet,
+      implicitly[TypeInformation[R]],
+      combiner,
+      getCallLocationName()))
+  }
+
+  /**
    * Creates a new DataSet containing the first `n` elements of this DataSet.
    */
   def first(n: Int): DataSet[T] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
index 7ac8dcd..eca4563 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.functions.{KeySelector, FirstReducer}
 import org.apache.flink.api.scala.operators.ScalaAggregateOperator
 import scala.collection.JavaConverters._
 import org.apache.commons.lang3.Validate
-import org.apache.flink.api.common.functions.{GroupReduceFunction, ReduceFunction}
+import org.apache.flink.api.common.functions.{FlatCombineFunction, GroupReduceFunction, ReduceFunction, Partitioner}
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.java.aggregation.Aggregations
 import org.apache.flink.api.java.operators._
@@ -30,7 +30,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.util.Collector
 import scala.collection.mutable
 import scala.reflect.ClassTag
-import org.apache.flink.api.common.functions.Partitioner
 import com.google.common.base.Preconditions
 
 /**
@@ -355,6 +354,56 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
+   *  Applies a CombineFunction on a grouped [[DataSet]].  A
+   *  CombineFunction is similar to a GroupReduceFunction but does not
+   *  perform a full data exchange. Instead, the CombineFunction calls
+   *  the combine method once per partition for combining a group of
+   *  results. This operator is suitable for combining values into an
+   *  intermediate format before doing a proper groupReduce where the
+   *  data is shuffled across the nodes for further reduction. The
+   *  GroupReduce operator can also be supplied with a combiner by
+   *  implementing the RichGroupReduce function. The combine method of
+   *  the RichGroupReduce function requires input and output type to be
+   *  the same. The CombineFunction, on the other hand, can have an
+   *  arbitrary output type.
+   */
+  def combineGroup[R: TypeInformation: ClassTag](
+                                          fun: (Iterator[T], Collector[R]) => Unit): DataSet[R] = {
+    Validate.notNull(fun, "GroupCombine function must not be null.")
+    val combiner = new FlatCombineFunction[T, R] {
+      val cleanFun = set.clean(fun)
+      def combine(in: java.lang.Iterable[T], out: Collector[R]) {
+        cleanFun(in.iterator().asScala, out)
+      }
+    }
+    wrap(
+      new GroupCombineOperator[T, R](maybeCreateSortedGrouping(),
+        implicitly[TypeInformation[R]], combiner, getCallLocationName()))
+  }
+
+  /**
+   *  Applies a CombineFunction on a grouped [[DataSet]].  A
+   *  CombineFunction is similar to a GroupReduceFunction but does not
+   *  perform a full data exchange. Instead, the CombineFunction calls
+   *  the combine method once per partition for combining a group of
+   *  results. This operator is suitable for combining values into an
+   *  intermediate format before doing a proper groupReduce where the
+   *  data is shuffled across the nodes for further reduction. The
+   *  GroupReduce operator can also be supplied with a combiner by
+   *  implementing the RichGroupReduce function. The combine method of
+   *  the RichGroupReduce function requires input and output type to be
+   *  the same. The CombineFunction, on the other hand, can have an
+   *  arbitrary output type.
+   */
+  def combineGroup[R: TypeInformation: ClassTag](
+      combiner: FlatCombineFunction[T, R]): DataSet[R] = {
+    Validate.notNull(combiner, "GroupCombine function must not be null.")
+    wrap(
+      new GroupCombineOperator[T, R](maybeCreateSortedGrouping(),
+        implicitly[TypeInformation[R]], combiner, getCallLocationName()))
+  }
+
+  /**
    * Creates a new DataSet containing the first `n` elements of each group of this DataSet.
    */
   def first(n: Int): DataSet[T] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
index 8de861a..6631f07 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
@@ -28,7 +28,7 @@ import org.apache.flink.util.Collector;
 
 
 public class CustomRankCombiner extends AbstractRichFunction implements GroupReduceFunction<VertexWithRank, VertexWithRank>,
-		FlatCombineFunction<VertexWithRank>
+		FlatCombineFunction<VertexWithRank, VertexWithRank>
 {
 	private static final long serialVersionUID = 1L;
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
new file mode 100644
index 0000000..0808389
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
@@ -0,0 +1,522 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.test.javaApiOperators;
+
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.UnsortedGrouping;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+/**
+* The GroupCombine operator is not easy to test because it is essentially just a combiner. The result can be
+* the result of a normal groupReduce at any stage of its execution. The basic idea is to preserve the grouping key
+* in the partial result, so that we can do a reduceGroup afterwards to finalize the results for verification.
+* In addition, we can use hashPartition to partition the data and check if no shuffling (just combining) has
+* been performed.
+*/
+public class GroupCombineITCase extends MultipleProgramsTestBase {
+
+	public GroupCombineITCase(ExecutionMode mode) {
+		super(mode);
+	}
+
+	private String resultPath;
+
+	private String expected;
+
+	private static String identityResult = "1,1,Hi\n" +
+			"2,2,Hello\n" +
+			"3,2,Hello world\n" +
+			"4,3,Hello world, how are you?\n" +
+			"5,3,I am fine.\n" +
+			"6,3,Luke Skywalker\n" +
+			"7,4,Comment#1\n" +
+			"8,4,Comment#2\n" +
+			"9,4,Comment#3\n" +
+			"10,4,Comment#4\n" +
+			"11,5,Comment#5\n" +
+			"12,5,Comment#6\n" +
+			"13,5,Comment#7\n" +
+			"14,5,Comment#8\n" +
+			"15,5,Comment#9\n" +
+			"16,6,Comment#10\n" +
+			"17,6,Comment#11\n" +
+			"18,6,Comment#12\n" +
+			"19,6,Comment#13\n" +
+			"20,6,Comment#14\n" +
+			"21,6,Comment#15\n";
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception {
+		if (expected != null) {
+			compareResultsByLinesInMemory(expected, resultPath);
+		}
+	}
+
+	@Test
+	public void testAllGroupCombineIdentity() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+
+		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
+				// combine
+				.combineGroup(new IdentityFunction())
+				// fully reduce
+				.reduceGroup(new IdentityFunction());
+
+
+		reduceDs.writeAsCsv(resultPath);
+
+		env.execute();
+
+		expected = identityResult;
+	}
+
+	@Test
+	public void testIdentity() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
+				// combine
+				.combineGroup(new IdentityFunction())
+				// fully reduce
+				.reduceGroup(new IdentityFunction());
+
+		reduceDs.writeAsCsv(resultPath);
+
+		env.execute();
+
+		expected = identityResult;
+	}
+
+	@Test
+	public void testIdentityWithGroupBy() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
+				.groupBy(1)
+				// combine
+				.combineGroup(new IdentityFunction())
+				// fully reduce
+				.reduceGroup(new IdentityFunction());
+
+
+		reduceDs.writeAsCsv(resultPath);
+
+		env.execute();
+
+		expected = identityResult;
+	}
+
+	@Test
+	public void testIdentityWithGroupByAndSort() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
+				.groupBy(1)
+				.sortGroup(1, Order.DESCENDING)
+				// reduce partially
+				.combineGroup(new IdentityFunction())
+				.groupBy(1)
+				.sortGroup(1, Order.DESCENDING)
+				// fully reduce
+				.reduceGroup(new IdentityFunction());
+
+		reduceDs.writeAsCsv(resultPath);
+
+		env.execute();
+
+		expected = identityResult;
+	}
+
+	@Test
+	public void testPartialReduceWithIdenticalInputOutputType() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// data
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		DataSet<Tuple2<Long, Tuple3<Integer, Long, String>>> dsWrapped = ds
+				// wrap values as Kv pairs with the grouping key as key
+				.map(new Tuple3KvWrapper());
+
+		dsWrapped
+				.groupBy(0)
+				// reduce partially
+				.combineGroup(new Tuple3toTuple3GroupReduce())
+				.groupBy(0)
+				// reduce fully to check result
+				.reduceGroup(new Tuple3toTuple3GroupReduce())
+				//unwrap
+				.map(new MapFunction<Tuple2<Long, Tuple3<Integer, Long, String>>, Tuple3<Integer, Long, String>>() {
+					@Override
+					public Tuple3<Integer, Long, String> map(Tuple2<Long, Tuple3<Integer, Long, String>> value) throws Exception {
+						return value.f1;
+					}
+				})
+				.writeAsCsv(resultPath);
+
+
+
+		env.execute();
+
+		expected = "1,1,combined\n" +
+				"5,4,combined\n" +
+				"15,9,combined\n" +
+				"34,16,combined\n" +
+				"65,25,combined\n" +
+				"111,36,combined\n";
+	}
+
+	@Test
+	public void testPartialReduceWithDifferentInputOutputType() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// data
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		DataSet<Tuple2<Long, Tuple3<Integer, Long, String>>> dsWrapped = ds
+				// wrap values as Kv pairs with the grouping key as key
+				.map(new Tuple3KvWrapper());
+
+		dsWrapped
+				.groupBy(0)
+						// reduce partially
+				.combineGroup(new Tuple3toTuple2GroupReduce())
+				.groupBy(0)
+						// reduce fully to check result
+				.reduceGroup(new Tuple2toTuple2GroupReduce())
+						//unwrap
+				.map(new MapFunction<Tuple2<Long,Tuple2<Integer,Long>>, Tuple2<Integer,Long>>() {
+					@Override
+					public Tuple2<Integer, Long> map(Tuple2<Long, Tuple2<Integer, Long>> value) throws Exception {
+						return value.f1;
+					}
+				})
+				.writeAsCsv(resultPath);
+
+
+
+		env.execute();
+
+		expected = "1,3\n" +
+				"5,20\n" +
+				"15,58\n" +
+				"34,52\n" +
+				"65,70\n" +
+				"111,96\n";
+
+	}
+
+	/**
+	 * Checks that combineGroup does not shuffle the data.
+	 *
+	 * <p>The input is hash-partitioned on field 0 but grouped on field 1, and the combiner
+	 * emits one (key, count) pair for every group it is handed. The test passes only if the
+	 * written result differs from the listed per-key totals, i.e. if the groups were combined
+	 * partially. The test is skipped in COLLECTION execution mode.
+	 */
+	@Test
+	public void testCheckPartitionShuffleGroupBy() throws Exception {
+
+		org.junit.Assume.assumeTrue(mode != ExecutionMode.COLLECTION);
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// data
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		// partition on field 0, but group on field 1
+		UnsortedGrouping<Tuple3<Integer, Long, String>> partitionedDS = ds.partitionByHash(0).groupBy(1);
+
+		partitionedDS.combineGroup(new FlatCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() {
+			@Override
+			public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Long, Integer>> out) throws Exception {
+				int count = 0;
+				long key = 0;
+				for (Tuple3<Integer, Long, String> value : values) {
+					key = value.f1;
+					count++;
+				}
+				// typed constructor instead of the raw Tuple2 to avoid an unchecked conversion
+				out.collect(new Tuple2<Long, Integer>(key, count));
+			}
+		}).writeAsCsv(resultPath);
+
+		env.execute();
+
+		// the (key, count) pairs that must NOT be produced
+		String notExpected = "6,6\n" +
+							"5,5\n" +
+							"4,4\n" +
+							"3,3\n" +
+							"2,2\n" +
+							"1,1\n";
+
+		// check
+
+		ArrayList<String> list = new ArrayList<String>();
+		readAllResultLines(list, resultPath);
+
+		String[] result = list.toArray(new String[list.size()]);
+		Arrays.sort(result);
+
+		// named so that it does not shadow the 'expected' field assigned by the other tests
+		String[] notExpectedLines = notExpected.split("\n");
+		Arrays.sort(notExpectedLines);
+
+		Assert.assertFalse("The two arrays were identical.", Arrays.equals(notExpectedLines, result));
+	}
+
+	/**
+	 * Checks that with a degree of parallelism of 1 combineGroup produces the same result as
+	 * a full shuffle would: exactly one (key, count) pair per key.
+	 *
+	 * <p>The expected lines are only stored in the {@code expected} field here; the comparison
+	 * against the written result happens outside this method (presumably in the test
+	 * teardown - verify against the enclosing class).
+	 */
+	@Test
+	public void testCheckPartitionShuffleDOP1() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		env.setDegreeOfParallelism(1);
+
+		// data
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		// partition on field 0, but group on field 1
+		UnsortedGrouping<Tuple3<Integer, Long, String>> partitionedDS = ds.partitionByHash(0).groupBy(1);
+
+		partitionedDS.combineGroup(new FlatCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() {
+			@Override
+			public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Long, Integer>> out) throws Exception {
+				int count = 0;
+				long key = 0;
+				for (Tuple3<Integer, Long, String> value : values) {
+					key = value.f1;
+					count++;
+				}
+				// typed constructor instead of the raw Tuple2 to avoid an unchecked conversion
+				out.collect(new Tuple2<Long, Integer>(key, count));
+			}
+		}).writeAsCsv(resultPath);
+
+		env.execute();
+
+		expected = "6,6\n" +
+				"5,5\n" +
+				"4,4\n" +
+				"3,3\n" +
+				"2,2\n" +
+				"1,1\n";
+
+	}
+
+	@Test
+	// smoke test: combineGroup must be callable on DataSet, UnsortedGrouping and SortedGrouping
+	public void testAPI() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		// wrap every String into a Tuple1 so that it can be grouped by field position
+		DataSet<Tuple1<String>> ds = CollectionDataSets.getStringDataSet(env).map(new MapFunction<String, Tuple1<String>>() {
+			@Override
+			public Tuple1<String> map(String value) throws Exception {
+				return new Tuple1<String>(value);
+			}
+		});
+
+		// combineGroup on a plain (ungrouped) DataSet
+		ds.combineGroup(new FlatCombineFunctionExample())
+				.output(new DiscardingOutputFormat<Tuple1<String>>());
+
+		// combineGroup on an UnsortedGrouping
+		ds.groupBy(0).combineGroup(new FlatCombineFunctionExample())
+				.output(new DiscardingOutputFormat<Tuple1<String>>());
+
+		// combineGroup on a SortedGrouping
+		ds.groupBy(0).sortGroup(0, Order.ASCENDING).combineGroup(new FlatCombineFunctionExample())
+				.output(new DiscardingOutputFormat<Tuple1<String>>());
+
+		env.execute(); // no assertions: the test only requires that the program runs without throwing
+	}
+
+	/**
+	 * Pass-through combiner for {@code Tuple1<String>} records: every input element is
+	 * forwarded to the collector unchanged.
+	 */
+	public static class FlatCombineFunctionExample implements FlatCombineFunction<Tuple1<String>, Tuple1<String>> {
+
+		@Override
+		public void combine(Iterable<Tuple1<String>> values, Collector<Tuple1<String>> out) throws Exception {
+			final java.util.Iterator<Tuple1<String>> elements = values.iterator();
+			while (elements.hasNext()) {
+				out.collect(elements.next());
+			}
+		}
+	}
+
+	/**
+	 * Pass-through combiner for {@code scala.Tuple1<String>} records: every input element is
+	 * forwarded to the collector unchanged.
+	 */
+	public static class ScalaFlatCombineFunctionExample implements FlatCombineFunction<scala.Tuple1<String>, scala.Tuple1<String>> {
+
+		@Override
+		public void combine(Iterable<scala.Tuple1<String>> values, Collector<scala.Tuple1<String>> out) throws Exception {
+			final java.util.Iterator<scala.Tuple1<String>> elements = values.iterator();
+			while (elements.hasNext()) {
+				out.collect(elements.next());
+			}
+		}
+	}
+
+	/**
+	 * Combiner and group reducer that both emit a fresh copy of every incoming
+	 * {@code Tuple3<Integer, Long, String>}.
+	 */
+	public static class IdentityFunction implements FlatCombineFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>,
+													GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+
+		@Override
+		public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+			for (final Tuple3<Integer, Long, String> element : values) {
+				// emit a new tuple instead of the (possibly reused) input object
+				final Tuple3<Integer, Long, String> copy =
+						new Tuple3<Integer, Long, String>(element.f0, element.f1, element.f2);
+				out.collect(copy);
+			}
+		}
+
+		@Override
+		public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+			// reducing is the same element-wise copy as combining
+			combine(values, out);
+		}
+	}
+
+
+	/**
+	 * Collapses a group of (key, Tuple3) pairs into a single pair by summing the Integer and
+	 * Long fields of the values; the String field of the result is always "combined".
+	 * Combine and reduce perform the same computation.
+	 */
+	public static class Tuple3toTuple3GroupReduce implements KvGroupReduce<Long, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+
+		@Override
+		public void combine(Iterable<Tuple2<Long, Tuple3<Integer, Long, String>>> values, Collector<Tuple2<Long, Tuple3<Integer, Long, String>>> out) throws Exception {
+			int i = 0;
+			long l = 0;
+			// key of the last element seen; stays 0 if the group is empty
+			long key = 0;
+
+			// collapse groups
+			for (Tuple2<Long, Tuple3<Integer, Long, String>> value : values) {
+				key = value.f0;
+				Tuple3<Integer, Long, String> extracted = value.f1;
+				i += extracted.f0;
+				l += extracted.f1;
+			}
+
+			Tuple3<Integer, Long, String> result = new Tuple3<Integer, Long, String>(i, l, "combined");
+			out.collect(new Tuple2<Long, Tuple3<Integer, Long, String>>(key, result));
+		}
+
+		@Override
+		public void reduce(Iterable<Tuple2<Long, Tuple3<Integer, Long, String>>> values, Collector<Tuple2<Long, Tuple3<Integer, Long, String>>> out) throws Exception {
+			// fully typed signature (was raw Iterable/Collector); input and output types are identical
+			combine(values, out);
+		}
+	}
+
+	/**
+	 * Combines (key, Tuple3) pairs into one (key, Tuple2) pair per group: the Integer fields
+	 * are summed, and the Long fields are summed together with the length of each String
+	 * field. The reduce step delegates to {@link Tuple2toTuple2GroupReduce}.
+	 */
+	public static class Tuple3toTuple2GroupReduce implements KvGroupReduce<Long, Tuple3<Integer, Long, String>, Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
+
+		@Override
+		public void combine(Iterable<Tuple2<Long, Tuple3<Integer, Long, String>>> values, Collector<Tuple2<Long, Tuple2<Integer, Long>>> out) throws Exception {
+			long groupKey = 0;
+			int intSum = 0;
+			long longSum = 0;
+
+			// fold all values of the group into the two sums
+			for (Tuple2<Long, Tuple3<Integer, Long, String>> pair : values) {
+				final Tuple3<Integer, Long, String> payload = pair.f1;
+				groupKey = pair.f0;
+				intSum += payload.f0;
+				longSum += payload.f1 + payload.f2.length();
+			}
+
+			out.collect(new Tuple2<Long, Tuple2<Integer, Long>>(groupKey, new Tuple2<Integer, Long>(intSum, longSum)));
+		}
+
+		@Override
+		public void reduce(Iterable<Tuple2<Long, Tuple2<Integer, Long>>> values, Collector<Tuple2<Long, Tuple2<Integer, Long>>> out) throws Exception {
+			final Tuple2toTuple2GroupReduce delegate = new Tuple2toTuple2GroupReduce();
+			delegate.reduce(values, out);
+		}
+	}
+
+	/**
+	 * Collapses a group of (key, Tuple2) pairs into a single pair by summing the Integer and
+	 * the Long fields of the values. Combine and reduce perform the same computation.
+	 */
+	public static class Tuple2toTuple2GroupReduce implements KvGroupReduce<Long, Tuple2<Integer, Long>, Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
+
+		@Override
+		public void combine(Iterable<Tuple2<Long, Tuple2<Integer, Long>>> values, Collector<Tuple2<Long, Tuple2<Integer, Long>>> out) throws Exception {
+			int i = 0;
+			long l = 0;
+			// key of the last element seen; stays 0 if the group is empty
+			long key = 0;
+
+			// collapse groups
+			for (Tuple2<Long, Tuple2<Integer, Long>> value : values) {
+				key = value.f0;
+				Tuple2<Integer, Long> extracted = value.f1;
+				i += extracted.f0;
+				l += extracted.f1;
+			}
+
+			Tuple2<Integer, Long> result = new Tuple2<Integer, Long>(i, l);
+
+			out.collect(new Tuple2<Long, Tuple2<Integer, Long>>(key, result));
+		}
+
+		@Override
+		public void reduce(Iterable<Tuple2<Long, Tuple2<Integer, Long>>> values, Collector<Tuple2<Long, Tuple2<Integer, Long>>> out) throws Exception {
+			combine(values, out);
+		}
+	}
+
+	/**
+	 * Wraps a {@code Tuple3<Integer, Long, String>} into a (key, value) pair that uses the
+	 * tuple's Long field (f1) as key and the unchanged tuple as value.
+	 *
+	 * <p>Declared static, like the other helper functions in this class, so that instances do
+	 * not carry a hidden reference to the enclosing test instance.
+	 */
+	public static class Tuple3KvWrapper implements MapFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Tuple3<Integer, Long, String>>> {
+		@Override
+		public Tuple2<Long, Tuple3<Integer, Long, String>> map(Tuple3<Integer, Long, String> value) throws Exception {
+			return new Tuple2<Long,Tuple3<Integer, Long, String>>(value.f1, value);
+		}
+	}
+
+
+	public interface CombineAndReduceGroup <IN, INT, OUT> extends FlatCombineFunction<IN, INT>, GroupReduceFunction<INT, OUT> { // combines IN into the intermediate type INT and reduces INT to OUT
+	}
+
+	public interface KvGroupReduce<K, V, INT, OUT> extends CombineAndReduceGroup<Tuple2<K, V>, Tuple2<K, INT>, Tuple2<K, OUT>> { // CombineAndReduceGroup over (key, value) pairs: the key type K is kept, only the value type changes (V -> INT -> OUT)
+	}
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
new file mode 100644
index 0000000..05346ba
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.java.io.DiscardingOutputFormat
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.test.javaApiOperators.GroupCombineITCase
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.util.Collector
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.apache.flink.api.scala._
+
+/**
+ * Java interoperability tests for combineGroup in the Scala API: each variant is called
+ * once with a FlatCombineFunction implemented in Java and once with a Scala function
+ * literal. The main functional tests are in the Java GroupCombineITCase.
+ */
+@RunWith(classOf[Parameterized])
+class GroupCombineITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+
+  @Test
+  def testApi(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val ds: DataSet[Tuple1[String]] = CollectionDataSets.getStringDataSet(env)
+      .map(str => Tuple1(str))
+
+    // combineGroup on a plain (ungrouped) DataSet
+    ds.combineGroup(new GroupCombineITCase.ScalaFlatCombineFunctionExample())
+      .output(new DiscardingOutputFormat[Tuple1[String]])
+
+    ds.combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect))
+      .output(new DiscardingOutputFormat[Tuple1[String]])
+
+    // combineGroup on a grouped DataSet (groupBy)
+    ds.groupBy(0)
+      .combineGroup(new GroupCombineITCase.ScalaFlatCombineFunctionExample())
+      .output(new DiscardingOutputFormat[Tuple1[String]])
+
+    ds.groupBy(0)
+      .combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect))
+      .output(new DiscardingOutputFormat[Tuple1[String]])
+
+    // combineGroup on a grouped and sorted DataSet (groupBy + sortGroup)
+    ds.groupBy(0).sortGroup(0, Order.ASCENDING)
+      .combineGroup(new GroupCombineITCase.ScalaFlatCombineFunctionExample())
+      .output(new DiscardingOutputFormat[Tuple1[String]])
+
+    ds.groupBy(0).sortGroup(0, Order.ASCENDING)
+      .combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect))
+      .output(new DiscardingOutputFormat[Tuple1[String]])
+
+    env.execute // no assertions: the test only requires that the program runs without throwing
+  }
+
+}
+
+


[2/2] flink git commit: [FLINK-1622][java-api][scala-api] add a GroupCombine operator

Posted by mx...@apache.org.
[FLINK-1622][java-api][scala-api] add a GroupCombine operator

The GroupCombine operator acts like the optional combine step in the
GroupReduceFunction. It is more general because it combines from an
input to an arbitrary output type. Combining is performed on the
partitions with as much data in memory as possible. This may lead to
partial results.

The operator can be used to pre-combine elements into an intermediate
output format before applying a proper groupReduce to produce the final
output format.

* make Combine and FlatCombine generic by adding an output type

* add documentation

* Reuse GroupReduceCombineDriver and SynchronousChainedCombineDriver for GroupCombine operator
** make them more generic by specifying input and output type
** implement AllCombineDriver

* add Java tests
* add Scala test

This closes #466


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e93e0cb8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e93e0cb8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e93e0cb8

Branch: refs/heads/master
Commit: e93e0cb868087a8ab707c7e96cfccf220b07a4aa
Parents: 4a49a73
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Mar 3 17:27:13 2015 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Mar 18 18:16:05 2015 +0100

----------------------------------------------------------------------
 docs/dataset_transformations.md                 | 115 +++-
 .../org/apache/flink/compiler/PactCompiler.java |  48 +-
 .../flink/compiler/costs/CostEstimator.java     |   5 +-
 .../flink/compiler/dag/GroupCombineNode.java    | 106 ++++
 .../operators/AllGroupCombineProperties.java    |  73 +++
 .../AllGroupWithPartialPreGroupProperties.java  |   2 +-
 .../operators/GroupCombineProperties.java       | 117 +++++
 .../operators/OperatorDescriptorSingle.java     |   5 +-
 .../operators/PartialGroupProperties.java       |  91 ----
 .../plandump/PlanJSONDumpGenerator.java         |   2 +-
 .../compiler/postpass/JavaApiPostPass.java      |   2 +-
 .../java/GroupReduceCompilationTest.java        |   2 +-
 .../api/common/functions/CombineFunction.java   |   7 +-
 .../common/functions/FlatCombineFunction.java   |   7 +-
 .../functions/RichFlatCombineFunction.java      |   7 +-
 .../functions/RichGroupReduceFunction.java      |   2 +-
 .../base/GroupCombineOperatorBase.java          | 151 ++++++
 .../java/org/apache/flink/api/java/DataSet.java |  26 +-
 .../flink/api/java/functions/FirstReducer.java  |   2 +-
 .../java/operators/GroupCombineOperator.java    | 227 ++++++++
 .../api/java/operators/SortedGrouping.java      |  22 +
 .../api/java/operators/UnsortedGrouping.java    |  24 +-
 .../PlanUnwrappingGroupCombineOperator.java     |  70 +++
 .../PlanUnwrappingReduceGroupOperator.java      |   2 +-
 ...lanUnwrappingSortedGroupCombineOperator.java |  70 +++
 ...PlanUnwrappingSortedReduceGroupOperator.java |   2 +-
 .../java/record/operators/ReduceOperator.java   |   2 +-
 .../flink/api/java/typeutils/TypeExtractor.java |  11 +
 .../java/record/ReduceWrappingFunctionTest.java |   4 +-
 .../operators/AllGroupCombineDriver.java        | 127 +++++
 .../runtime/operators/AllGroupReduceDriver.java |  46 +-
 .../flink/runtime/operators/DriverStrategy.java |   5 +-
 .../operators/GroupReduceCombineDriver.java     |  70 +--
 .../runtime/operators/PactTaskContext.java      |   2 +-
 .../chaining/GroupCombineChainedDriver.java     | 239 +++++++++
 .../SynchronousChainedCombineDriver.java        |  50 +-
 .../sort/CombiningUnilateralSortMerger.java     |  10 +-
 .../operators/CombineTaskExternalITCase.java    |   4 +-
 .../runtime/operators/CombineTaskTest.java      |   6 +-
 .../org/apache/flink/api/scala/DataSet.scala    |  56 ++
 .../apache/flink/api/scala/GroupedDataSet.scala |  53 +-
 .../CustomRankCombiner.java                     |   2 +-
 .../javaApiOperators/GroupCombineITCase.java    | 522 +++++++++++++++++++
 .../scala/operators/GroupCombineITCase.scala    |  77 +++
 44 files changed, 2248 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/docs/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/dataset_transformations.md b/docs/dataset_transformations.md
index 13082c1..2bec61b 100644
--- a/docs/dataset_transformations.md
+++ b/docs/dataset_transformations.md
@@ -260,7 +260,7 @@ class WC(val word: String, val count: Int) {
 }
 
 val words: DataSet[WC] = // [...]
-val wordCounts = words.groupBy { _.word } reduce { 
+val wordCounts = words.groupBy { _.word } reduce {
   (w1, w2) => new WC(w1.word, w1.count + w2.count)
 }
 ~~~
@@ -298,7 +298,7 @@ val reducedTuples = tuples.groupBy(0, 1).reduce { ... }
 
 #### Reduce on DataSet grouped by Case Class Fields
 
-When using Case Classes you can also specify the grouping key using the names of the fields: 
+When using Case Classes you can also specify the grouping key using the names of the fields:
 
 ~~~scala
 case class MyClass(val a: String, b: Int, c: Double)
@@ -334,7 +334,7 @@ public class DistinctReduce
 
     Set<String> uniqStrings = new HashSet<String>();
     Integer key = null;
-  
+
     // add all strings of the group to the set
     for (Tuple2<Integer, String> t : in) {
       key = t.f0;
@@ -524,6 +524,99 @@ class MyCombinableGroupReducer
 </div>
 </div>
 
+### GroupCombine on a Grouped DataSet
+
+The GroupCombine transformation is the generalized form of the combine step in
+the Combinable GroupReduceFunction. It is generalized in the sense that it
+allows combining of input type `I` to an arbitrary output type `O`. In contrast,
+the combine step in the GroupReduce only allows combining from input type `I` to
+output type `I`. This is because the reduce step in the GroupReduceFunction
+expects input type `I`.
+
+In some applications, it is desirable to combine a DataSet into an intermediate
+format before performing additional transformations (e.g. to reduce data
+size). This can be achieved with a CombineGroup transformation at very little
+cost.
+
+**Note:** The GroupCombine on a Grouped DataSet is performed in memory with a
+  greedy strategy which may not process all data at once but in multiple
+  steps. It is also performed on the individual partitions without a data
+  exchange like in a GroupReduce transformation. This may lead to partial
+  results.
+
+The following example demonstrates the use of a CombineGroup transformation for
+an alternative WordCount implementation.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+DataSet<String> input = [..] // The words received as input
+DataSet<String> groupedInput = input.groupBy(0); // group identical words
+
+DataSet<Tuple2<String, Integer>> combinedWords = groupedInput.combineGroup(new FlatCombineFunction<String, Tuple2<String, Integer>() {
+
+    public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>>) { // combine
+        int count = 0;
+        for (String word : words) {
+            count++;
+        }
+        out.collect(new Tuple2(word, count));
+    }
+});
+
+DataSet<Tuple2<String, Integer>> groupedCombinedWords = combinedWords.groupBy(0); // group by words again
+
+DataSet<Tuple2<String, Integer>> output = combinedWords.groupReduce(new GroupReduceFunction() { // group reduce with full data exchange
+
+    public void reduce(Iterable<Tuple2<String, Integer>>, Collector<Tuple2<String, Integer>>) {
+        int count = 0;
+        for (Tuple2<String, Integer> word : words) {
+            count++;
+        }
+        out.collect(new Tuple2(word, count));
+    }
+});
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val input: DataSet[String] = [..] // The words received as input
+
+val combinedWords: DataSet[(String, Int)] = input
+  .groupBy { word => word } // group identical words
+  .combineGroup {
+    (words, out: Collector[(String, Int)]) =>
+        var key: String = null
+        var count = 0
+        for (word <- words) {
+            key = word
+            count += 1
+        }
+        // emit the word together with its (possibly partial) count
+        out.collect((key, count))
+  }
+
+val output: DataSet[(String, Int)] = combinedWords
+  .groupBy(0) // group by words again
+  .reduceGroup { // group reduce with full data exchange
+    (words, out: Collector[(String, Int)]) =>
+        var key: String = null
+        var count = 0
+        for ((word, partialCount) <- words) {
+            key = word
+            count += partialCount // sum up the partial counts of the combine step
+        }
+        // emit the word together with its final count
+        out.collect((key, count))
+  }
+
+~~~
+
+</div>
+</div>
+
+The above alternative WordCount implementation demonstrates how the GroupCombine
+combines words before performing the GroupReduce transformation. The above
+example is just a proof of concept. Note how the combine step changes the type
+of the DataSet, which would normally require an additional Map transformation
+before executing the GroupReduce.
+
 ### Aggregate on Grouped Tuple DataSet
 
 There are some common aggregation operations that are frequently used. The Aggregate transformation provides the following build-in aggregation functions:
@@ -558,7 +651,7 @@ val output = input.groupBy(1).aggregate(SUM, 0).and(MIN, 2)
 </div>
 </div>
 
-To apply multiple aggregations on a DataSet it is necessary to use the `.and()` function after the first aggregate, that means `.aggregate(SUM, 0).and(MIN, 2)` produces the sum of field 0 and the minimum of field 2 of the original DataSet. 
+To apply multiple aggregations on a DataSet it is necessary to use the `.and()` function after the first aggregate, that means `.aggregate(SUM, 0).and(MIN, 2)` produces the sum of field 0 and the minimum of field 2 of the original DataSet.
 In contrast to that `.aggregate(SUM, 0).aggregate(MIN, 2)` will apply an aggregation on an aggregation. In the given example it would produce the minimum of field 2 after calculating the sum of field 0 grouped by field 1.
 
 **Note:** The set of aggregation functions will be extended in the future.
@@ -632,6 +725,12 @@ group-reduce function is not combinable. Therefore, this can be a very compute i
 See the paragraph on "Combineable Group-Reduce Functions" above to learn how to implement a
 combinable group-reduce function.
 
+### GroupCombine on a full DataSet
+
+The GroupCombine on a full DataSet works similarly to the GroupCombine on a
+grouped DataSet. The data is partitioned on all nodes and then combined in a
+greedy fashion (i.e. only data fitting into memory is combined at once).
+
 ### Aggregate on full Tuple DataSet
 
 There are some common aggregation operations that are frequently used. The Aggregate transformation
@@ -898,7 +997,7 @@ to manually pick a strategy, in case you want to enforce a specific way of execu
 DataSet<SomeType> input1 = // [...]
 DataSet<AnotherType> input2 = // [...]
 
-DataSet<Tuple2<SomeType, AnotherType> result = 
+DataSet<Tuple2<SomeType, AnotherType> result =
       input1.join(input2, BROADCAST_HASH_FIRST)
             .where("id").equalTo("key");
 ~~~
@@ -1199,7 +1298,7 @@ val out = in.rebalance().map { ... }
 
 ### Hash-Partition
 
-Hash-partitions a DataSet on a given key. 
+Hash-partitions a DataSet on a given key.
 Keys can be specified as key expressions or field position keys (see [Reduce examples](#reduce-on-grouped-dataset) for how to specify keys).
 
 <div class="codetabs" markdown="1">
@@ -1235,7 +1334,7 @@ Partitions can be sorted on multiple fields by chaining `sortPartition()` calls.
 
 ~~~java
 DataSet<Tuple2<String, Integer>> in = // [...]
-// Locally sort partitions in ascending order on the second String field and 
+// Locally sort partitions in ascending order on the second String field and
 // in descending order on the first String field.
 // Apply a MapPartition transformation on the sorted partitions.
 DataSet<Tuple2<String, String>> out = in.sortPartition(1, Order.ASCENDING)
@@ -1248,7 +1347,7 @@ DataSet<Tuple2<String, String>> out = in.sortPartition(1, Order.ASCENDING)
 
 ~~~scala
 val in: DataSet[(String, Int)] = // [...]
-// Locally sort partitions in ascending order on the second String field and 
+// Locally sort partitions in ascending order on the second String field and
 // in descending order on the first String field.
 // Apply a MapPartition transformation on the sorted partitions.
 val out = in.sortPartition(1, Order.ASCENDING)

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index 0ea8724..160b506 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -18,6 +18,26 @@
 
 package org.apache.flink.compiler;
 
+import org.apache.flink.compiler.costs.CostEstimator;
+import org.apache.flink.compiler.costs.DefaultCostEstimator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flink.api.common.operators.base.BulkIterationBase;
+import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
+import org.apache.flink.api.common.operators.base.CrossOperatorBase;
+import org.apache.flink.api.common.operators.base.DeltaIterationBase;
+import org.apache.flink.api.common.operators.base.FilterOperatorBase;
+import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
+import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
+
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Deque;
@@ -28,11 +48,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
 import org.apache.flink.compiler.dag.SortPartitionNode;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.Plan;
@@ -40,23 +56,10 @@ import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Union;
-import org.apache.flink.api.common.operators.base.BulkIterationBase;
-import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
-import org.apache.flink.api.common.operators.base.CrossOperatorBase;
-import org.apache.flink.api.common.operators.base.DeltaIterationBase;
-import org.apache.flink.api.common.operators.base.FilterOperatorBase;
-import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
-import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
-import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.BulkIterationBase.PartialSolutionPlaceHolder;
 import org.apache.flink.api.common.operators.base.DeltaIterationBase.SolutionSetPlaceHolder;
 import org.apache.flink.api.common.operators.base.DeltaIterationBase.WorksetPlaceHolder;
-import org.apache.flink.compiler.costs.CostEstimator;
-import org.apache.flink.compiler.costs.DefaultCostEstimator;
+
 import org.apache.flink.compiler.dag.BinaryUnionNode;
 import org.apache.flink.compiler.dag.BulkIterationNode;
 import org.apache.flink.compiler.dag.BulkPartialSolutionNode;
@@ -68,10 +71,11 @@ import org.apache.flink.compiler.dag.DataSourceNode;
 import org.apache.flink.compiler.dag.FilterNode;
 import org.apache.flink.compiler.dag.FlatMapNode;
 import org.apache.flink.compiler.dag.GroupReduceNode;
+import org.apache.flink.compiler.dag.GroupCombineNode;
 import org.apache.flink.compiler.dag.IterationNode;
+import org.apache.flink.compiler.dag.JoinNode;
 import org.apache.flink.compiler.dag.MapNode;
 import org.apache.flink.compiler.dag.MapPartitionNode;
-import org.apache.flink.compiler.dag.JoinNode;
 import org.apache.flink.compiler.dag.OptimizerNode;
 import org.apache.flink.compiler.dag.PactConnection;
 import org.apache.flink.compiler.dag.PartitionNode;
@@ -97,11 +101,14 @@ import org.apache.flink.compiler.plan.SourcePlanNode;
 import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
 import org.apache.flink.compiler.plan.WorksetPlanNode;
 import org.apache.flink.compiler.postpass.OptimizerPostPass;
+
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
+
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
+
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Visitor;
 
@@ -686,6 +693,9 @@ public class PactCompiler {
 			else if (c instanceof GroupReduceOperatorBase) {
 				n = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) c);
 			}
+			else if (c instanceof GroupCombineOperatorBase) {
+				n = new GroupCombineNode((GroupCombineOperatorBase<?, ?, ?>) c);
+			}
 			else if (c instanceof JoinOperatorBase) {
 				n = new JoinNode((JoinOperatorBase<?, ?, ?, ?>) c);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
index b13c1be..091fbf6 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
@@ -186,7 +186,10 @@ public abstract class CostEstimator {
 			
 		case SORTED_GROUP_COMBINE:
 			// partial grouping is always local and main memory resident. we should add a relative cpu cost at some point
-		
+
+			// partial grouping is always local and main memory resident. we should add a relative cpu cost at some point
+		case ALL_GROUP_COMBINE:
+			
 		case UNION:
 			// pipelined local union is for free
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/main/java/org/apache/flink/compiler/dag/GroupCombineNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/GroupCombineNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/GroupCombineNode.java
new file mode 100644
index 0000000..50ae50d
--- /dev/null
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/GroupCombineNode.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dag;
+
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
+import org.apache.flink.compiler.DataStatistics;
+import org.apache.flink.compiler.operators.AllGroupCombineProperties;
+import org.apache.flink.compiler.operators.GroupCombineProperties;
+import org.apache.flink.compiler.operators.OperatorDescriptorSingle;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The optimizer representation of a <i>GroupCombine</i> operation. Offers the key-less descriptor when no keys are set, and the keyed (sorted) descriptor otherwise.
+ */
+public class GroupCombineNode extends SingleInputNode {
+
+	private final List<OperatorDescriptorSingle> possibleProperties;
+
+	/**
+	 * Creates a new optimizer node for the given operator.
+	 *
+	 * @param operator The group combine operation.
+	 */
+	public GroupCombineNode(GroupCombineOperatorBase<?, ?, ?> operator) {
+		super(operator);
+
+		if (this.keys == null) {
+			// case of a key-less group combine. force a parallelism of 1
+			setDegreeOfParallelism(1);
+		}
+
+		this.possibleProperties = initPossibleProperties();
+	}
+
+	private List<OperatorDescriptorSingle> initPossibleProperties() {
+		// NOTE(review): the operator parameters are fetched here, but 'conf' is never read below - no strategy hint is evaluated
+		final Configuration conf = getPactContract().getParameters();
+
+		// take the operator's group order as additional sort order; an ordering without fields is treated like no ordering
+		Ordering groupOrder = null;
+		if (getPactContract() instanceof GroupCombineOperatorBase) {
+			groupOrder = getPactContract().getGroupOrder();
+			if (groupOrder != null && groupOrder.getNumberOfFields() == 0) {
+				groupOrder = null;
+			}
+		}
+
+		OperatorDescriptorSingle props = (this.keys == null ?
+				new AllGroupCombineProperties() :
+				new GroupCombineProperties(this.keys, groupOrder));
+
+		return Collections.singletonList(props);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the operator represented by this optimizer node.
+	 *
+	 * @return The operator represented by this optimizer node.
+	 */
+	@Override
+	public GroupCombineOperatorBase<?, ?, ?> getPactContract() {
+		return (GroupCombineOperatorBase<?, ?, ?>) super.getPactContract();
+	}
+
+	@Override
+	public String getName() {
+		return "GroupCombine";
+	}
+
+	@Override
+	protected List<OperatorDescriptorSingle> getPossibleProperties() {
+		return this.possibleProperties;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Estimates
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+		// no real estimates possible for a group combine.
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupCombineProperties.java
new file mode 100644
index 0000000..7919d28
--- /dev/null
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupCombineProperties.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.operators;
+
+import org.apache.flink.compiler.dag.SingleInputNode;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.LocalProperties;
+import org.apache.flink.compiler.dataproperties.PartitioningProperty;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
+import org.apache.flink.compiler.plan.Channel;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
+/** Operator descriptor that instantiates a GroupCombine plan node with the ALL_GROUP_COMBINE driver strategy (no keys are passed to the descriptor). */
+public final class AllGroupCombineProperties extends OperatorDescriptorSingle {
+
+	@Override
+	public DriverStrategy getStrategy() {
+		return DriverStrategy.ALL_GROUP_COMBINE;
+	}
+
+	@Override
+	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+		return new SingleInputPlanNode(node, "GroupCombine ("+node.getPactContract().getName()+")", in, DriverStrategy.ALL_GROUP_COMBINE);
+	}
+
+	@Override
+	protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+		return Collections.singletonList(new RequestedGlobalProperties());
+	}
+
+	@Override
+	protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+		return Collections.singletonList(new RequestedLocalProperties());
+	}
+
+	// If unique field combinations exist on randomly partitioned data, the data is marked as any-partitioned on the first combination; all unique field combinations are then cleared. Modifies and returns the given object.
+	@Override
+	public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+		if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
+				gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
+		{
+			gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
+		}
+		gProps.clearUniqueFieldCombinations();
+		return gProps;
+	}
+
+	// Returns the result of clearing the unique field sets on the given local properties.
+	@Override
+	public LocalProperties computeLocalProperties(LocalProperties lProps) {
+		return lProps.clearUniqueFieldSets();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java
index ec38b47..d6a3c5f 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java
@@ -58,7 +58,7 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip
 			combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
 
 			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
-					"Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_GROUP_COMBINE);
+					"Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_GROUP_REDUCE_COMBINE);
 			combiner.setCosts(new Costs(0, 0));
 			combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupCombineProperties.java
new file mode 100644
index 0000000..17f0980
--- /dev/null
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupCombineProperties.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.operators;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.compiler.dag.SingleInputNode;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.LocalProperties;
+import org.apache.flink.compiler.dataproperties.PartitioningProperty;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
+import org.apache.flink.compiler.plan.Channel;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The operator descriptor belonging to the GroupCombineNode. It translates the GroupCombine operation
+ * to the driver strategy SORTED_GROUP_COMBINE and sets the relevant grouping and sorting keys.
+ * @see org.apache.flink.compiler.dag.GroupCombineNode
+ */
+public final class GroupCombineProperties extends OperatorDescriptorSingle {
+
+	private final Ordering ordering;        // sort order handed to the driver: grouping keys first, then the additional order keys (if any)
+
+	public GroupCombineProperties(FieldSet groupKeys, Ordering additionalOrderKeys) {
+		super(groupKeys);
+
+		// the ordering always starts with the grouping fields, which may be sorted in any direction
+		
+		this.ordering = new Ordering();
+		for (Integer key : this.keyList) {
+			this.ordering.appendOrdering(key, null, Order.ANY);
+		}
+
+		// and next the additional order fields
+		if (additionalOrderKeys != null) {
+			for (int i = 0; i < additionalOrderKeys.getNumberOfFields(); i++) {
+				Integer field = additionalOrderKeys.getFieldNumber(i);
+				Order order = additionalOrderKeys.getOrder(i);
+				this.ordering.appendOrdering(field, additionalOrderKeys.getType(i), order);
+			}
+		}
+
+	}
+
+	@Override
+	public DriverStrategy getStrategy() {
+		return DriverStrategy.SORTED_GROUP_COMBINE;
+	}
+
+	@Override
+	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+		node.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
+		
+		// create the plan node for the combine, keyed on the grouping fields
+		SingleInputPlanNode singleInputPlanNode = new SingleInputPlanNode(
+				node, 
+				"GroupCombine (" + node.getPactContract().getName() + ")",
+				in, // the strategy on the next line is the combine strategy also used in the group reduce
+				DriverStrategy.SORTED_GROUP_COMBINE, this.keyList);
+
+		// set sorting comparator key info
+		singleInputPlanNode.setDriverKeyInfo(this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections(), 0);
+		// set grouping comparator key info
+		singleInputPlanNode.setDriverKeyInfo(this.keyList, 1);
+		
+		return singleInputPlanNode;
+	}
+
+	@Override
+	protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+		RequestedGlobalProperties props = new RequestedGlobalProperties();
+		props.setRandomPartitioning();
+		return Collections.singletonList(props);
+	}
+
+	@Override
+	protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+		return Collections.singletonList(new RequestedLocalProperties());
+	}
+
+	@Override
+	public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+		if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
+				gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) {
+			gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
+		}
+		gProps.clearUniqueFieldCombinations();
+		return gProps;
+	}
+
+	@Override
+	public LocalProperties computeLocalProperties(LocalProperties lProps) {
+		return lProps.clearUniqueFieldSets();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java
index 7919b2b..16e7c72 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java
@@ -32,7 +32,10 @@ import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.plan.SingleInputPlanNode;
 
 /**
- * 
+ * Abstract base class for Operator descriptions which instantiates the node and sets the driver
+ * strategy and the sorting and grouping keys. Returns possible local and global properties and
+ * updates them after the operation has been performed.
+ * @see org.apache.flink.compiler.dag.SingleInputNode
  */
 public abstract class OperatorDescriptorSingle implements AbstractOperatorDescriptor {
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
deleted file mode 100644
index 7954773..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.compiler.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.compiler.dag.GroupReduceNode;
-import org.apache.flink.compiler.dag.SingleInputNode;
-import org.apache.flink.compiler.dataproperties.GlobalProperties;
-import org.apache.flink.compiler.dataproperties.LocalProperties;
-import org.apache.flink.compiler.dataproperties.PartitioningProperty;
-import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
-import org.apache.flink.compiler.plan.Channel;
-import org.apache.flink.compiler.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-public final class PartialGroupProperties extends OperatorDescriptorSingle {
-	
-	public PartialGroupProperties(FieldSet keys) {
-		super(keys);
-	}
-	
-	@Override
-	public DriverStrategy getStrategy() {
-		return DriverStrategy.SORTED_GROUP_COMBINE;
-	}
-
-	@Override
-	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-		// create in input node for combine with same DOP as input node
-		GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getPactContract());
-		combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
-
-		SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract().getName()+")", in,
-				DriverStrategy.SORTED_GROUP_COMBINE);
-		// sorting key info
-		combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);
-		// set grouping comparator key info
-		combiner.setDriverKeyInfo(this.keyList, 1);
-		
-		return combiner;
-	}
-
-	@Override
-	protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
-		return Collections.singletonList(new RequestedGlobalProperties());
-	}
-
-	@Override
-	protected List<RequestedLocalProperties> createPossibleLocalProperties() {
-		RequestedLocalProperties props = new RequestedLocalProperties();
-		props.setGroupedFields(this.keys);
-		return Collections.singletonList(props);
-	}
-	
-	@Override
-	public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
-		if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
-				gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
-		{
-			gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
-		}
-		gProps.clearUniqueFieldCombinations();
-		return gProps;
-	}
-	
-	@Override
-	public LocalProperties computeLocalProperties(LocalProperties lProps) {
-		return lProps.clearUniqueFieldSets();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java
index 7728948..ab21b69 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java
@@ -433,7 +433,7 @@ public class PlanJSONDumpGenerator {
 				break;
 			
 			case ALL_GROUP_REDUCE:
-			case ALL_GROUP_COMBINE:
+			case ALL_GROUP_REDUCE_COMBINE:
 				locString = "Group Reduce All";
 				break;
 				

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
index 11ac231..4081a4b 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
@@ -239,7 +239,7 @@ public class JavaApiPostPass implements OptimizerPostPass {
 
 
 		if(javaOp instanceof GroupReduceOperatorBase &&
-				(source.getDriverStrategy() == DriverStrategy.SORTED_GROUP_COMBINE || source.getDriverStrategy() == DriverStrategy.ALL_GROUP_COMBINE)) {
+				(source.getDriverStrategy() == DriverStrategy.SORTED_GROUP_COMBINE || source.getDriverStrategy() == DriverStrategy.ALL_GROUP_REDUCE_COMBINE)) {
 			GroupReduceOperatorBase<?, ?, ?> groupNode = (GroupReduceOperatorBase<?, ?, ?>) javaOp;
 			type = groupNode.getInput().getOperatorInfo().getOutputType();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/test/java/org/apache/flink/compiler/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/java/GroupReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/java/GroupReduceCompilationTest.java
index 454438c..8f2292b 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/java/GroupReduceCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/java/GroupReduceCompilationTest.java
@@ -118,7 +118,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			
 			// check that both reduce and combiner have the same strategy
 			assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
-			assertEquals(DriverStrategy.ALL_GROUP_COMBINE, combineNode.getDriverStrategy());
+			assertEquals(DriverStrategy.ALL_GROUP_REDUCE_COMBINE, combineNode.getDriverStrategy());
 			
 			// check DOP
 			assertEquals(8, sourceNode.getDegreeOfParallelism());

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
index 29ebafe..ef52b32 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
@@ -31,9 +31,10 @@ import java.io.Serializable;
  * This special variant of the combine function reduces the group of elements into a single element. A variant
  * that can return multiple values per group is defined in {@link FlatCombineFunction}.
  * 
- * @param <T> The data type processed by the combine function.
+ * @param <IN> The data type processed by the combine function.
+ * @param <OUT> The data type emitted by the combine function.
  */
-public interface CombineFunction<T> extends Function, Serializable {
+public interface CombineFunction<IN, OUT> extends Function, Serializable {
 
 	/**
 	 * The combine method, called (potentially multiple timed) with subgroups of elements.
@@ -44,5 +45,5 @@ public interface CombineFunction<T> extends Function, Serializable {
 	 * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
 	 *                   and may trigger the recovery logic.
 	 */
-	T combine(Iterable<T> values) throws Exception;
+	OUT combine(Iterable<IN> values) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
index 53a2edc..b90b3ce 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
@@ -33,9 +33,10 @@ import org.apache.flink.util.Collector;
  * This special variant of the combine function supports to return more than one element per group.
  * It is frequently less efficient to use than the {@link CombineFunction}.
  * 
- * @param <T> The data type processed by the combine function.
+ * @param <IN> The data type processed by the combine function.
+ * @param <OUT> The data type emitted by the combine function.
  */
-public interface FlatCombineFunction<T> extends Function, Serializable {
+public interface FlatCombineFunction<IN, OUT> extends Function, Serializable {
 
 	/**
 	 * The combine method, called (potentially multiple timed) with subgroups of elements.
@@ -46,5 +47,5 @@ public interface FlatCombineFunction<T> extends Function, Serializable {
 	 * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
 	 *                   and may trigger the recovery logic.
 	 */
-	void combine(Iterable<T> values, Collector<T> out) throws Exception;
+	void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
index a7e7b70..17aca88 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
@@ -30,12 +30,13 @@ import org.apache.flink.util.Collector;
  * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
  * {@link RichFunction#close()}.
  *
- * @param <T> The data type of the elements to be combined.
+ * @param <IN> The data type of the elements to be combined.
+ * @param <OUT> The data type of the elements emitted by the combine function.
  */
-public abstract class RichFlatCombineFunction<T> extends AbstractRichFunction implements FlatCombineFunction<T> {
+public abstract class RichFlatCombineFunction<IN, OUT> extends AbstractRichFunction implements FlatCombineFunction<IN, OUT> {
 
 	private static final long serialVersionUID = 1L;
 
 	@Override
-	public abstract void combine(Iterable<T> values, Collector<T> out) throws Exception;
+	public abstract void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
index e3b8632..b6c92c2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
@@ -34,7 +34,7 @@ import org.apache.flink.util.Collector;
  * @param <IN> Type of the elements that this function processes.
  * @param <OUT> The type of the elements returned by the user-defined function.
  */
-public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT>, FlatCombineFunction<IN> {
+public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT>, FlatCombineFunction<IN, IN> {
 	
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
new file mode 100644
index 0000000..2a47c45
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Base operator for the combineGroup transformation. It receives the UDF of the GroupCombineOperator
+ * (a FlatCombineFunction) as an input. This class is later processed by the compiler to generate the plan.
+ * @see org.apache.flink.api.common.functions.FlatCombineFunction
+ */
+public class GroupCombineOperatorBase<IN, OUT, FT extends FlatCombineFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
+
+
+	/** The ordering of the elements inside a group. */
+	private Ordering groupOrder;
+
+	public GroupCombineOperatorBase(FT udf, UnaryOperatorInformation<IN, OUT> operatorInfo, int[] keyPositions, String name) {
+		super(new UserCodeObjectWrapper<FT>(udf), operatorInfo, keyPositions, name);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Sets the order of the elements within a reduce group.
+	 *
+	 * @param order The order for the elements in a reduce group.
+	 */
+	public void setGroupOrder(Ordering order) {
+		this.groupOrder = order;
+	}
+
+	/**
+	 * Gets the order of elements within a reduce group. If no such order has been
+	 * set, this method returns null.
+	 *
+	 * @return The secondary order.
+	 */
+	public Ordering getGroupOrder() {
+		return this.groupOrder;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
+		FlatCombineFunction<IN, OUT> function = this.userFunction.getUserCodeObject();
+
+		UnaryOperatorInformation<IN, OUT> operatorInfo = getOperatorInfo();
+		TypeInformation<IN> inputType = operatorInfo.getInputType();
+
+		int[] keyColumns = getKeyColumns(0);
+
+		if (!(inputType instanceof CompositeType) && (keyColumns.length > 0 || groupOrder != null)) {
+			throw new InvalidProgramException("Grouping or group-sorting is only possible on composite type.");
+		}
+
+		int[] sortColumns = keyColumns;
+		boolean[] sortOrderings = new boolean[sortColumns.length];
+
+		if (groupOrder != null) {
+			sortColumns = ArrayUtils.addAll(sortColumns, groupOrder.getFieldPositions());
+			sortOrderings = ArrayUtils.addAll(sortOrderings, groupOrder.getFieldSortDirections());
+		}
+
+		if (inputType instanceof CompositeType) {
+			if(sortColumns.length == 0) { // => all reduce. No comparator
+				Preconditions.checkArgument(sortOrderings.length == 0);
+			} else {
+				final TypeComparator<IN> sortComparator = ((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings, 0, executionConfig);
+
+				Collections.sort(inputData, new Comparator<IN>() {
+					@Override
+					public int compare(IN o1, IN o2) {
+						return sortComparator.compare(o1, o2);
+					}
+				});
+			}
+		}
+
+		FunctionUtils.setFunctionRuntimeContext(function, ctx);
+		FunctionUtils.openFunction(function, this.parameters);
+
+		ArrayList<OUT> result = new ArrayList<OUT>();
+
+		if (keyColumns.length == 0) {
+			final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
+			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
+			List<IN> inputDataCopy = new ArrayList<IN>(inputData.size());
+			for (IN in: inputData) {
+				inputDataCopy.add(inputSerializer.copy(in));
+			}
+			CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
+
+			function.combine(inputDataCopy, collector);
+		} else {
+			final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
+			boolean[] keyOrderings = new boolean[keyColumns.length];
+			final TypeComparator<IN> comparator = ((CompositeType<IN>) inputType).createComparator(keyColumns, keyOrderings, 0, executionConfig);
+
+			ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator);
+
+			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
+			CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
+
+			while (keyedIterator.nextKey()) {
+				function.combine(keyedIterator.getValues(), collector);
+			}
+		}
+
+		FunctionUtils.closeFunction(function);
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index d7e6e94..a884f6d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.InvalidTypesException;
@@ -58,15 +59,16 @@ import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.DistinctOperator;
 import org.apache.flink.api.java.operators.FilterOperator;
-import org.apache.flink.api.java.operators.ProjectOperator;
 import org.apache.flink.api.java.operators.FlatMapOperator;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.operators.GroupCombineOperator;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.operators.MapOperator;
 import org.apache.flink.api.java.operators.MapPartitionOperator;
 import org.apache.flink.api.java.operators.PartitionOperator;
+import org.apache.flink.api.java.operators.ProjectOperator;
 import org.apache.flink.api.java.operators.ProjectOperator.Projection;
 import org.apache.flink.api.java.operators.ReduceOperator;
 import org.apache.flink.api.java.operators.SortedGrouping;
@@ -459,6 +461,28 @@ public abstract class DataSet<T> {
 	}
 
 	/**
+	 * Applies a CombineFunction on a non-grouped {@link DataSet}.
+	 * A CombineFunction is similar to a GroupReduceFunction but does not perform a full data exchange. Instead, the
+	 * CombineFunction calls the combine method once per partition for combining a group of results. This
+	 * operator is suitable for combining values into an intermediate format before doing a proper groupReduce where
+	 * the data is shuffled across the nodes for further reduction. The GroupReduce operator can also be supplied with
+	 * a combiner by implementing the RichGroupReduceFunction. The combine method of the RichGroupReduceFunction
+	 * demands input and output type to be the same. The CombineFunction, on the other hand, can have an arbitrary
+	 * output type.
+	 * @param combiner The CombineFunction that is applied on the DataSet.
+	 * @return A GroupCombineOperator which represents the combined DataSet.
+	 */
+	public <R> GroupCombineOperator<T, R> combineGroup(FlatCombineFunction<T, R> combiner) {
+		if (combiner == null) {
+			throw new NullPointerException("GroupReduce function must not be null.");
+		}
+
+		String callLocation = Utils.getCallLocationName();
+		TypeInformation<R> resultType = TypeExtractor.getGroupCombineReturnTypes(combiner, getType(), callLocation, true);
+		return new GroupCombineOperator<T, R>(this, resultType, clean(combiner), callLocation);
+	}
+
+	/**
 	 * Selects an element with minimum value.
 	 * <p>
 	 * The minimum is computed over the specified fields in lexicographical order.

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
index 890a0ca..fbb7029 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.util.Collector;
 
 @Combinable
-public class FirstReducer<T> implements GroupReduceFunction<T, T>, FlatCombineFunction<T> {
+public class FirstReducer<T> implements GroupReduceFunction<T, T>, FlatCombineFunction<T, T> {
 	private static final long serialVersionUID = 1L;
 
 	private final int count;

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
new file mode 100644
index 0000000..617162b
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
+import org.apache.flink.api.java.operators.translation.PlanUnwrappingGroupCombineOperator;
+import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedGroupCombineOperator;
+import org.apache.flink.api.java.operators.translation.TwoKeyExtractingMapper;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+/**
+ * This operator behaves like the GroupReduceOperator with Combine but only runs the Combine part which reduces all data
+ * locally in their partitions. The combine part can return an arbitrary data type. This is useful to pre-combine values 
+ * into an intermediate representation before applying a proper reduce operation.
+ *
+ * @param <IN> The type of the data set consumed by the operator.
+ * @param <OUT> The type of the data set created by the operator.
+ */
+public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, GroupCombineOperator<IN, OUT>> {
+
+	private final FlatCombineFunction<IN, OUT> function;
+
+	private final Grouping<IN> grouper;
+
+	private final String defaultName;
+
+	/**
+	 * Constructor for a non-grouped combine (all combine).
+	 *
+	 * @param input The input data set to the group combine function.
+	 * @param resultType The type information for the resulting type.
+	 * @param function The user-defined FlatCombineFunction.
+	 * @param defaultName The name suffix used for the operator's default name (typically the call location).
+	 */
+	public GroupCombineOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatCombineFunction<IN, OUT> function, String defaultName) {
+		super(input, resultType);
+		this.function = function;
+		this.grouper = null;
+		this.defaultName = defaultName;
+	}
+
+	/**
+	 * Constructor for a grouped combine.
+	 *
+	 * @param input The grouped input to be processed group-wise by the group combine function.
+	 * @param function The user-defined FlatCombineFunction.
+	 */
+	public GroupCombineOperator(Grouping<IN> input, TypeInformation<OUT> resultType, FlatCombineFunction<IN, OUT> function, String defaultName) {
+		super(input != null ? input.getDataSet() : null, resultType);
+
+		this.function = function;
+		this.grouper = input;
+		this.defaultName = defaultName;
+	}
+
+	@Override
+	protected FlatCombineFunction<IN, OUT> getFunction() {
+		return function;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Translation
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	protected GroupCombineOperatorBase<?, OUT, ?> translateToDataFlow(Operator<IN> input) {
+
+		String name = getName() != null ? getName() : "GroupCombine at " + defaultName;
+
+		// distinguish between grouped reduce and non-grouped reduce
+		if (grouper == null) {
+			// non grouped reduce
+			UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
+			GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>> po =
+					new GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>>(function, operatorInfo, new int[0], name);
+
+			po.setInput(input);
+			// the degree of parallelism for a non grouped reduce can only be 1
+			po.setDegreeOfParallelism(1);
+			return po;
+		}
+
+		if (grouper.getKeys() instanceof Keys.SelectorFunctionKeys) {
+
+			@SuppressWarnings("unchecked")
+			Keys.SelectorFunctionKeys<IN, ?> selectorKeys = (Keys.SelectorFunctionKeys<IN, ?>) grouper.getKeys();
+
+			if (grouper instanceof SortedGrouping) {
+				SortedGrouping<IN> sortedGrouper = (SortedGrouping<IN>) grouper;
+				Keys.SelectorFunctionKeys<IN, ?> sortKeys = sortedGrouper.getSortSelectionFunctionKey();
+
+				PlanUnwrappingSortedGroupCombineOperator<IN, OUT, ?, ?> po = translateSelectorFunctionSortedReducer(
+						selectorKeys, sortKeys, function, getInputType(), getResultType(), name, input);
+
+				// set group order
+				int[] sortKeyPositions = sortedGrouper.getGroupSortKeyPositions();
+				Order[] sortOrders = sortedGrouper.getGroupSortOrders();
+
+				Ordering o = new Ordering();
+				for(int i=0; i < sortKeyPositions.length; i++) {
+					o.appendOrdering(sortKeyPositions[i], null, sortOrders[i]);
+				}
+				po.setGroupOrder(o);
+
+				po.setDegreeOfParallelism(this.getParallelism());
+				return po;
+			} else {
+				PlanUnwrappingGroupCombineOperator<IN, OUT, ?> po = translateSelectorFunctionReducer(
+						selectorKeys, function, getInputType(), getResultType(), name, input);
+
+				po.setDegreeOfParallelism(this.getParallelism());
+				return po;
+			}
+		}
+		else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
+
+			int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
+			UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
+			GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>> po =
+					new GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
+
+			po.setInput(input);
+			po.setDegreeOfParallelism(getParallelism());
+
+			// set group order
+			if (grouper instanceof SortedGrouping) {
+				SortedGrouping<IN> sortedGrouper = (SortedGrouping<IN>) grouper;
+
+				int[] sortKeyPositions = sortedGrouper.getGroupSortKeyPositions();
+				Order[] sortOrders = sortedGrouper.getGroupSortOrders();
+
+				Ordering o = new Ordering();
+				for(int i=0; i < sortKeyPositions.length; i++) {
+					o.appendOrdering(sortKeyPositions[i], null, sortOrders[i]);
+				}
+				po.setGroupOrder(o);
+			}
+
+			return po;
+		}
+		else {
+			throw new UnsupportedOperationException("Unrecognized key type.");
+		}
+	}
+
+
+	// --------------------------------------------------------------------------------------------
+
+	private static <IN, OUT, K> PlanUnwrappingGroupCombineOperator<IN, OUT, K> translateSelectorFunctionReducer(
+			Keys.SelectorFunctionKeys<IN, ?> rawKeys, FlatCombineFunction<IN, OUT> function,
+			TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input)
+	{
+		@SuppressWarnings("unchecked")
+		final Keys.SelectorFunctionKeys<IN, K> keys = (Keys.SelectorFunctionKeys<IN, K>) rawKeys;
+
+		TypeInformation<Tuple2<K, IN>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, IN>>(keys.getKeyType(), inputType);
+
+		KeyExtractingMapper<IN, K> extractor = new KeyExtractingMapper<IN, K>(keys.getKeyExtractor());
+
+		PlanUnwrappingGroupCombineOperator<IN, OUT, K> reducer = new PlanUnwrappingGroupCombineOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey);
+
+		MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key Extractor");
+
+		reducer.setInput(mapper);
+		mapper.setInput(input);
+
+		// set the mapper's parallelism to the input parallelism to make sure it is chained
+		mapper.setDegreeOfParallelism(input.getDegreeOfParallelism());
+
+		return reducer;
+	}
+
+	private static <IN, OUT, K1, K2> PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> translateSelectorFunctionSortedReducer(
+			Keys.SelectorFunctionKeys<IN, ?> rawGroupingKey, Keys.SelectorFunctionKeys<IN, ?> rawSortingKey, FlatCombineFunction<IN, OUT> function,
+			TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input)
+	{
+		@SuppressWarnings("unchecked")
+		final Keys.SelectorFunctionKeys<IN, K1> groupingKey = (Keys.SelectorFunctionKeys<IN, K1>) rawGroupingKey;
+
+		@SuppressWarnings("unchecked")
+		final Keys.SelectorFunctionKeys<IN, K2> sortingKey = (Keys.SelectorFunctionKeys<IN, K2>) rawSortingKey;
+
+		TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = new TupleTypeInfo<Tuple3<K1, K2, IN>>(groupingKey.getKeyType(), sortingKey.getKeyType(), inputType);
+
+		TwoKeyExtractingMapper<IN, K1, K2> extractor = new TwoKeyExtractingMapper<IN, K1, K2>(groupingKey.getKeyExtractor(), sortingKey.getKeyExtractor());
+
+		PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> reducer = new PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2>(function, groupingKey, sortingKey, name, outputType, typeInfoWithKey);
+
+		MapOperatorBase<IN, Tuple3<K1, K2, IN>, MapFunction<IN, Tuple3<K1, K2, IN>>> mapper = new MapOperatorBase<IN, Tuple3<K1, K2, IN>, MapFunction<IN, Tuple3<K1, K2, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple3<K1, K2, IN>>(inputType, typeInfoWithKey), "Key Extractor");
+
+		reducer.setInput(mapper);
+		mapper.setInput(input);
+
+		// set the mapper's parallelism to the input parallelism to make sure it is chained
+		mapper.setDegreeOfParallelism(input.getDegreeOfParallelism());
+
+		return reducer;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 38c6c68..b2054bf 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.operators;
 
+import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.Utils;
@@ -156,6 +157,27 @@ public class SortedGrouping<T> extends Grouping<T> {
 		return new GroupReduceOperator<T, R>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName() );
 	}
 
+	/**
+	 * Applies a CombineFunction on a grouped {@link DataSet}.
+	 * A CombineFunction is similar to a GroupReduceFunction but does not perform a full data exchange. Instead, the
+	 * CombineFunction calls the combine method once per partition for combining a group of results. This
+	 * operator is suitable for combining values into an intermediate format before doing a proper groupReduce where
+	 * the data is shuffled across the nodes for further reduction. The GroupReduce operator can also be supplied with
+	 * a combiner by implementing the RichGroupReduceFunction. The combine method of the RichGroupReduceFunction
+	 * demands input and output type to be the same. The CombineFunction, on the other hand, can have an arbitrary
+	 * output type.
+	 * @param combiner The CombineFunction that is applied on the DataSet.
+	 * @return A GroupCombineOperator which represents the combined DataSet.
+	 */
+	public <R> GroupCombineOperator<T, R> combineGroup(FlatCombineFunction<T, R> combiner) {
+		if (combiner == null) {
+			throw new NullPointerException("GroupReduce function must not be null.");
+		}
+		TypeInformation<R> resultType = TypeExtractor.getGroupCombineReturnTypes(combiner, this.getDataSet().getType());
+
+		return new GroupCombineOperator<T, R>(this, resultType, dataSet.clean(combiner), Utils.getCallLocationName());
+	}
+
 	
 	/**
 	 * Returns a new set containing the first n elements in this grouped and sorted {@link DataSet}.<br/>

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 732c59b..0f3faa0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -159,7 +160,28 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 
 		return new GroupReduceOperator<T, R>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName());
 	}
-	
+
+	/**
+	 * Applies a CombineFunction on a grouped {@link DataSet}.
+	 * A CombineFunction is similar to a GroupReduceFunction but does not perform a full data exchange. Instead, the
+	 * CombineFunction calls the combine method once per partition for combining a group of results. This
+	 * operator is suitable for combining values into an intermediate format before doing a proper groupReduce where
+	 * the data is shuffled across the nodes for further reduction. The GroupReduce operator can also be supplied with
+	 * a combiner by implementing the RichGroupReduceFunction. The combine method of the RichGroupReduceFunction
+	 * demands input and output type to be the same. The CombineFunction, on the other hand, can have an arbitrary
+	 * output type.
+	 * @param combiner The CombineFunction that is applied on the DataSet.
+	 * @return A GroupCombineOperator which represents the combined DataSet.
+	 */
+	public <R> GroupCombineOperator<T, R> combineGroup(FlatCombineFunction<T, R> combiner) {
+		if (combiner == null) {
+			throw new NullPointerException("GroupReduce function must not be null.");
+		}
+		TypeInformation<R> resultType = TypeExtractor.getGroupCombineReturnTypes(combiner, this.getDataSet().getType());
+
+		return new GroupCombineOperator<T, R>(this, resultType, dataSet.clean(combiner), Utils.getCallLocationName());
+	}
+
 	/**
 	 * Returns a new set containing the first n elements in this grouped {@link DataSet}.<br/>
 	 * @param n The desired number of elements for each group.

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
new file mode 100644
index 0000000..ae4ba11
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+/**
+ * A group combine operator that takes 2-tuples (key-value pairs), and applies the group combine operation only
+ * on the unwrapped values.
+ */
+public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> extends GroupCombineOperatorBase<Tuple2<K, IN>, OUT, FlatCombineFunction<Tuple2<K, IN>, OUT>> {
+
+	public PlanUnwrappingGroupCombineOperator(FlatCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
+												TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey)
+	{
+		super(new TupleUnwrappingGroupCombiner<IN, OUT, K>(udf),
+				new UnaryOperatorInformation<Tuple2<K, IN>, OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);
+		
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static final class TupleUnwrappingGroupCombiner<IN, OUT, K> extends WrappingFunction<FlatCombineFunction<IN, OUT>>
+		implements FlatCombineFunction<Tuple2<K, IN>, OUT>
+	{
+	
+		private static final long serialVersionUID = 1L;
+		
+		private final TupleUnwrappingIterator<IN, K> iter; 
+		
+		private TupleUnwrappingGroupCombiner(FlatCombineFunction<IN, OUT> wrapped) {
+			super(wrapped);
+			this.iter = new TupleUnwrappingIterator<IN, K>();
+		}
+	
+	
+		@Override
+		public void combine(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
+			iter.set(values.iterator());
+			this.wrappedFunction.combine(iter, out);
+		}
+		
+		@Override
+		public String toString() {
+			return this.wrappedFunction.toString();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index e4a041b..1d59a21 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -47,7 +47,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
 	
 	@RichGroupReduceFunction.Combinable
 	public static final class TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>>
-		implements GroupReduceFunction<Tuple2<K, IN>, OUT>, FlatCombineFunction<Tuple2<K, IN>>
+		implements GroupReduceFunction<Tuple2<K, IN>, OUT>, FlatCombineFunction<Tuple2<K, IN>, Tuple2<K, IN>>
 	{
 
 		private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
new file mode 100644
index 0000000..b3d8470
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.util.Collector;
+
+/**
+ * A reduce operator that takes 3-tuples (groupKey, sortKey, value), and applies the sorted partial group reduce
+ * operation only on the unwrapped values.
+ */
+public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends GroupCombineOperatorBase<Tuple3<K1, K2, IN>, OUT, FlatCombineFunction<Tuple3<K1, K2, IN>,OUT>> {
+
+	public PlanUnwrappingSortedGroupCombineOperator(FlatCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name,
+													TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey)
+	{
+		super(new TupleUnwrappingGroupReducer<IN, OUT, K1, K2>(udf),
+				new UnaryOperatorInformation<Tuple3<K1, K2, IN>, OUT>(typeInfoWithKey, outType),
+				groupingKey.computeLogicalKeyPositions(), 
+				name);
+
+	}
+
+	public static final class TupleUnwrappingGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<FlatCombineFunction<IN, OUT>>
+			implements FlatCombineFunction<Tuple3<K1, K2, IN>, OUT>
+	{
+
+		private static final long serialVersionUID = 1L;
+
+		private final Tuple3UnwrappingIterator<IN, K1, K2> iter;
+
+		private TupleUnwrappingGroupReducer(FlatCombineFunction<IN, OUT> wrapped) {
+			super(wrapped);
+			this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>();
+		}
+
+
+		@Override
+		public void combine(Iterable<Tuple3<K1, K2, IN>> values, Collector<OUT> out) throws Exception {
+			iter.set(values.iterator());
+			this.wrappedFunction.combine(iter, out);
+		}
+
+		@Override
+		public String toString() {
+			return this.wrappedFunction.toString();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
index 46d247a..757ff56 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
@@ -47,7 +47,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 
 	@RichGroupReduceFunction.Combinable
 	public static final class TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>>
-		implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, FlatCombineFunction<Tuple3<K1, K2, IN>>
+		implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, FlatCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>>
 	{
 
 		private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
index ff1bd28..875e9c1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
@@ -367,7 +367,7 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Grou
 	
 	// ============================================================================================
 	
-	public static class WrappingReduceFunction extends WrappingFunction<ReduceFunction> implements GroupReduceFunction<Record, Record>, FlatCombineFunction<Record> {
+	public static class WrappingReduceFunction extends WrappingFunction<ReduceFunction> implements GroupReduceFunction<Record, Record>, FlatCombineFunction<Record, Record> {
 		
 		private static final long serialVersionUID = 1L;
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index fdfe941..4527aa0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -34,6 +34,7 @@ import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
@@ -133,6 +134,16 @@ public class TypeExtractor {
 	{
 		return getUnaryOperatorReturnType((Function) groupReduceInterface, GroupReduceFunction.class, true, true, inType, functionName, allowMissing);
 	}
+
+	/**
+	 * Convenience overload that determines the return type of a group combine function without a
+	 * function name and with {@code allowMissing} set to {@code false}.
+	 *
+	 * @param combineInterface the combine function to analyze
+	 * @param inType type information of the function's input
+	 * @return the type information produced by the four-argument overload
+	 */
+	public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(FlatCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType) {
+		final String noFunctionName = null;
+		final boolean allowMissing = false;
+		return getGroupCombineReturnTypes(combineInterface, inType, noFunctionName, allowMissing);
+	}
+
+	/**
+	 * Determines the return type of a group combine function by delegating to
+	 * {@code getUnaryOperatorReturnType} with {@link FlatCombineFunction} as the base class.
+	 *
+	 * @param combineInterface the combine function to analyze
+	 * @param inType type information of the function's input
+	 * @param functionName passed through to {@code getUnaryOperatorReturnType}
+	 * @param allowMissing passed through to {@code getUnaryOperatorReturnType}
+	 * @return the type information computed by {@code getUnaryOperatorReturnType}
+	 */
+	public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(
+			FlatCombineFunction<IN, OUT> combineInterface,
+			TypeInformation<IN> inType,
+			String functionName,
+			boolean allowMissing) {
+		final Function function = combineInterface;
+		// NOTE(review): the two 'true' flags presumably mark the Iterable input and the Collector
+		// output of combine(); confirm against getUnaryOperatorReturnType's parameter list.
+		return getUnaryOperatorReturnType(
+				function, FlatCombineFunction.class, true, true, inType, functionName, allowMissing);
+	}
 	
 	
 	public static <IN1, IN2, OUT> TypeInformation<OUT> getFlatJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface,