You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/09 14:40:00 UTC

[29/39] [FLINK-1023] Switch group-at-a-time function to java.lang.Iterable (from java.util.Iterator). Iterables over transient data throw a TraversableOnceException when iterated over again.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
index 3860fbe..074a70d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
@@ -16,9 +16,11 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.record.operators;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
@@ -29,6 +31,8 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.RecordOperator;
@@ -36,17 +40,20 @@ import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.api.java.record.functions.FunctionAnnotation;
 import org.apache.flink.api.java.record.functions.ReduceFunction;
 import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
 
 /**
  * ReduceOperator evaluating a {@link ReduceFunction} over each group of records that share the same key.
  * 
  * @see ReduceFunction
  */
-public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, ReduceFunction> implements RecordOperator {
+public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, GroupReduceFunction<Record, Record>> implements RecordOperator {
 	
 	private static final String DEFAULT_NAME = "<Unnamed Reducer>";		// the default name for contracts
 	
@@ -55,6 +62,8 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
 	 */
 	private final Class<? extends Key<?>>[] keyTypes;
 	
+	private final UserCodeWrapper<ReduceFunction> originalFunction;
+	
 	// --------------------------------------------------------------------------------------------
 	
 	/**
@@ -63,7 +72,11 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
 	 * @param udf The {@link ReduceFunction} implementation for this Reduce contract.
 	 */
 	public static Builder builder(ReduceFunction udf) {
-		return new Builder(new UserCodeObjectWrapper<ReduceFunction>(udf));
+		UserCodeWrapper<ReduceFunction> original = new UserCodeObjectWrapper<ReduceFunction>(udf);
+		UserCodeWrapper<GroupReduceFunction<Record, Record>> wrapped =
+				new UserCodeObjectWrapper<GroupReduceFunction<Record, Record>>(new WrappingReduceFunction(udf));
+		
+		return new Builder(original, wrapped);
 	}
 	
 	/**
@@ -74,7 +87,11 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
 	 * @param keyColumn The position of the key.
 	 */
 	public static Builder builder(ReduceFunction udf, Class<? extends Key<?>> keyClass, int keyColumn) {
-		return new Builder(new UserCodeObjectWrapper<ReduceFunction>(udf), keyClass, keyColumn);
+		UserCodeWrapper<ReduceFunction> original = new UserCodeObjectWrapper<ReduceFunction>(udf);
+		UserCodeWrapper<GroupReduceFunction<Record, Record>> wrapped =
+				new UserCodeObjectWrapper<GroupReduceFunction<Record, Record>>(new WrappingReduceFunction(udf));
+		
+		return new Builder(original, wrapped, keyClass, keyColumn);
 	}
 
 	/**
@@ -83,7 +100,11 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
 	 * @param udf The {@link ReduceFunction} implementation for this Reduce contract.
 	 */
 	public static Builder builder(Class<? extends ReduceFunction> udf) {
-		return new Builder(new UserCodeClassWrapper<ReduceFunction>(udf));
+		UserCodeWrapper<ReduceFunction> original = new UserCodeClassWrapper<ReduceFunction>(udf);
+		UserCodeWrapper<GroupReduceFunction<Record, Record>> wrapped =
+				new UserCodeObjectWrapper<GroupReduceFunction<Record, Record>>(new WrappingClassReduceFunction(udf));
+		
+		return new Builder(original, wrapped);
 	}
 	
 	/**
@@ -94,7 +115,11 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
 	 * @param keyColumn The position of the key.
 	 */
 	public static Builder builder(Class<? extends ReduceFunction> udf, Class<? extends Key<?>> keyClass, int keyColumn) {
-		return new Builder(new UserCodeClassWrapper<ReduceFunction>(udf), keyClass, keyColumn);
+		UserCodeWrapper<ReduceFunction> original = new UserCodeClassWrapper<ReduceFunction>(udf);
+		UserCodeWrapper<GroupReduceFunction<Record, Record>> wrapped =
+				new UserCodeObjectWrapper<GroupReduceFunction<Record, Record>>(new WrappingClassReduceFunction(udf));
+		
+		return new Builder(original, wrapped, keyClass, keyColumn);
 	}
 	
 	/**
@@ -102,8 +127,10 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
 	 * @param builder
 	 */
 	protected ReduceOperator(Builder builder) {
-		super(builder.udf, OperatorInfoHelper.unary(), builder.getKeyColumnsArray(), builder.name);
+		super(builder.udfWrapper, OperatorInfoHelper.unary(), builder.getKeyColumnsArray(), builder.name);
+		
 		this.keyTypes = builder.getKeyClassesArray();
+		this.originalFunction = builder.originalUdf;
 		
 		if (builder.inputs != null && !builder.inputs.isEmpty()) {
 			setInput(Operator.createUnionCascade(builder.inputs));
@@ -111,7 +138,8 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
 		
 		setGroupOrder(builder.secondaryOrder);
 		setBroadcastVariables(builder.broadcastInputs);
-		setSemanticProperties(FunctionAnnotation.readSingleConstantAnnotations(builder.udf));
+		
+		setSemanticProperties(FunctionAnnotation.readSingleConstantAnnotations(originalFunction));
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -126,7 +154,7 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
 	
 	@Override
 	public boolean isCombinable() {
-		return super.isCombinable() || getUserCodeWrapper().getUserCodeAnnotation(Combinable.class) != null;
+		return super.isCombinable() || originalFunction.getUserCodeAnnotation(Combinable.class) != null;
 	}
 	
 	/**
@@ -178,7 +206,8 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
 	public static class Builder {
 		
 		/* The required parameters */
-		private final UserCodeWrapper<ReduceFunction> udf;
+		private final UserCodeWrapper<ReduceFunction> originalUdf;
+		private final UserCodeWrapper<GroupReduceFunction<Record, Record>> udfWrapper;
 		private final List<Class<? extends Key<?>>> keyClasses;
 		private final List<Integer> keyColumns;
 		
@@ -191,10 +220,11 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
 		/**
 		 * Creates a Builder with the provided {@link ReduceFunction} implementation.
 		 * 
-		 * @param udf The {@link ReduceFunction} implementation for this Reduce contract.
+		 * @param wrappedUdf The {@link GroupReduceFunction} wrapping the {@link ReduceFunction} implementation for this Reduce contract.
 		 */
-		private Builder(UserCodeWrapper<ReduceFunction> udf) {
-			this.udf = udf;
+		private Builder(UserCodeWrapper<ReduceFunction> originalUdf, UserCodeWrapper<GroupReduceFunction<Record, Record>> wrappedUdf) {
+			this.originalUdf = originalUdf;
+			this.udfWrapper = wrappedUdf;
 			this.keyClasses = new ArrayList<Class<? extends Key<?>>>();
 			this.keyColumns = new ArrayList<Integer>();
 			this.inputs = new ArrayList<Operator<Record>>();
@@ -204,12 +234,16 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
 		/**
 		 * Creates a Builder with the provided {@link ReduceFunction} implementation.
 		 * 
-		 * @param udf The {@link ReduceFunction} implementation for this Reduce contract.
+		 * @param wrappedUdf The {@link GroupReduceFunction} wrapping the {@link ReduceFunction} implementation for this Reduce contract.
 		 * @param keyClass The class of the key data type.
 		 * @param keyColumn The position of the key.
 		 */
-		private Builder(UserCodeWrapper<ReduceFunction> udf, Class<? extends Key<?>> keyClass, int keyColumn) {
-			this.udf = udf;
+		private Builder(UserCodeWrapper<ReduceFunction> originalUdf, 
+						UserCodeWrapper<GroupReduceFunction<Record, Record>> wrappedUdf,
+						Class<? extends Key<?>> keyClass, int keyColumn)
+		{
+			this.originalUdf = originalUdf;
+			this.udfWrapper = wrappedUdf;
 			this.keyClasses = new ArrayList<Class<? extends Key<?>>>();
 			this.keyClasses.add(keyClass);
 			this.keyColumns = new ArrayList<Integer>();
@@ -325,9 +359,48 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
 		 */
 		public ReduceOperator build() {
 			if (name == null) {
-				name = udf.getUserCodeClass().getName();
+				name = udfWrapper.getUserCodeClass().getName();
 			}
 			return new ReduceOperator(this);
 		}
 	}
+	
+	// ============================================================================================
+	
+	public static class WrappingReduceFunction extends WrappingFunction<ReduceFunction> implements GroupReduceFunction<Record, Record>, FlatCombineFunction<Record> {
+		
+		private static final long serialVersionUID = 1L;
+		
+		public WrappingReduceFunction(ReduceFunction reducer) {
+			super(reducer);
+		}
+		
+		@Override
+		public final void reduce(Iterable<Record> records, Collector<Record> out) throws Exception {
+			this.wrappedFunction.reduce(records.iterator(), out);
+		}
+
+		@Override
+		public final void combine(Iterable<Record> records, Collector<Record> out) throws Exception {
+			this.wrappedFunction.combine(records.iterator(), out);
+		}
+	}
+	
+	public static final class WrappingClassReduceFunction extends WrappingReduceFunction {
+		
+		private static final long serialVersionUID = 1L;
+		
+		public WrappingClassReduceFunction(Class<? extends ReduceFunction> reducer) {
+			super(InstantiationUtil.instantiate(reducer));
+		}
+		
+		private void writeObject(ObjectOutputStream out) throws IOException {
+			out.writeObject(wrappedFunction.getClass());
+		}
+
+		private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+			Class<?> clazz = (Class<?>) in.readObject();
+			this.wrappedFunction = (ReduceFunction) InstantiationUtil.instantiate(clazz);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
index be872e5..9fcf963 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
@@ -246,7 +246,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 	public static class SolutionWorksetCoGroup1 extends RichCoGroupFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
 
 		@Override
-		public void coGroup(Iterator<Tuple2<Double, String>> first, Iterator<Tuple3<Double, Long, String>> second,
+		public void coGroup(Iterable<Tuple2<Double, String>> first, Iterable<Tuple3<Double, Long, String>> second,
 				Collector<Tuple3<Double, Long, String>> out) {
 		}
 	}
@@ -254,7 +254,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 	public static class SolutionWorksetCoGroup2 extends RichCoGroupFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>, Tuple3<Double, Long, String>> {
 
 		@Override
-		public void coGroup(Iterator<Tuple3<Double, Long, String>> second, Iterator<Tuple2<Double, String>> first,
+		public void coGroup(Iterable<Tuple3<Double, Long, String>> second, Iterable<Tuple2<Double, String>> first,
 				Collector<Tuple3<Double, Long, String>> out) {
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
new file mode 100644
index 0000000..78a7ad8
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.record;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.java.record.functions.CoGroupFunction;
+import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
+import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsSecond;
+import org.apache.flink.api.java.record.operators.CoGroupOperator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class CoGroupWrappingFunctionTest {
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testWrappedCoGroupObject() {
+		try {
+			AtomicInteger methodCounter = new AtomicInteger();
+			
+			CoGroupOperator coGroupOp = CoGroupOperator.builder(new TestCoGroupFunction(methodCounter), LongValue.class, 1, 2).build();
+			
+			RichFunction cogrouper = (RichFunction) coGroupOp.getUserCodeWrapper().getUserCodeObject();
+			
+			// test the method invocations
+			cogrouper.close();
+			cogrouper.open(new Configuration());
+			assertEquals(2, methodCounter.get());
+			
+			// prepare the coGroup
+			final List<Record> target = new ArrayList<Record>();
+			Collector<Record> collector = new Collector<Record>() {
+				@Override
+				public void collect(Record record) {
+					target.add(record);
+				}
+				@Override
+				public void close() {}
+			};
+			
+			List<Record> source1 = new ArrayList<Record>();
+			source1.add(new Record(new IntValue(42)));
+			source1.add(new Record(new IntValue(13)));
+			
+			List<Record> source2 = new ArrayList<Record>();
+			source2.add(new Record(new LongValue(11)));
+			source2.add(new Record(new LongValue(17)));
+			
+			// test coGroup
+			((org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>) cogrouper).coGroup(source1, source2, collector);
+			assertEquals(4, target.size());
+			assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
+			assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
+			assertEquals(new LongValue(11), target.get(2).getField(0, LongValue.class));
+			assertEquals(new LongValue(17), target.get(3).getField(0, LongValue.class));
+			target.clear();
+			
+			// test the serialization
+			SerializationUtils.clone((java.io.Serializable) cogrouper);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testWrappedCoGroupClass() {
+		try {
+			CoGroupOperator coGroupOp = CoGroupOperator.builder(TestCoGroupFunction.class, LongValue.class, 1, 2).build();
+			
+			UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> udf = coGroupOp.getUserCodeWrapper();
+			UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> copy = SerializationUtils.clone(udf);
+			org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> cogrouper = copy.getUserCodeObject();
+			
+			// prepare the coGroup
+			final List<Record> target = new ArrayList<Record>();
+			Collector<Record> collector = new Collector<Record>() {
+				@Override
+				public void collect(Record record) {
+					target.add(record);
+				}
+				@Override
+				public void close() {}
+			};
+			
+			List<Record> source1 = new ArrayList<Record>();
+			source1.add(new Record(new IntValue(42)));
+			source1.add(new Record(new IntValue(13)));
+			
+			List<Record> source2 = new ArrayList<Record>();
+			source2.add(new Record(new LongValue(11)));
+			source2.add(new Record(new LongValue(17)));
+			
+			// test coGroup
+			cogrouper.coGroup(source1, source2, collector);
+			assertEquals(4, target.size());
+			assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
+			assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
+			assertEquals(new LongValue(11), target.get(2).getField(0, LongValue.class));
+			assertEquals(new LongValue(17), target.get(3).getField(0, LongValue.class));
+			target.clear();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testExtractSemantics() {
+		try {
+			{
+				CoGroupOperator coGroupOp = CoGroupOperator.builder(new TestCoGroupFunction(), LongValue.class, 1, 2).build();
+				
+				DualInputSemanticProperties props = coGroupOp.getSemanticProperties();
+				FieldSet fw2 = props.getForwardedField1(2);
+				FieldSet fw4 = props.getForwardedField2(4);
+				
+				assertNotNull(fw2);
+				assertNotNull(fw4);
+				assertEquals(1, fw2.size());
+				assertEquals(1, fw4.size());
+				assertTrue(fw2.contains(2));
+				assertTrue(fw4.contains(4));
+			}
+			{
+				CoGroupOperator coGroupOp = CoGroupOperator.builder(TestCoGroupFunction.class, LongValue.class, 1, 2).build();
+				
+				DualInputSemanticProperties props = coGroupOp.getSemanticProperties();
+				FieldSet fw2 = props.getForwardedField1(2);
+				FieldSet fw4 = props.getForwardedField2(4);
+				
+				assertNotNull(fw2);
+				assertNotNull(fw4);
+				assertEquals(1, fw2.size());
+				assertEquals(1, fw4.size());
+				assertTrue(fw2.contains(2));
+				assertTrue(fw4.contains(4));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@ConstantFieldsFirst(2)
+	@ConstantFieldsSecond(4)
+	public static class TestCoGroupFunction extends CoGroupFunction {
+		
+		private final AtomicInteger methodCounter;
+		
+		private TestCoGroupFunction(AtomicInteger methodCounter) {
+			this.methodCounter= methodCounter;
+		}
+		
+		public TestCoGroupFunction() {
+			methodCounter = new AtomicInteger();
+		}
+		
+		@Override
+		public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) throws Exception {
+			while (records1.hasNext()) {
+				out.collect(records1.next());
+			}
+			while (records2.hasNext()) {
+				out.collect(records2.next());
+			}
+		}
+		
+		@Override
+		public void close() throws Exception {
+			methodCounter.incrementAndGet();
+			super.close();
+		}
+		
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			methodCounter.incrementAndGet();
+			super.open(parameters);
+		}
+	};
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
new file mode 100644
index 0000000..653bb16
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.record;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
+import org.apache.flink.api.java.record.functions.ReduceFunction;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class ReduceWrappingFunctionTest {
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testWrappedReduceObject() {
+		try {
+			AtomicInteger methodCounter = new AtomicInteger();
+			
+			ReduceOperator reduceOp = ReduceOperator.builder(new TestReduceFunction(methodCounter)).build();
+			
+			RichFunction reducer = (RichFunction) reduceOp.getUserCodeWrapper().getUserCodeObject();
+			
+			// test the method invocations
+			reducer.close();
+			reducer.open(new Configuration());
+			assertEquals(2, methodCounter.get());
+			
+			// prepare the reduce / combine tests
+			final List<Record> target = new ArrayList<Record>();
+			Collector<Record> collector = new Collector<Record>() {
+				@Override
+				public void collect(Record record) {
+					target.add(record);
+				}
+				@Override
+				public void close() {}
+			};
+			
+			List<Record> source = new ArrayList<Record>();
+			source.add(new Record(new IntValue(42), new LongValue(11)));
+			source.add(new Record(new IntValue(13), new LongValue(17)));
+			
+			// test reduce
+			((GroupReduceFunction<Record, Record>) reducer).reduce(source, collector);
+			assertEquals(2, target.size());
+			assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
+			assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
+			assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
+			assertEquals(new LongValue(17), target.get(1).getField(1, LongValue.class));
+			target.clear();
+			
+			// test combine
+			((FlatCombineFunction<Record>) reducer).combine(source, collector);
+			assertEquals(2, target.size());
+			assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
+			assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
+			assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
+			assertEquals(new LongValue(17), target.get(1).getField(1, LongValue.class));
+			target.clear();
+			
+			// test the serialization
+			SerializationUtils.clone((java.io.Serializable) reducer);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testWrappedReduceClass() {
+		try {
+			ReduceOperator reduceOp = ReduceOperator.builder(TestReduceFunction.class).build();
+			
+			UserCodeWrapper<GroupReduceFunction<Record, Record>> udf = reduceOp.getUserCodeWrapper();
+			UserCodeWrapper<GroupReduceFunction<Record, Record>> copy = SerializationUtils.clone(udf);
+			GroupReduceFunction<Record, Record> reducer = copy.getUserCodeObject();
+			
+			// prepare the reduce / combine tests
+			final List<Record> target = new ArrayList<Record>();
+			Collector<Record> collector = new Collector<Record>() {
+				@Override
+				public void collect(Record record) {
+					target.add(record);
+				}
+				@Override
+				public void close() {}
+			};
+			
+			List<Record> source = new ArrayList<Record>();
+			source.add(new Record(new IntValue(42), new LongValue(11)));
+			source.add(new Record(new IntValue(13), new LongValue(17)));
+			
+			// test reduce
+			reducer.reduce(source, collector);
+			assertEquals(2, target.size());
+			assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
+			assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
+			assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
+			assertEquals(new LongValue(17), target.get(1).getField(1, LongValue.class));
+			target.clear();
+			
+			// test combine
+			((FlatCombineFunction<Record>) reducer).combine(source, collector);
+			assertEquals(2, target.size());
+			assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
+			assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
+			assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
+			assertEquals(new LongValue(17), target.get(1).getField(1, LongValue.class));
+			target.clear();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testExtractSemantics() {
+		try {
+			{
+				ReduceOperator reduceOp = ReduceOperator.builder(new TestReduceFunction()).build();
+				
+				SingleInputSemanticProperties props = reduceOp.getSemanticProperties();
+				FieldSet fw2 = props.getForwardedField(2);
+				FieldSet fw4 = props.getForwardedField(4);
+				
+				assertNotNull(fw2);
+				assertNotNull(fw4);
+				assertEquals(1, fw2.size());
+				assertEquals(1, fw4.size());
+				assertTrue(fw2.contains(2));
+				assertTrue(fw4.contains(4));
+			}
+			{
+				ReduceOperator reduceOp = ReduceOperator.builder(TestReduceFunction.class).build();
+				
+				SingleInputSemanticProperties props = reduceOp.getSemanticProperties();
+				FieldSet fw2 = props.getForwardedField(2);
+				FieldSet fw4 = props.getForwardedField(4);
+				
+				assertNotNull(fw2);
+				assertNotNull(fw4);
+				assertEquals(1, fw2.size());
+				assertEquals(1, fw4.size());
+				assertTrue(fw2.contains(2));
+				assertTrue(fw4.contains(4));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCombinable() {
+		try {
+			{
+				ReduceOperator reduceOp = ReduceOperator.builder(new TestReduceFunction()).build();
+				assertTrue(reduceOp.isCombinable());
+			}
+			{
+				ReduceOperator reduceOp = ReduceOperator.builder(TestReduceFunction.class).build();
+				assertTrue(reduceOp.isCombinable());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Combinable
+	@ConstantFields({2, 4})
+	public static class TestReduceFunction extends ReduceFunction {
+		
+		private final AtomicInteger methodCounter;
+		
+		private TestReduceFunction(AtomicInteger methodCounter) {
+			this.methodCounter= methodCounter;
+		}
+		
+		public TestReduceFunction() {
+			methodCounter = new AtomicInteger();
+		}
+		
+		@Override
+		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
+			while (records.hasNext()) {
+				out.collect(records.next());
+			}
+		}
+		
+		@Override
+		public void close() throws Exception {
+			methodCounter.incrementAndGet();
+			super.close();
+		}
+		
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			methodCounter.incrementAndGet();
+			super.open(parameters);
+		}
+	};
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index d5044a8..c8a8ee9 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.api.java.type.extractor;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Iterator;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -72,7 +71,7 @@ public class TypeExtractorTest {
 			private static final long serialVersionUID = 1L;
 
 			@Override
-			public void reduce(Iterator<Boolean> values, Collector<Boolean> out) throws Exception {
+			public void reduce(Iterable<Boolean> values, Collector<Boolean> out) throws Exception {
 				// nothing to do
 			}
 		};
@@ -1084,7 +1083,7 @@ public class TypeExtractorTest {
 			private static final long serialVersionUID = 1L;
 
 			@Override
-			public void coGroup(Iterator<String[]> first, Iterator<String[]> second, Collector<String[]> out) throws Exception {
+			public void coGroup(Iterable<String[]> first, Iterable<String[]> second, Collector<String[]> out) throws Exception {
 				// nothing to do
 			}
 		};

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
index 7dd4dea..ab18bf8 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
@@ -48,14 +48,11 @@ public class CoGroupITCase implements Serializable {
 			DataSet<Tuple2<Integer,String>> joined = left.coGroup(right).where(0).equalTo(0)
 					.with((values1, values2, out) -> {
 						int sum = 0;
-						String conc = "";
-						while (values1.hasNext()) {
-							sum += values1.next().f0;
-							conc += values1.next().f1;
+						for (Tuple2<Integer, String> next : values1) {
+							sum += next.f0;
 						}
-						while (values2.hasNext()) {
-							sum += values2.next().f0;
-							conc += values2.next().f1;
+						for (Tuple2<Integer, String> next : values2) {
+							sum += next.f0;
 						}
 					});
 			env.execute();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
index a86de1f..7b5d659 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
@@ -38,8 +38,7 @@ public class GroupReduceITCase implements Serializable {
 			DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
 			DataSet<String> concatDs = stringDs.reduceGroup((values, out) -> {
 				String conc = "";
-				while (values.hasNext()) {
-					String s = values.next();
+				for (String s : values) {
 					conc = conc.concat(s);
 				}
 				out.collect(conc);
@@ -68,9 +67,8 @@ public class GroupReduceITCase implements Serializable {
 					.groupBy(0)
 					.reduceGroup((values, out) -> {
 						String conc = "";
-						while (values.hasNext()) {
-							String s = values.next().f1;
-							conc = conc.concat(s);
+						for (Tuple2<Integer,String> next : values) {
+							conc = conc.concat(next.f1);
 						}
 						out.collect(conc);
 					});

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
index 8761a2e..8590b78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
@@ -16,10 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
-import java.util.Iterator;
+import java.util.Collections;
 
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -29,7 +28,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
 import org.apache.flink.runtime.iterative.task.AbstractIterativePactTask;
 import org.apache.flink.runtime.operators.hash.CompactingHashTable;
-import org.apache.flink.runtime.util.EmptyIterator;
 import org.apache.flink.runtime.util.KeyGroupedIterator;
 import org.apache.flink.runtime.util.SingleElementIterator;
 import org.apache.flink.util.Collector;
@@ -132,7 +130,7 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
 		
 		final KeyGroupedIterator<IT2> probeSideInput = new KeyGroupedIterator<IT2>(taskContext.<IT2>getInput(0), probeSideSerializer, probeSideComparator);
 		final SingleElementIterator<IT1> siIter = new SingleElementIterator<IT1>();
-		final Iterator<IT1> emptySolutionSide = EmptyIterator.<IT1>get();
+		final Iterable<IT1> emptySolutionSide = Collections.emptySet();
 		
 		final CompactingHashTable<IT1>.HashTableProber<IT2> prober = join.getProber(this.probeSideComparator, this.pairComparator);
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
index f2020c7..b3c0ece 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
@@ -16,11 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
@@ -132,7 +129,7 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
 		
 		final KeyGroupedIterator<IT1> probeSideInput = new KeyGroupedIterator<IT1>(taskContext.<IT1>getInput(0), probeSideSerializer, probeSideComparator);
 		final SingleElementIterator<IT2> siIter = new SingleElementIterator<IT2>();
-		final Iterator<IT2> emptySolutionSide = EmptyIterator.<IT2>get();
+		final Iterable<IT2> emptySolutionSide = EmptyIterator.<IT2>get();
 		
 		final CompactingHashTable<IT2>.HashTableProber<IT1> prober = join.getProber(this.probeSideComparator, this.pairComparator);
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index c8f217c..9bc1893 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -697,7 +697,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 			S stub = config.<S>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(stubSuperClass, this.userCodeClassLoader);
 			// check if the class is a subclass, if the check is required
 			if (stubSuperClass != null && !stubSuperClass.isAssignableFrom(stub.getClass())) {
-				Thread.dumpStack();
 				throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" + 
 						stubSuperClass.getName() + "' as is required.");
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index da8a11b..6a6f6ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
 import org.apache.flink.runtime.util.KeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TraversableOnceException;
 
 
 /**
@@ -503,7 +504,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 	 * This class implements an iterator over values from a sort buffer. The iterator returns the values of a given
 	 * interval.
 	 */
-	private static final class CombineValueIterator<E> implements Iterator<E> {
+	private static final class CombineValueIterator<E> implements Iterator<E>, Iterable<E> {
 		
 		private final InMemorySorter<E> buffer; // the buffer from which values are returned
 		
@@ -512,6 +513,8 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 		private int last; // the position of the last value to be returned
 
 		private int position; // the position of the next value to be returned
+		
+		private boolean iteratorAvailable;
 
 		/**
 		 * Creates an iterator over the values in a <tt>BufferSortable</tt>.
@@ -535,6 +538,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 		public void set(int first, int last) {
 			this.last = last;
 			this.position = first;
+			this.iteratorAvailable = true;
 		}
 
 		@Override
@@ -564,6 +568,16 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 		public void remove() {
 			throw new UnsupportedOperationException();
 		}
+
+		@Override
+		public Iterator<E> iterator() { // may be obtained once per set(first, last) call
+			if (iteratorAvailable) {
+				iteratorAvailable = false; // consume the single permitted traversal
+				return this;
+			} else {
+				throw new TraversableOnceException(); // repeated request without an intervening set(...)
+			}
+		}
 	};
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
index f575076..ee2a2c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
@@ -410,7 +410,7 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> {
 	 * 
 	 * @param output The output view to write the records to.
 	 * @param start The logical start position of the subset.
-	 * @param len The number of elements to write.
+	 * @param num The number of elements to write.
 	 * @throws IOException Thrown, if an I/O exception occurred writing to the output view.
 	 */
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
index a018def..6f75490 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
@@ -103,7 +103,7 @@ public interface InMemorySorter<T> extends IndexedSortable {
 	 * 
 	 * @param output The output view to write the records to.
 	 * @param start The logical start position of the subset.
-	 * @param len The number of elements to write.
+	 * @param num The number of elements to write.
 	 * @throws IOException Thrown, if an I/O exception occurred writing to the output view.
 	 */
 	public void writeToOutput(final ChannelWriterOutputView output, final int start, int num) throws IOException;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
index 9ef2ad7..6881cdf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
@@ -34,24 +34,19 @@ import org.apache.flink.util.MutableObjectIterator;
  * to the next smallest element logarithmic in complexity, with respect to the
  * number of streams to be merged.
  * The order among the elements is established using the methods from the
- * {@link TypeSerializer} class, specifically {@link TypeSerializer#setReference(Object)}
- * and {@link TypeSerializer#compareToReference(TypeSerializer)}.
- * 
- * @see TypeSerializer
- * @see TypeSerializer#setReference(Object)
- * @see TypeSerializer#compareToReference(TypeSerializer)
- * 
+ * {@link TypeComparator} class, specifically {@link TypeComparator#setReference(Object)}
+ * and {@link TypeComparator#compareToReference(TypeComparator)}.
  */
-public class MergeIterator<E> implements MutableObjectIterator<E>
-{
+public class MergeIterator<E> implements MutableObjectIterator<E> {
+	
 	private final PartialOrderPriorityQueue<HeadStream<E>> heap;	// heap over the head elements of the stream
 	
 	private final TypeSerializer<E> serializer;
 	
 	/**
 	 * @param iterators
-	 * @param accessors The accessors used to establish an order among the elements.
-	 *                  The accessors will not be used directly, but a duplicate will be used.
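+	 * @param serializer The serializer for the element type of the merged streams.
+	 * @param comparator The comparator used to establish the order among the elements.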
 	 * @throws IOException
 	 */
 	public MergeIterator(List<MutableObjectIterator<E>> iterators,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
index 2ed75ae..48693a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.sort;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIterator.java
index dcd3361..f6803fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIterator.java
@@ -16,17 +16,15 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.sort;
 
 import java.io.IOException;
-import java.util.Iterator;
+import java.util.Collections;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.operators.util.CoGroupTaskIterator;
-import org.apache.flink.runtime.util.EmptyIterator;
 import org.apache.flink.runtime.util.KeyGroupedIterator;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -41,9 +39,9 @@ public class SortMergeCoGroupIterator<T1, T2> implements CoGroupTaskIterator<T1,
 	
 	private MatchStatus matchStatus;
 	
-	private Iterator<T1> firstReturn;
+	private Iterable<T1> firstReturn;
 	
-	private Iterator<T2> secondReturn;
+	private Iterable<T2> secondReturn;
 	
 	private TypePairComparator<T1, T2> comp;
 	
@@ -73,13 +71,13 @@ public class SortMergeCoGroupIterator<T1, T2> implements CoGroupTaskIterator<T1,
 
 
 	@Override
-	public Iterator<T1> getValues1() {
+	public Iterable<T1> getValues1() {
 		return this.firstReturn;
 	}
 
 
 	@Override
-	public Iterator<T2> getValues2() {
+	public Iterable<T2> getValues2() {
 		return this.secondReturn;
 	}
 
@@ -117,7 +115,7 @@ public class SortMergeCoGroupIterator<T1, T2> implements CoGroupTaskIterator<T1,
 		}
 		else if (firstEmpty && !secondEmpty) {
 			// input1 is empty, input2 not
-			this.firstReturn = EmptyIterator.get();
+			this.firstReturn = Collections.emptySet();
 			this.secondReturn = this.iterator2.getValues();
 			this.matchStatus = MatchStatus.FIRST_EMPTY;
 			return true;
@@ -125,7 +123,7 @@ public class SortMergeCoGroupIterator<T1, T2> implements CoGroupTaskIterator<T1,
 		else if (!firstEmpty && secondEmpty) {
 			// input1 is not empty, input 2 is empty
 			this.firstReturn = this.iterator1.getValues();
-			this.secondReturn = EmptyIterator.get();
+			this.secondReturn = Collections.emptySet();
 			this.matchStatus = MatchStatus.SECOND_EMPTY;
 			return true;
 		}
@@ -142,12 +140,12 @@ public class SortMergeCoGroupIterator<T1, T2> implements CoGroupTaskIterator<T1,
 			else if (0 < comp) {
 				// key1 goes first
 				this.firstReturn = this.iterator1.getValues();
-				this.secondReturn = EmptyIterator.get();
+				this.secondReturn = Collections.emptySet();
 				this.matchStatus = MatchStatus.SECOND_REMAINED;
 			}
 			else {
 				// key 2 goes first
-				this.firstReturn = EmptyIterator.get();
+				this.firstReturn = Collections.emptySet();
 				this.secondReturn = this.iterator2.getValues();
 				this.matchStatus = MatchStatus.FIRST_REMAINED;
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/CoGroupTaskIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/CoGroupTaskIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/CoGroupTaskIterator.java
index a20490f..e137e27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/CoGroupTaskIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/CoGroupTaskIterator.java
@@ -20,7 +20,6 @@
 package org.apache.flink.runtime.operators.util;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
 
@@ -30,8 +29,8 @@ import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
  * @param <T1> The generic type of the first input's data type.
  * @param <T2> The generic type of the second input's data type.
  */
-public interface CoGroupTaskIterator<T1, T2>
-{	
+public interface CoGroupTaskIterator<T1, T2> {
+	
 	/**
 	 * General-purpose open method.
 	 * 
@@ -63,12 +62,12 @@ public interface CoGroupTaskIterator<T1, T2>
 	 * 
 	 * @return an iterable over the left input values for the current key.
 	 */
-	Iterator<T1> getValues1();
+	Iterable<T1> getValues1();
 
 	/**
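-	 * Returns an iterable over the left input values for the current key.
+	 * Returns an iterable over the right input values for the current key.
 	 * 
-	 * @return an iterable over the left input values for the current key.
+	 * @return an iterable over the right input values for the current key.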
 	 */
-	Iterator<T2> getValues2();
+	Iterable<T2> getValues2();
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyIterator.java
index b1628d7..caafaea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyIterator.java
@@ -25,7 +25,7 @@ import java.util.NoSuchElementException;
 /**
  * An empty iterator that never returns anything.
  */
-public final class EmptyIterator<E> implements Iterator<E> {
+public final class EmptyIterator<E> implements Iterator<E>, Iterable<E> {
 
 	/**
 	 * The singleton instance.
@@ -38,9 +38,9 @@ public final class EmptyIterator<E> implements Iterator<E> {
 	 * @param <E> The type of the objects (not) returned by the iterator.
 	 * @return An instance of the iterator.
 	 */
-	public static <E> Iterator<E> get() {
+	public static <E> EmptyIterator<E> get() {
 		@SuppressWarnings("unchecked")
-		Iterator<E> iter = (Iterator<E>) INSTANCE;
+		EmptyIterator<E> iter = (EmptyIterator<E>) INSTANCE;
 		return iter;
 	}
 	
@@ -73,4 +73,9 @@ public final class EmptyIterator<E> implements Iterator<E> {
 	public void remove() {
 		throw new UnsupportedOperationException();
 	}
+
+	@Override
+	public Iterator<E> iterator() {
+		return this; // returns this same instance on every call; no traversable-once guard
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
index 0873baf..be43cc8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
@@ -26,6 +26,7 @@ import java.util.NoSuchElementException;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TraversableOnceException;
 
 /**
  * The KeyValueIterator returns a key and all values that belong to the key (share the same key).
@@ -98,6 +99,8 @@ public final class KeyGroupedIterator<E> {
 				return false;
 			}
 		}
+		
+		this.valuesIterator.iteratorAvailable = true;
 
 		// Whole value-iterator was read and a new key is available.
 		if (this.lookAheadHasNext) {
@@ -151,19 +154,20 @@ public final class KeyGroupedIterator<E> {
 
 	// --------------------------------------------------------------------------------------------
 	
-	public final class ValuesIterator implements Iterator<E>
-	{
+	public final class ValuesIterator implements Iterator<E>, Iterable<E> {
+		
 		private final TypeSerializer<E> serializer = KeyGroupedIterator.this.serializer;
 		private final TypeComparator<E> comparator = KeyGroupedIterator.this.comparator; 
 		
 		private E staging = this.serializer.createInstance();
 		private boolean currentIsUnconsumed = false;
 		
+		private boolean iteratorAvailable = true;
+		
 		private ValuesIterator() {}
 
 		@Override
-		public boolean hasNext()
-		{
+		public boolean hasNext() {
 			if (KeyGroupedIterator.this.current == null || KeyGroupedIterator.this.lookAheadHasNext) {
 				return false;
 			}
@@ -221,5 +225,16 @@ public final class KeyGroupedIterator<E> {
 		public void remove() {
 			throw new UnsupportedOperationException();
 		}
+
+		@Override
+		public Iterator<E> iterator() { // single traversal until the enclosing KeyGroupedIterator sets the flag again
+			if (iteratorAvailable) {
+				iteratorAvailable = false; // consume the single permitted traversal
+				return this;
+			}
+			else {
+				throw new TraversableOnceException(); // requested again before the flag was re-armed
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/util/MutableToRegularIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/MutableToRegularIteratorWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/MutableToRegularIteratorWrapper.java
index e707a50..14e4ae8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/MutableToRegularIteratorWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/MutableToRegularIteratorWrapper.java
@@ -25,6 +25,7 @@ import java.util.NoSuchElementException;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TraversableOnceException;
 
 /**
  * This class wraps a {@link MutableObjectIterator} into a regular {@link Iterator}.
@@ -32,13 +33,15 @@ import org.apache.flink.util.MutableObjectIterator;
  * whenever hasNext() returns (possibly with false), the previous obtained record is 
  * still valid and cannot have been overwritten internally.
  */
-public class MutableToRegularIteratorWrapper<T> implements Iterator<T> {
+public class MutableToRegularIteratorWrapper<T> implements Iterator<T>, Iterable<T> {
 	
 	private final MutableObjectIterator<T> source;
 	
 	private T current, next;
 	
 	private boolean currentIsAvailable;
+	
+	private boolean iteratorAvailable = true;
 
 	public MutableToRegularIteratorWrapper(MutableObjectIterator<T> source, TypeSerializer<T> serializer) {
 		this.source = source;
@@ -85,4 +88,15 @@ public class MutableToRegularIteratorWrapper<T> implements Iterator<T> {
 	public void remove() {
 		throw new UnsupportedOperationException();
 	}
+
+	@Override
+	public Iterator<T> iterator() { // one-shot: the flag starts as true and no reset is visible in this patch
+		if (iteratorAvailable) {
+			iteratorAvailable = false; // consume the single permitted traversal
+			return this;
+		}
+		else {
+			throw new TraversableOnceException(); // a second iterator was requested
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/util/SingleElementIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SingleElementIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SingleElementIterator.java
index 222c5b8..f7eb41a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SingleElementIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SingleElementIterator.java
@@ -27,7 +27,7 @@ import java.util.NoSuchElementException;
  * 
  * @param<E> The generic type of the iterator.
  */
-public final class SingleElementIterator<E> implements Iterator<E> {
+public final class SingleElementIterator<E> implements Iterator<E>, Iterable<E> {
 		
 		private E current;
 		private boolean available = false;
@@ -61,4 +61,9 @@ public final class SingleElementIterator<E> implements Iterator<E> {
 		public void remove() {
 			throw new UnsupportedOperationException();
 		}
+
+		@Override
+		public Iterator<E> iterator() {
+			return this; // hands out this same instance on every call; no traversable-once guard here
+		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
index 3894233..3c8bbd0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import java.util.ArrayList;
@@ -24,9 +23,12 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.java.record.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichFlatJoinFunction;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
+import org.apache.flink.runtime.operators.BuildFirstCachedMatchDriver;
+import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver;
+import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
@@ -40,8 +42,8 @@ import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>>
-{
+public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
+
 	private static final long HASH_MEM = 6*1024*1024;
 	
 	private static final long SORT_MEM = 3*1024*1024;
@@ -459,7 +461,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 	
 	// =================================================================================================
 	
-	public static final class MockMatchStub extends JoinFunction {
+	public static final class MockMatchStub extends RichFlatJoinFunction<Record, Record, Record> {
 		private static final long serialVersionUID = 1L;
 		
 		@Override
@@ -468,13 +470,13 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		}
 	}
 	
-	public static final class MockFailingMatchStub extends JoinFunction {
+	public static final class MockFailingMatchStub extends RichFlatJoinFunction<Record, Record, Record> {
 		private static final long serialVersionUID = 1L;
 		
 		private int cnt = 0;
 		
 		@Override
-		public void join(Record record1, Record record2, Collector<Record> out) {
+		public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
 			if (++this.cnt >= 10) {
 				throw new ExpectedTestException();
 			}
@@ -483,7 +485,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		}
 	}
 	
-	public static final class MockDelayingMatchStub extends JoinFunction {
+	public static final class MockDelayingMatchStub extends RichFlatJoinFunction<Record, Record, Record> {
 		private static final long serialVersionUID = 1L;
 		
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
index 5551485..b65f161 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
@@ -16,16 +16,15 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
-import java.util.Iterator;
-
 import org.junit.Assert;
-
 import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
+import org.apache.flink.runtime.operators.CoGroupDriver;
+import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
@@ -84,25 +83,23 @@ public class CoGroupTaskExternalITCase extends DriverTestBase<CoGroupFunction<Re
 		Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
 	}
 	
-	public static final class MockCoGroupStub extends org.apache.flink.api.java.record.functions.CoGroupFunction {
+	public static final class MockCoGroupStub extends RichCoGroupFunction<Record, Record, Record> {
 		private static final long serialVersionUID = 1L;
 		
 		private final Record res = new Record();
 		
+		@SuppressWarnings("unused")
 		@Override
-		public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out)
-		{
+		public void coGroup(Iterable<Record> records1, Iterable<Record> records2, Collector<Record> out) {
 			int val1Cnt = 0;
 			int val2Cnt = 0;
 			
-			while (records1.hasNext()) {
+			for (Record r : records1) {
 				val1Cnt++;
-				records1.next();
 			}
 			
-			while (records2.hasNext()) {
+			for (Record r : records2) {
 				val2Cnt++;
-				records2.next();
 			}
 			
 			if (val1Cnt == 0) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
index e57d20c..968d947 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
@@ -16,17 +16,17 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
-import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Assert;
-
 import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
+import org.apache.flink.runtime.operators.CoGroupDriver;
+import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.CoGroupTaskExternalITCase.MockCoGroupStub;
 import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
@@ -400,24 +400,20 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
 		Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
 	}
 	
-	public static class MockFailingCoGroupStub extends org.apache.flink.api.java.record.functions.CoGroupFunction {
+	public static class MockFailingCoGroupStub extends RichCoGroupFunction<Record, Record, Record> {
 		private static final long serialVersionUID = 1L;
 		
 		private int cnt = 0;
 		
 		@Override
-		public void coGroup(Iterator<Record> records1,
-				Iterator<Record> records2, Collector<Record> out) throws RuntimeException
-		{
+		public void coGroup(Iterable<Record> records1, Iterable<Record> records2, Collector<Record> out) {
 			int val1Cnt = 0;
 			
-			while (records1.hasNext()) {
+			for (@SuppressWarnings("unused") Record r : records1) { 
 				val1Cnt++;
-				records1.next();
 			}
 			
-			while (records2.hasNext()) {
-				Record record2 = records2.next();
+			for (Record record2 : records2) { 
 				if (val1Cnt == 0) {
 					
 					if(++this.cnt>=10) {
@@ -440,25 +436,23 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
 	
 	}
 	
-	public static final class MockDelayingCoGroupStub extends org.apache.flink.api.java.record.functions.CoGroupFunction {
+	public static final class MockDelayingCoGroupStub extends RichCoGroupFunction<Record, Record, Record> {
 		private static final long serialVersionUID = 1L;
 		
+		@SuppressWarnings("unused")
 		@Override
-		public void coGroup(Iterator<Record> records1,
-				Iterator<Record> records2, Collector<Record> out) {
+		public void coGroup(Iterable<Record> records1, Iterable<Record> records2, Collector<Record> out) {
 			
-			while (records1.hasNext()) {
+			for (Record r : records1) { 
 				try {
 					Thread.sleep(100);
 				} catch (InterruptedException e) { }
-				records1.next();
 			}
 			
-			while (records2.hasNext()) {
+			for (Record r : records2) { 
 				try {
 					Thread.sleep(100);
 				} catch (InterruptedException e) { }
-				records2.next();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
index 99440aa..7915d1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
@@ -16,19 +16,18 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Assert;
 
 import org.apache.flink.api.java.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
 import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.GroupReduceCombineDriver;
 import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
@@ -163,18 +162,18 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
 	}
 	
 	@Combinable
-	public static class MockCombiningReduceStub extends ReduceFunction {
+	public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
 		private static final long serialVersionUID = 1L;
 		
 		private final IntValue theInteger = new IntValue();
 
 		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out)
-				throws Exception {
+		public void reduce(Iterable<Record> records, Collector<Record> out) {
 			Record element = null;
 			int sum = 0;
-			while (records.hasNext()) {
-				element = records.next();
+			
+			for (Record next : records) {
+				element = next;
 				element.getField(1, this.theInteger);
 				
 				sum += this.theInteger.getValue();
@@ -185,13 +184,13 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
 		}
 		
 		@Override
-		public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
+		public void combine(Iterable<Record> records, Collector<Record> out) throws Exception {
 			reduce(records, out);
 		}
 	}
 	
 	@Combinable
-	public static final class MockFailingCombiningReduceStub extends ReduceFunction {
+	public static final class MockFailingCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
 		private static final long serialVersionUID = 1L;
 		
 		private int cnt = 0;
@@ -201,12 +200,12 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
 		private final IntValue combineValue = new IntValue();
 
 		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out)
-				throws Exception {
+		public void reduce(Iterable<Record> records, Collector<Record> out) {
 			Record element = null;
 			int sum = 0;
-			while (records.hasNext()) {
-				element = records.next();
+			
+			for (Record next : records) {
+				element = next;
 				element.getField(1, this.value);
 				
 				sum += this.value.getValue();
@@ -218,12 +217,12 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
 		}
 		
 		@Override
-		public void combine(Iterator<Record> records, Collector<Record> out)
-				throws Exception {
+		public void combine(Iterable<Record> records, Collector<Record> out) {
 			Record element = null;
 			int sum = 0;
-			while (records.hasNext()) {
-				element = records.next();
+			
+			for (Record next : records) {
+				element = next;
 				element.getField(1, this.combineValue);
 				
 				sum += this.combineValue.getValue();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
index b7e48c6..786fb4e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
+@SuppressWarnings("deprecation")
 public class MapTaskTest extends DriverTestBase<GenericCollectorMap<Record, Record>> {
 	
 	private static final Log LOG = LogFactory.getLog(MapTaskTest.class);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
index ebee0b8..8b8e991 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 import org.junit.Assert;
@@ -28,10 +26,11 @@ import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.java.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
 import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.GroupReduceDriver;
 import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
@@ -213,19 +212,19 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 		
 	}
 	
-	public static class MockReduceStub extends ReduceFunction {
+	public static class MockReduceStub extends RichGroupReduceFunction<Record, Record> {
 		private static final long serialVersionUID = 1L;
 		
 		private final IntValue key = new IntValue();
 		private final IntValue value = new IntValue();
 
 		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out)
-				throws Exception {
+		public void reduce(Iterable<Record> records, Collector<Record> out) {
 			Record element = null;
 			int cnt = 0;
-			while (records.hasNext()) {
-				element = records.next();
+			
+			for (Record next : records) {
+				element = next;
 				cnt++;
 			}
 			element.getField(0, this.key);
@@ -236,7 +235,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 	}
 	
 	@Combinable
-	public static class MockCombiningReduceStub extends ReduceFunction {
+	public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
 		private static final long serialVersionUID = 1L;
 
 		private final IntValue key = new IntValue();
@@ -244,12 +243,12 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 		private final IntValue combineValue = new IntValue();
 
 		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out)
-				throws Exception {
+		public void reduce(Iterable<Record> records, Collector<Record> out) {
 			Record element = null;
 			int sum = 0;
-			while (records.hasNext()) {
-				element = records.next();
+
+			for (Record next : records) {
+				element = next;
 				element.getField(1, this.value);
 				
 				sum += this.value.getValue();
@@ -261,12 +260,12 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 		}
 		
 		@Override
-		public void combine(Iterator<Record> records, Collector<Record> out)
-				throws Exception {
+		public void combine(Iterable<Record> records, Collector<Record> out) {
 			Record element = null;
 			int sum = 0;
-			while (records.hasNext()) {
-				element = records.next();
+
+			for (Record next : records) {
+				element = next;
 				element.getField(1, this.combineValue);
 				
 				sum += this.combineValue.getValue();
@@ -276,7 +275,5 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 			element.setField(1, this.combineValue);
 			out.collect(element);
 		}
-		
 	}
-	
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
index b367a7e..fada5a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -29,10 +27,11 @@ import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.java.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
 import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.GroupReduceDriver;
 import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
@@ -270,19 +269,19 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		
 	}
 	
-	public static class MockReduceStub extends ReduceFunction {
+	public static class MockReduceStub extends RichGroupReduceFunction<Record, Record> {
 		private static final long serialVersionUID = 1L;
 		
 		private final IntValue key = new IntValue();
 		private final IntValue value = new IntValue();
 
 		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out)
-				throws Exception {
+		public void reduce(Iterable<Record> records, Collector<Record> out) {
 			Record element = null;
 			int cnt = 0;
-			while (records.hasNext()) {
-				element = records.next();
+			
+			for (Record next : records) {
+				element = next;
 				cnt++;
 			}
 			element.getField(0, this.key);
@@ -293,7 +292,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 	}
 	
 	@Combinable
-	public static class MockCombiningReduceStub extends ReduceFunction {
+	public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
 		private static final long serialVersionUID = 1L;
 		
 		private final IntValue key = new IntValue();
@@ -301,12 +300,12 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		private final IntValue combineValue = new IntValue();
 
 		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out)
-				throws Exception {
+		public void reduce(Iterable<Record> records, Collector<Record> out) {
 			Record element = null;
 			int sum = 0;
-			while (records.hasNext()) {
-				element = records.next();
+			
+			for (Record next : records) {
+				element = next;
 				element.getField(1, this.value);
 				
 				sum += this.value.getValue();
@@ -318,12 +317,12 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		}
 		
 		@Override
-		public void combine(Iterator<Record> records, Collector<Record> out)
-				throws Exception {
+		public void combine(Iterable<Record> records, Collector<Record> out) {
 			Record element = null;
 			int sum = 0;
-			while (records.hasNext()) {
-				element = records.next();
+			
+			for (Record next : records) {
+				element = next;
 				element.getField(1, this.combineValue);
 				
 				sum += this.combineValue.getValue();
@@ -336,7 +335,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		
 	}
 	
-	public static class MockFailingReduceStub extends ReduceFunction {
+	public static class MockFailingReduceStub extends RichGroupReduceFunction<Record, Record> {
 		private static final long serialVersionUID = 1L;
 		
 		private int cnt = 0;
@@ -345,12 +344,12 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		private final IntValue value = new IntValue();
 
 		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out)
-				throws Exception {
+		public void reduce(Iterable<Record> records, Collector<Record> out) {
 			Record element = null;
 			int valCnt = 0;
-			while (records.hasNext()) {
-				element = records.next();
+			
+			for (Record next : records) {
+				element = next;
 				valCnt++;
 			}
 			
@@ -365,16 +364,15 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		}
 	}
 	
-	public static class MockDelayingReduceStub extends ReduceFunction {
+	public static class MockDelayingReduceStub extends RichGroupReduceFunction<Record, Record> {
 		private static final long serialVersionUID = 1L;
 		
 		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out) {
-			while(records.hasNext()) {
+		public void reduce(Iterable<Record> records, Collector<Record> out) {
+			for (@SuppressWarnings("unused") Record r : records) {
 				try {
 					Thread.sleep(100);
 				} catch (InterruptedException e) {}
-				records.next();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index aabaa03..5609128 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -16,16 +16,14 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.chaining;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.GenericCollectorMap;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparatorFactory;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
 import org.apache.flink.configuration.Configuration;
@@ -48,6 +46,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 
+@SuppressWarnings("deprecation")
 public class ChainTaskTest extends TaskTestBase {
 	
 	private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;
@@ -68,7 +67,6 @@ public class ChainTaskTest extends TaskTestBase {
 	}
 	
 	
-	
 	@Test
 	public void testMapTask() {
 		final int keyCnt = 100;
@@ -190,18 +188,19 @@ public class ChainTaskTest extends TaskTestBase {
 		}
 	}
 	
-	public static final class MockFailingCombineStub extends ReduceFunction {
+	public static final class MockFailingCombineStub extends RichGroupReduceFunction<Record, Record> {
 		private static final long serialVersionUID = 1L;
 		
 		private int cnt = 0;
 
 		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
+		public void reduce(Iterable<Record> records, Collector<Record> out) throws Exception {
 			if (++this.cnt >= 5) {
 				throw new RuntimeException("Expected Test Exception");
 			}
-			while(records.hasNext()) {
-				out.collect(records.next());
+			
+			for (Record r : records) {
+				out.collect(r);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java
index d603dec..427dc74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.drivers;
 
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -155,11 +153,10 @@ public class AllGroupReduceDriverTest {
 	public static final class ConcatSumReducer extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
 
 		@Override
-		public void reduce(Iterator<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
-			Tuple2<String, Integer> current = values.next();
+		public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
+			Tuple2<String, Integer> current = new Tuple2<String, Integer>("", 0);
 			
-			while (values.hasNext()) {
-				Tuple2<String, Integer> next = values.next();
+			for (Tuple2<String, Integer> next : values) {
 				next.f0 = current.f0 + next.f0;
 				next.f1 = current.f1 + next.f1;
 				current = next;
@@ -172,11 +169,10 @@ public class AllGroupReduceDriverTest {
 	public static final class ConcatSumMutableReducer extends RichGroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> {
 
 		@Override
-		public void reduce(Iterator<Tuple2<StringValue, IntValue>> values, Collector<Tuple2<StringValue, IntValue>> out) {
-			Tuple2<StringValue, IntValue> current = values.next();
+		public void reduce(Iterable<Tuple2<StringValue, IntValue>> values, Collector<Tuple2<StringValue, IntValue>> out) {
+			Tuple2<StringValue, IntValue> current = new Tuple2<StringValue, IntValue>(new StringValue(""), new IntValue(0));
 			
-			while (values.hasNext()) {
-				Tuple2<StringValue, IntValue> next = values.next();
+			for (Tuple2<StringValue, IntValue> next : values) {
 				next.f0.append(current.f0);
 				next.f1.setValue(current.f1.getValue() + next.f1.getValue());
 				current = next;