Posted to commits@flink.apache.org by se...@apache.org on 2014/08/01 02:36:55 UTC

[14/16] [FLINK-1023] Switch group-at-a-time functions to java.lang.Iterable (from java.util.Iterator). Iterables over transient data throw a TraversableOnceException when iterated over again.
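
For context, the new group-at-a-time contract looks roughly like the sketch below, based on the signatures visible in this diff (GroupReduceFunction, Collector, TraversableOnceException). The class IterableSumSketch and the second-iterator() probe are illustrative only and not part of this commit; for streamed groups the probe fails, while groups that happen to be materialized may still be re-iterated.

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TraversableOnceException;

// Hypothetical reducer, written only to illustrate the Iterable-based contract.
public class IterableSumSketch implements GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

	private static final long serialVersionUID = 1L;

	@Override
	public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
		String key = null;
		int sum = 0;

		// The for-each loop requests values.iterator() exactly once.
		for (Tuple2<String, Integer> t : values) {
			key = t.f0;
			sum += t.f1;
		}
		out.collect(new Tuple2<String, Integer>(key, sum));

		// A second traversal of a transient (single-pass) group is rejected.
		try {
			values.iterator();
		}
		catch (TraversableOnceException e) {
			// expected: such groups can be traversed only once
		}
	}
}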

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
index a325e33..a29d4e0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
@@ -16,10 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.drivers;
 
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -152,11 +150,10 @@ public class GroupReduceDriverTest {
 	public static final class ConcatSumReducer extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
 
 		@Override
-		public void reduce(Iterator<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
-			Tuple2<String, Integer> current = values.next();
+		public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
+			Tuple2<String, Integer> current = new Tuple2<String, Integer>("", 0);
 			
-			while (values.hasNext()) {
-				Tuple2<String, Integer> next = values.next();
+			for (Tuple2<String, Integer> next : values) {
 				next.f0 = current.f0 + next.f0;
 				next.f1 = current.f1 + next.f1;
 				current = next;
@@ -169,11 +166,10 @@ public class GroupReduceDriverTest {
 	public static final class ConcatSumMutableReducer extends RichGroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> {
 
 		@Override
-		public void reduce(Iterator<Tuple2<StringValue, IntValue>> values, Collector<Tuple2<StringValue, IntValue>> out) {
-			Tuple2<StringValue, IntValue> current = values.next();
+		public void reduce(Iterable<Tuple2<StringValue, IntValue>> values, Collector<Tuple2<StringValue, IntValue>> out) {
+			Tuple2<StringValue, IntValue> current = new Tuple2<StringValue, IntValue>(new StringValue(""), new IntValue(0));
 			
-			while (values.hasNext()) {
-				Tuple2<StringValue, IntValue> next = values.next();
+			for (Tuple2<StringValue, IntValue> next : values) {
 				next.f0.append(current.f0);
 				next.f1.setValue(current.f1.getValue() + next.f1.getValue());
 				current = next;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index 7020f89..4b3c197 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.sort;
 
 import java.io.IOException;
@@ -26,13 +25,12 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 
 import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
 import org.apache.flink.configuration.Configuration;
@@ -273,7 +271,7 @@ public class CombiningUnilateralSortMergerITCase {
 
 	// --------------------------------------------------------------------------------------------
 	
-	public static class TestCountCombiner extends ReduceFunction {
+	public static class TestCountCombiner extends RichGroupReduceFunction<Record, Record> {
 		private static final long serialVersionUID = 1L;
 		
 		private final IntValue count = new IntValue();
@@ -284,11 +282,11 @@ public class CombiningUnilateralSortMergerITCase {
 		
 		
 		@Override
-		public void combine(Iterator<Record> values, Collector<Record> out) {
+		public void combine(Iterable<Record> values, Collector<Record> out) {
 			Record rec = null;
 			int cnt = 0;
-			while (values.hasNext()) {
-				rec = values.next();
+			for (Record next : values) {
+				rec = next;
 				cnt += rec.getField(1, IntValue.class).getValue();
 			}
 			
@@ -298,7 +296,7 @@ public class CombiningUnilateralSortMergerITCase {
 		}
 
 		@Override
-		public void reduce(Iterator<Record> values, Collector<Record> out) {}
+		public void reduce(Iterable<Record> values, Collector<Record> out) {}
 		
 		@Override
 		public void open(Configuration parameters) throws Exception {
@@ -311,7 +309,7 @@ public class CombiningUnilateralSortMergerITCase {
 		}
 	}
 
-	public static class TestCountCombiner2 extends ReduceFunction {
+	public static class TestCountCombiner2 extends RichGroupReduceFunction<Record, Record> {
 		private static final long serialVersionUID = 1L;
 		
 		public volatile boolean opened = false;
@@ -319,11 +317,11 @@ public class CombiningUnilateralSortMergerITCase {
 		public volatile boolean closed = false;
 		
 		@Override
-		public void combine(Iterator<Record> values, Collector<Record> out) {
+		public void combine(Iterable<Record> values, Collector<Record> out) {
 			Record rec = null;
 			int cnt = 0;
-			while (values.hasNext()) {
-				rec = values.next();
+			for (Record next : values) {
+				rec = next;
 				cnt += Integer.parseInt(rec.getField(1, TestData.Value.class).toString());
 			}
 
@@ -331,7 +329,7 @@ public class CombiningUnilateralSortMergerITCase {
 		}
 
 		@Override
-		public void reduce(Iterator<Record> values, Collector<Record> out) {
+		public void reduce(Iterable<Record> values, Collector<Record> out) {
 			// yo, nothing, mon
 		}
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java
index a66fb5d..48a4b91 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java
@@ -116,8 +116,8 @@ public class SortMergeCoGroupIteratorITCase
 			final TestData.Key key = new TestData.Key();
 			while (iterator.next())
 			{
-				Iterator<Record> iter1 = iterator.getValues1();
-				Iterator<Record> iter2 = iterator.getValues2();
+				Iterator<Record> iter1 = iterator.getValues1().iterator();
+				Iterator<Record> iter2 = iterator.getValues2().iterator();
 				
 				TestData.Value v1 = null;
 				TestData.Value v2 = null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
index e9010d2..27fa540 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TraversableOnceException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,17 +39,15 @@ import org.junit.Test;
 /**
  * Test for the key grouped iterator, which advances in windows containing the same key and provides a sub-iterator
  * over the records with the same key.
- * 
  */
-public class KeyGroupedIteratorTest
-{
+public class KeyGroupedIteratorTest {
+	
 	private MutableObjectIterator<Record> sourceIter;		// the iterator that provides the input
 	
 	private KeyGroupedIterator<Record> psi;						// the grouping iterator, progressing in key steps
 	
 	@Before
-	public void setup()
-	{
+	public void setup() {
 		final ArrayList<IntStringPair> source = new ArrayList<IntStringPair>();
 		
 		// add elements to the source
@@ -91,8 +90,7 @@ public class KeyGroupedIteratorTest
 	}
 
 	@Test
-	public void testNextKeyOnly() throws Exception
-	{
+	public void testNextKeyOnly() throws Exception {
 		try {
 			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
 			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
@@ -131,6 +129,8 @@ public class KeyGroupedIteratorTest
 		try {
 			// Key 1, Value A
 			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue(hasIterator(this.psi.getValues()));
+			Assert.assertFalse(hasIterator(this.psi.getValues()));
 			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
 			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
 			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
@@ -139,6 +139,8 @@ public class KeyGroupedIteratorTest
 			
 			// Key 2, Value B
 			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue(hasIterator(this.psi.getValues()));
+			Assert.assertFalse(hasIterator(this.psi.getValues()));
 			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
 			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
 			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
@@ -147,6 +149,8 @@ public class KeyGroupedIteratorTest
 			
 			// Key 3, Values C, D
 			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue(hasIterator(this.psi.getValues()));
+			Assert.assertFalse(hasIterator(this.psi.getValues()));
 			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
 			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
 			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
@@ -173,6 +177,8 @@ public class KeyGroupedIteratorTest
 			
 			// Key 4, Values E, F, G
 			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue(hasIterator(this.psi.getValues()));
+			Assert.assertFalse(hasIterator(this.psi.getValues()));
 			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
 			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
 			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
@@ -193,6 +199,8 @@ public class KeyGroupedIteratorTest
 			
 			// Key 5, Values H, I, J, K, L
 			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue(hasIterator(this.psi.getValues()));
+			Assert.assertFalse(hasIterator(this.psi.getValues()));
 			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
 			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
 			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
@@ -231,6 +239,7 @@ public class KeyGroupedIteratorTest
 			
 			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
 			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+			Assert.assertNull(this.psi.getValues());
 		} catch (Exception e) {
 			e.printStackTrace();
 			Assert.fail("The test encountered an unexpected exception.");
@@ -244,12 +253,18 @@ public class KeyGroupedIteratorTest
 			// Progression only via nextKey() and hasNext() - Key 1, Value A
 			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
 			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue(hasIterator(this.psi.getValues()));
+			Assert.assertFalse(hasIterator(this.psi.getValues()));
 			
 			// Progression only through nextKey() - Key 2, Value B
 			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue(hasIterator(this.psi.getValues()));
+			Assert.assertFalse(hasIterator(this.psi.getValues()));
 			
 			// Progression first though haNext() and next(), then through hasNext() - Key 3, Values C, D
 			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue(hasIterator(this.psi.getValues()));
+			Assert.assertFalse(hasIterator(this.psi.getValues()));
 			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
 			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
 			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
@@ -260,6 +275,8 @@ public class KeyGroupedIteratorTest
 			
 			// Progression first via next() only, then hasNext() only Key 4, Values E, F, G
 			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue(hasIterator(this.psi.getValues()));
+			Assert.assertFalse(hasIterator(this.psi.getValues()));
 			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
 			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
 			
@@ -270,6 +287,8 @@ public class KeyGroupedIteratorTest
 			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
 			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
 			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue(hasIterator(this.psi.getValues()));
+			Assert.assertFalse(hasIterator(this.psi.getValues()));
 			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
 			
 			// end
@@ -355,4 +374,14 @@ public class KeyGroupedIteratorTest
 			return string;
 		}
 	}
+	
+	public boolean hasIterator(Iterable<?> iterable) {
+		try {
+			iterable.iterator();
+			return true;
+		}
+		catch (TraversableOnceException e) {
+			return false;
+		}
+	}
 }
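
For orientation, the consumption pattern that the assertions above exercise looks roughly like this sketch. It assumes a helper placed in the same package as the test (org.apache.flink.runtime.util), so that KeyGroupedIterator resolves without an import; the class and method names are invented for illustration and are not part of this commit.

package org.apache.flink.runtime.util;

import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;

// Illustrative helper only: shows how a KeyGroupedIterator is meant to be consumed.
public class KeyGroupedIteratorSketch {

	static void drain(KeyGroupedIterator<Record> psi) throws Exception {
		// nextKey() advances to the next group of records that share a key.
		while (psi.nextKey()) {
			// getValues() hands out the current group as an Iterable; its iterator()
			// may be requested once per group, a second request throws TraversableOnceException.
			for (Record record : psi.getValues()) {
				System.out.println(record.getField(1, StringValue.class).getValue());
			}
		}
	}
}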

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/ReduceFunction.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/ReduceFunction.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/ReduceFunction.scala
index 6ea8cbf..de7b7d1 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/ReduceFunction.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/ReduceFunction.scala
@@ -19,10 +19,10 @@
 
 package org.apache.flink.api.scala.functions
 
-import java.util.{ Iterator => JIterator }
-
 import scala.Iterator
 
+import java.util.{Iterator => JIterator}
+
 import org.apache.flink.api.scala.analysis.{UDTSerializer, FieldSelector, UDT}
 import org.apache.flink.api.scala.analysis.UDF1
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CoGroupOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CoGroupOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CoGroupOperator.scala
index 4c26307..6a71bb7 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CoGroupOperator.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CoGroupOperator.scala
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.scala.operators
 
 import language.experimental.macros
@@ -54,7 +53,7 @@ class CoGroupDataSetWithWhereAndEqual[LeftIn, RightIn](val leftKeySelection: Lis
   def flatMap[Out](fun: (Iterator[LeftIn], Iterator[RightIn]) => Iterator[Out]): DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out] = macro CoGroupMacros.flatMap[LeftIn, RightIn, Out]
 }
 
-class NoKeyCoGroupBuilder(s: JCoGroupFunction) extends CoGroupOperator.Builder(new UserCodeObjectWrapper(s))
+class NoKeyCoGroupBuilder(s: JCoGroupFunction) extends CoGroupOperator.Builder(new UserCodeObjectWrapper(new CoGroupOperator.WrappingCoGroupFunction(s)))
 
 object CoGroupMacros {
   
@@ -106,7 +105,9 @@ object CoGroupMacros {
       implicit val leftInputUDT: UDT[LeftIn] = c.Expr[UDT[LeftIn]](createUdtLeftIn).splice
       implicit val rightInputUDT: UDT[RightIn] = c.Expr[UDT[RightIn]](createUdtRightIn).splice
       implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
+      
       new CoGroupFunctionBase[LeftIn, RightIn, Out] {
+        
         override def coGroup(leftRecords: JIterator[Record], rightRecords: JIterator[Record], out: Collector[Record]) = {
 
           val firstLeftRecord = leftIterator.initialize(leftRecords)
@@ -177,7 +178,9 @@ object CoGroupMacros {
       implicit val leftInputUDT: UDT[LeftIn] = c.Expr[UDT[LeftIn]](createUdtLeftIn).splice
       implicit val rightInputUDT: UDT[RightIn] = c.Expr[UDT[RightIn]](createUdtRightIn).splice
       implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
+      
       new CoGroupFunctionBase[LeftIn, RightIn, Out] {
+        
         override def coGroup(leftRecords: JIterator[Record], rightRecords: JIterator[Record], out: Collector[Record]) = {
           val firstLeftRecord = leftIterator.initialize(leftRecords)
           val firstRightRecord = rightIterator.initialize(rightRecords)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala
index cf3a96c..f25b5a3 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.scala.operators
 
 import language.experimental.macros
@@ -191,7 +190,9 @@ object ReduceMacros {
       implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
 
       new ReduceFunctionBase[In, Out] {
-        override def reduce(records: JIterator[Record], out: Collector[Record]) = {
+        override def reduce(recordsIterable: JIterator[Record], out: Collector[Record]) = {
+          val records: JIterator[Record] = recordsIterable
+          
           if (records.hasNext) {
             val firstRecord = reduceIterator.initialize(records)
             reduceRecord.copyFrom(firstRecord, reduceForwardFrom, reduceForwardTo)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
index e76f86d..5497004 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
@@ -270,6 +270,7 @@ public class AccumulatorITCase extends RecordAPITestBase {
 		private void reduceInternal(Iterator<Record> records, Collector<Record> out) {
 			Record element = null;
 			int sum = 0;
+			
 			while (records.hasNext()) {
 				element = records.next();
 				IntValue i = element.getField(1, IntValue.class);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
index bf6c37d..197a745 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
@@ -124,11 +124,13 @@ public class AccumulatorIterativeITCase extends RecordAPITestBase {
 		}
 		
 		@Override
-		public void reduce(Iterator<Record> it, Collector<Record> out) {
+		public void reduce(Iterator<Record> records, Collector<Record> out) {
 			// Compute the sum
 			int sum = 0;
-			while (it.hasNext()) {
-				Integer value = Integer.parseInt(it.next().getField(0, StringValue.class).getValue());
+			
+			while (records.hasNext()) {
+				Record r = records.next();
+				Integer value = Integer.parseInt(r.getField(0, StringValue.class).getValue());
 				sum += value;
 				testCounter.add(value);
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
index 4b3a4bf..2217679 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.java.record.io.CsvInputFormat;
+import org.apache.flink.api.java.record.operators.ReduceOperator.WrappingReduceFunction;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparatorFactory;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
 import org.apache.flink.configuration.Configuration;
@@ -252,7 +253,7 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		tailConfig.setOutputSerializer(outputSerializer);
 		
 		// the udf
-		tailConfig.setStubWrapper(new UserCodeObjectWrapper<RecomputeClusterCenter>(new RecomputeClusterCenter()));
+		tailConfig.setStubWrapper(new UserCodeObjectWrapper<WrappingReduceFunction>(new WrappingReduceFunction(new RecomputeClusterCenter())));
 		
 		return tail;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
index 35aea4b..b442b33 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
@@ -16,12 +16,10 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.java.functions.RichGroupReduceFunction;
@@ -72,15 +70,13 @@ public class BulkIterationWithAllReducerITCase extends JavaProgramTestBase {
 		}
 
 		@Override
-		public void reduce(Iterator<Integer> records, Collector<Integer> out) {
+		public void reduce(Iterable<Integer> records, Collector<Integer> out) {
 			if (bcValue == null) {
 				return;
 			}
 			final int x = bcValue;
 			
-			while (records.hasNext()) { 
-				int y = records.next();
-
+			for (Integer y : records) { 
 				if (y > x) {
 					out.collect(y);
 					return;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
index 37695b2..f2a43a8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative;
 
 import java.io.BufferedReader;
@@ -104,7 +103,8 @@ public class CoGroupConnectedComponentsITCase extends RecordAPITestBase {
 			long minimumComponentID = Long.MAX_VALUE;
 
 			while (candidates.hasNext()) {
-				long candidateComponentID = candidates.next().getField(1, LongValue.class).getValue();
+				Record candidate = candidates.next();
+				long candidateComponentID = candidate.getField(1, LongValue.class).getValue();
 				if (candidateComponentID < minimumComponentID) {
 					minimumComponentID = candidateComponentID;
 				}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
index 49bdb26..c6175b2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.java.functions.RichCoGroupFunction;
@@ -116,17 +114,17 @@ public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase
 	public static final class MinIdAndUpdate extends RichCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 		
 		@Override
-		public void coGroup(Iterator<Tuple2<Long, Long>> candidates, Iterator<Tuple2<Long, Long>> current, Collector<Tuple2<Long, Long>> out) {
-			if (!current.hasNext()) {
+		public void coGroup(Iterable<Tuple2<Long, Long>> candidates, Iterable<Tuple2<Long, Long>> current, Collector<Tuple2<Long, Long>> out) {
+			if (!current.iterator().hasNext()) {
 				throw new RuntimeException("Error: Id not encountered before.");
 			}
 			
-			Tuple2<Long, Long> old = current.next();
+			Tuple2<Long, Long> old = current.iterator().next();
 			
 			long minimumComponentID = Long.MAX_VALUE;
 
-			while (candidates.hasNext()) {
-				long candidateComponentID = candidates.next().f1;
+			for (Tuple2<Long, Long> candidate : candidates) {
+				long candidateComponentID = candidate.f1;
 				if (candidateComponentID < minimumComponentID) {
 					minimumComponentID = candidateComponentID;
 				}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
index d2669aa..fdfb321 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.java.functions.RichFlatMapFunction;
@@ -170,8 +168,8 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void reduce(Iterator<Long> values, Collector<Long> out) throws Exception {
-				out.collect(values.next());
+		public void reduce(Iterable<Long> values, Collector<Long> out) {
+				out.collect(values.iterator().next());
 		}
 	}
 	
@@ -204,21 +202,19 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
 		final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>();
 		
 		@Override
-		public void reduce(Iterator<Tuple2<Long, Long>> values,
-				Collector<Tuple2<Long, Long>> out) throws Exception {
+		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
+			Long vertexId = 0L;
+			Long minimumCompId = Long.MAX_VALUE;
 
-			final Tuple2<Long, Long> first = values.next();		
-			final Long vertexId = first.f0;
-			Long minimumCompId = first.f1;
-			
-			while ( values.hasNext() ) {
-				Long candidateCompId = values.next().f1;
-				if ( candidateCompId < minimumCompId ) {
+			for (Tuple2<Long, Long> value: values) {
+				vertexId = value.f0;
+				Long candidateCompId = value.f1;
+				if (candidateCompId < minimumCompId) {
 					minimumCompId = candidateCompId;
 				}
 			}
-			resultVertex.setField(vertexId, 0);
-			resultVertex.setField(minimumCompId, 1);
+			resultVertex.f0 = vertexId;
+			resultVertex.f1 = minimumCompId;
 
 			out.collect(resultVertex);
 		}
@@ -231,8 +227,8 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
 		@Override
 		public void flatMap(
 				Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> vertexWithNewAndOldId,
-				Collector<Tuple2<Long, Long>> out) throws Exception {
-						
+				Collector<Tuple2<Long, Long>> out)
+		{
 			if ( vertexWithNewAndOldId.f0.f1 < vertexWithNewAndOldId.f1.f1 ) {
 				out.collect(vertexWithNewAndOldId.f0);
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
index cb25019..3dc0bdf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
index 6a487c4..2c777f5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative;
 
 import java.io.Serializable;
@@ -110,6 +109,7 @@ public class IterationTerminationWithTwoTails extends RecordAPITestBase {
 		public void reduce(Iterator<Record> it, Collector<Record> out) {
 			// Compute the sum
 			int sum = 0;
+			
 			while (it.hasNext()) {
 				sum += Integer.parseInt(it.next().getField(0, StringValue.class).getValue()) + 1;
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
index c81b32a..b3145dc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
index b62d85a..d00602f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
@@ -44,85 +44,81 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class IterationWithChainingITCase extends RecordAPITestBase {
 
-    private static final String DATA_POINTS = "0|50.90|16.20|72.08|\n" + "1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n";
+	private static final String DATA_POINTS = "0|50.90|16.20|72.08|\n" + "1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n";
 
-    private String dataPath;
-    private String resultPath;
+	private String dataPath;
+	private String resultPath;
 
-    public IterationWithChainingITCase(Configuration config) {
-        super(config);
+	public IterationWithChainingITCase(Configuration config) {
+		super(config);
 		setTaskManagerNumSlots(DOP);
-    }
+	}
 
-    @Override
-    protected void preSubmit() throws Exception {
-        dataPath = createTempFile("data_points.txt", DATA_POINTS);
-        resultPath = getTempFilePath("result");
-    }
+	@Override
+	protected void preSubmit() throws Exception {
+		dataPath = createTempFile("data_points.txt", DATA_POINTS);
+		resultPath = getTempFilePath("result");
+	}
 
-    @Override
-    protected void postSubmit() throws Exception {
-        compareResultsByLinesInMemory(DATA_POINTS, resultPath);
-    }
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(DATA_POINTS, resultPath);
+	}
 
+	@Override
+	protected Plan getTestJob() {
+		Plan plan = getTestPlan(config.getInteger("ChainedMapperITCase#NoSubtasks", 1), dataPath, resultPath);
+		return plan;
+	}
 
-    @Override
-    protected Plan getTestJob() {
-        Plan plan = getTestPlan(config.getInteger("ChainedMapperITCase#NoSubtasks", 1), dataPath, resultPath);
-        return plan;
-    }
+	@Parameters
+	public static Collection<Object[]> getConfigurations() {
+		Configuration config1 = new Configuration();
+		config1.setInteger("ChainedMapperITCase#NoSubtasks", DOP);
+		return toParameterList(config1);
+	}
 
-    @Parameters
-    public static Collection<Object[]> getConfigurations() {
-        Configuration config1 = new Configuration();
-        config1.setInteger("ChainedMapperITCase#NoSubtasks", DOP);
-        return toParameterList(config1);
-    }
+	public static final class IdentityMapper extends MapFunction implements Serializable {
 
-    public static final class IdentityMapper extends MapFunction implements Serializable {
+		private static final long serialVersionUID = 1L;
 
-        private static final long serialVersionUID = 1L;
+		@Override
+		public void map(Record rec, Collector<Record> out) {
+			out.collect(rec);
+		}
+	}
 
-        @Override
-        public void map(Record rec, Collector<Record> out) {
-            out.collect(rec);
-        }
-    }
+	public static final class DummyReducer extends ReduceFunction implements Serializable {
 
-    public static final class DummyReducer extends ReduceFunction implements Serializable {
+		private static final long serialVersionUID = 1L;
 
-        private static final long serialVersionUID = 1L;
+		@Override
+		public void reduce(Iterator<Record> it, Collector<Record> out) {
+			while (it.hasNext()) {
+				out.collect(it.next());
+			}
+		}
+	}
 
-        @Override
-        public void reduce(Iterator<Record> it, Collector<Record> out) {
-            while (it.hasNext()) {
-                out.collect(it.next());
-            }
-        }
-    }
+	static Plan getTestPlan(int numSubTasks, String input, String output) {
 
-    static Plan getTestPlan(int numSubTasks, String input, String output) {
+		FileDataSource initialInput = new FileDataSource(new PointInFormat(), input, "Input");
+		initialInput.setDegreeOfParallelism(1);
 
-        FileDataSource initialInput = new FileDataSource(new PointInFormat(), input, "Input");
-        initialInput.setDegreeOfParallelism(1);
+		BulkIteration iteration = new BulkIteration("Loop");
+		iteration.setInput(initialInput);
+		iteration.setMaximumNumberOfIterations(2);
 
-        BulkIteration iteration = new BulkIteration("Loop");
-        iteration.setInput(initialInput);
-        iteration.setMaximumNumberOfIterations(2);
+		ReduceOperator dummyReduce = ReduceOperator.builder(new DummyReducer(), IntValue.class, 0).input(iteration.getPartialSolution())
+				.name("Reduce something").build();
 
-        ReduceOperator dummyReduce = ReduceOperator.builder(new DummyReducer(), IntValue.class, 0)
-                .input(iteration.getPartialSolution())
-                .name("Reduce something")
-                .build();
+		MapOperator dummyMap = MapOperator.builder(new IdentityMapper()).input(dummyReduce).build();
+		iteration.setNextPartialSolution(dummyMap);
 
+		FileDataSink finalResult = new FileDataSink(new PointOutFormat(), output, iteration, "Output");
 
-        MapOperator dummyMap = MapOperator.builder(new IdentityMapper()).input(dummyReduce).build();
-        iteration.setNextPartialSolution(dummyMap);
-
-        FileDataSink finalResult = new FileDataSink(new PointOutFormat(), output, iteration, "Output");
-
-        Plan plan = new Plan(finalResult, "Iteration with chained map test");
-        plan.setDefaultParallelism(numSubTasks);
-        return plan;
-    }
+		Plan plan = new Plan(finalResult, "Iteration with chained map test");
+		plan.setDefaultParallelism(numSubTasks);
+		return plan;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
index aa83e9b..2a130ed 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
@@ -16,43 +16,28 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative;
 
 import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
 
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.IterativeDataSet;
 import org.apache.flink.test.recordJobs.kmeans.udfs.PointInFormat;
 import org.apache.flink.test.recordJobs.kmeans.udfs.PointOutFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
+import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
-@RunWith(Parameterized.class)
-public class IterationWithUnionITCase extends RecordAPITestBase {
+public class IterationWithUnionITCase extends JavaProgramTestBase {
 
 	private static final String DATAPOINTS = "0|50.90|16.20|72.08|\n" + "1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n";
 
 	protected String dataPath;
 	protected String resultPath;
 
-	
-	public IterationWithUnionITCase(Configuration config) {
-		super(config);
-		setTaskManagerNumSlots(DOP);
-	}
 
 	@Override
 	protected void preSubmit() throws Exception {
@@ -66,54 +51,36 @@ public class IterationWithUnionITCase extends RecordAPITestBase {
 	}
 
 	@Override
-	protected Plan getTestJob() {
-		return getPlan(config.getInteger("IterationWithUnionITCase#NumSubtasks", 1), dataPath, resultPath);
-	}
-
-	@Parameters
-	public static Collection<Object[]> getConfigurations() {
-		Configuration config1 = new Configuration();
-		config1.setInteger("IterationWithUnionITCase#NumSubtasks", DOP);
-
-		return toParameterList(config1);
-	}
-	
-	private static Plan getPlan(int numSubTasks, String input, String output) {
-		FileDataSource initialInput = new FileDataSource(new PointInFormat(), input, "Input");
-		initialInput.setDegreeOfParallelism(1);
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		
-		BulkIteration iteration = new BulkIteration("Loop");
-		iteration.setInput(initialInput);
-		iteration.setMaximumNumberOfIterations(2);
-
-		@SuppressWarnings("unchecked")
-		MapOperator map2 = MapOperator.builder(new IdentityMapper()).input(iteration.getPartialSolution(), iteration.getPartialSolution()).name("map").build();
+		DataSet<Record> initialInput = env.readFile(new PointInFormat(), this.dataPath).setParallelism(1);
 		
-		iteration.setNextPartialSolution(map2);
-
-		FileDataSink finalResult = new FileDataSink(new PointOutFormat(), output, iteration, "Output");
-
-		Plan plan = new Plan(finalResult, "Iteration with union test");
-		plan.setDefaultParallelism(numSubTasks);
-		return plan;
+		IterativeDataSet<Record> iteration = initialInput.iterate(2);
+		
+		DataSet<Record> result = iteration.union(iteration).map(new IdentityMapper());
+		
+		iteration.closeWith(result).write(new PointOutFormat(), this.resultPath);
+		
+		env.execute();
 	}
 	
-	static final class IdentityMapper extends MapFunction implements Serializable {
+	static final class IdentityMapper implements MapFunction<Record, Record>, Serializable {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void map(Record rec, Collector<Record> out) {
-			out.collect(rec);
+		public Record map(Record rec) {
+			return rec;
 		}
 	}
 
-	static final class DummyReducer extends ReduceFunction implements Serializable {
+	static final class DummyReducer implements GroupReduceFunction<Record, Record>, Serializable {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void reduce(Iterator<Record> it, Collector<Record> out) {
-			while (it.hasNext()) {
-				out.collect(it.next());
+		public void reduce(Iterable<Record> it, Collector<Record> out) {
+			for (Record r : it) {
+				out.collect(r);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
index 4c8177a..9382708 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
@@ -16,34 +16,27 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative.aggregators;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.functions.RichJoinFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
-
 import org.junit.Assert;
-
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.IterativeDataSet;
 
-
 /**
- * 
- * Connected Components test case that uses a parametrizable aggregator
- *
+ * Connected Components test case that uses a parameterizable aggregator
  */
 public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaProgramTestBase {
 
@@ -147,7 +140,8 @@ public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP
 		}
 	}
 
-	public static final class NeighborWithComponentIDJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+	public static final class NeighborWithComponentIDJoin implements JoinFunction
+		<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -163,24 +157,23 @@ public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP
 	public static final class MinimumReduce extends RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
 		private static final long serialVersionUID = 1L;
-		final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>();
+		
+		private final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>();
 
 		@Override
-		public void reduce(Iterator<Tuple2<Long, Long>> values,
-				Collector<Tuple2<Long, Long>> out) throws Exception {
-
-			final Tuple2<Long, Long> first = values.next();
-			final Long vertexId = first.f0;
-			Long minimumCompId = first.f1;
+		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
+			Long vertexId = 0L;
+			Long minimumCompId = Long.MAX_VALUE;
 
-			while (values.hasNext()) {
-				Long candidateCompId = values.next().f1;
+			for (Tuple2<Long, Long> value: values) {
+				vertexId = value.f0;
+				Long candidateCompId = value.f1;
 				if (candidateCompId < minimumCompId) {
 					minimumCompId = candidateCompId;
 				}
 			}
-			resultVertex.setField(vertexId, 0);
-			resultVertex.setField(minimumCompId, 1);
+			resultVertex.f0 = vertexId;
+			resultVertex.f1 = minimumCompId;
 
 			out.collect(resultVertex);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
index 104c3df..039d64e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative.aggregators;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
@@ -158,21 +156,19 @@ public class ConnectedComponentsWithParametrizableConvergenceITCase extends Java
 		final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>();
 
 		@Override
-		public void reduce(Iterator<Tuple2<Long, Long>> values,
-				Collector<Tuple2<Long, Long>> out) throws Exception {
-
-			final Tuple2<Long, Long> first = values.next();		
-			final Long vertexId = first.f0;
-			Long minimumCompId = first.f1;
+		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
+			Long vertexId = 0L;
+			Long minimumCompId = Long.MAX_VALUE;
 
-			while (values.hasNext()) {
-				Long candidateCompId = values.next().f1;
+			for (Tuple2<Long, Long> value: values) {
+				vertexId = value.f0;
+				Long candidateCompId = value.f1;
 				if (candidateCompId < minimumCompId) {
 					minimumCompId = candidateCompId;
 				}
 			}
-			resultVertex.setField(vertexId, 0);
-			resultVertex.setField(minimumCompId, 1);
+			resultVertex.f0 = vertexId;
+			resultVertex.f1 = minimumCompId;
 
 			out.collect(resultVertex);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
index 3060b68..4fd22a3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -31,6 +32,8 @@ import org.apache.flink.api.java.record.functions.MapFunction;
 import org.apache.flink.api.java.record.io.CsvInputFormat;
 import org.apache.flink.api.java.record.io.CsvOutputFormat;
 import org.apache.flink.api.java.record.io.FileOutputFormat;
+import org.apache.flink.api.java.record.operators.ReduceOperator.WrappingReduceFunction;
+import org.apache.flink.api.java.record.operators.ReduceOperator.WrappingClassReduceFunction;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparatorFactory;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
@@ -317,7 +320,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 			intermediateConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 			intermediateConfig.setDriverComparator(comparator, 0);
 			intermediateConfig.setStubWrapper(
-				new UserCodeClassWrapper<MinimumComponentIDReduce>(MinimumComponentIDReduce.class));
+				new UserCodeObjectWrapper<WrappingReduceFunction>(new WrappingClassReduceFunction(MinimumComponentIDReduce.class)));
 		}
 
 		return intermediate;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
index 448b7bd..5a6e4f5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
@@ -19,13 +19,12 @@
 package org.apache.flink.test.iterative.nephele;
 
 import java.util.Collection;
-import java.util.Iterator;
 
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
 import org.apache.flink.api.java.record.io.FileOutputFormat;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparatorFactory;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
@@ -273,14 +272,14 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
 		}
 	}
 
-	public static final class DummyReducer extends ReduceFunction {
+	public static final class DummyReducer implements GroupReduceFunction<Record, Record> {
 
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void reduce(Iterator<Record> it, Collector<Record> out) {
-			while (it.hasNext()) {
-				out.collect(it.next());
+		public void reduce(Iterable<Record> it, Collector<Record> out) {
+			for (Record r : it) {
+				out.collect(r);
 			}
 		}
 	}
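
The identity reducer above needs only a single pass, which the for-each covers. If a reducer needed to traverse its group more than once, one option (illustrative, not part of the patch) is to buffer the elements into a regular collection first and iterate the copy:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.types.Record;
    import org.apache.flink.util.Collector;

    // Illustrative sketch: buffer the group so it can be walked repeatedly.
    // Depending on the runtime's object-reuse behavior, each record may have to be
    // copied before it is stored, rather than kept by reference.
    public class TwoPassReducer implements GroupReduceFunction<Record, Record> {

        private static final long serialVersionUID = 1L;

        @Override
        public void reduce(Iterable<Record> records, Collector<Record> out) {
            List<Record> buffered = new ArrayList<Record>();
            for (Record r : records) {     // the only pass over the input iterable
                buffered.add(r);
            }
            for (Record r : buffered) {    // any further passes use the buffered copy
                out.collect(r);
            }
        }
    }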

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
index 1ec0eb4..cd6a89d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
 
 import java.util.Iterator;
@@ -83,11 +82,13 @@ public class CustomCompensatableDotProductCoGroup extends AbstractRichFunction i
 	}
 
 	@Override
-	public void coGroup(Iterator<VertexWithRankAndDangling> currentPageRankIterator, Iterator<VertexWithRank> partialRanks,
+	public void coGroup(Iterable<VertexWithRankAndDangling> currentPageRankIterable, Iterable<VertexWithRank> partialRanks,
 			Collector<VertexWithRankAndDangling> collector)
 	{
+		final Iterator<VertexWithRankAndDangling> currentPageRankIterator = currentPageRankIterable.iterator();
+		
 		if (!currentPageRankIterator.hasNext()) {
-			long missingVertex = partialRanks.next().getVertexID();
+			long missingVertex = partialRanks.iterator().next().getVertexID();
 			throw new IllegalStateException("No current page rank for vertex [" + missingVertex + "]!");
 		}
 
@@ -95,8 +96,8 @@ public class CustomCompensatableDotProductCoGroup extends AbstractRichFunction i
 
 		long edges = 0;
 		double summedRank = 0;
-		while (partialRanks.hasNext()) {
-			summedRank += partialRanks.next().getRank();
+		for (VertexWithRank pr : partialRanks) {
+			summedRank += pr.getRank();
 			edges++;
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
index d83b33b..8d92c59 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
@@ -29,6 +29,7 @@ import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.Vert
 import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStats;
 import org.apache.flink.util.Collector;
 
+@SuppressWarnings("deprecation")
 public class CustomCompensatingMap extends AbstractRichFunction implements GenericCollectorMap<VertexWithRankAndDangling, VertexWithRankAndDangling> {
 	
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
index 1e08a9f..8ec8403 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
@@ -35,12 +35,14 @@ public class CustomRankCombiner extends AbstractRichFunction implements GroupRed
 	private final VertexWithRank accumulator = new VertexWithRank();
 	
 	@Override
-	public void reduce(Iterator<VertexWithRank> records, Collector<VertexWithRank> out) throws Exception {
+	public void reduce(Iterable<VertexWithRank> records, Collector<VertexWithRank> out) throws Exception {
 		throw new UnsupportedOperationException();
 	}
 
 	@Override
-	public void combine(Iterator<VertexWithRank> records, Collector<VertexWithRank> out) throws Exception {
+	public void combine(Iterable<VertexWithRank> recordsIterable, Collector<VertexWithRank> out) throws Exception {
+		final Iterator<VertexWithRank> records = recordsIterable.iterator();
+		
 		VertexWithRank next = records.next();
 		this.accumulator.setVertexID(next.getVertexID());
 		double rank = next.getRank();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
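The combiner above shows the general recipe for the new signature when the first element needs special treatment: obtain an explicit Iterator from the Iterable, seed from it, then drain the rest. A minimal illustrative sketch of the same pattern (tuple types and class name are assumptions, not from the patch; groups handed to a group reduce are non-empty, so the initial next() is safe):

    import java.util.Iterator;

    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // Illustrative sketch: seed an accumulator from the first element, fold the rest.
    public class SeededSumReduce
            implements GroupReduceFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {

        private static final long serialVersionUID = 1L;

        @Override
        public void reduce(Iterable<Tuple2<Long, Double>> values, Collector<Tuple2<Long, Double>> out) {
            Iterator<Tuple2<Long, Double>> it = values.iterator();

            Tuple2<Long, Double> first = it.next();  // groups are never empty here
            long key = first.f0;
            double sum = first.f1;

            while (it.hasNext()) {
                sum += it.next().f1;
            }

            out.collect(new Tuple2<Long, Double>(key, sum));
        }
    }
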
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
index ab3dea9..c51cf96 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative.nephele.danglingpagerank;
 
 import java.util.Iterator;
@@ -102,7 +101,8 @@ public class CompensatableDotProductCoGroup extends CoGroupFunction {
 		long edges = 0;
 		double summedRank = 0;
 		while (partialRanks.hasNext()) {
-			summedRank += partialRanks.next().getField(1, doubleInstance).getValue();
+			Record pr = partialRanks.next();
+			summedRank += pr.getField(1, doubleInstance).getValue();
 			edges++;
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index 3749c1d..d59d721 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.test.javaApiOperators;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.LinkedList;
 
 import org.apache.flink.api.common.functions.CoGroupFunction;
@@ -308,21 +307,19 @@ public class CoGroupITCase extends JavaProgramTestBase {
 
 		@Override
 		public void coGroup(
-				Iterator<Tuple5<Integer, Long, Integer, String, Long>> first,
-				Iterator<Tuple5<Integer, Long, Integer, String, Long>> second,
-				Collector<Tuple2<Integer, Integer>> out) throws Exception {
-			
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
+				Collector<Tuple2<Integer, Integer>> out)
+		{
 			int sum = 0;
 			int id = 0;
 			
-			while(first.hasNext()) {
-				Tuple5<Integer, Long, Integer, String, Long> element = first.next();
+			for ( Tuple5<Integer, Long, Integer, String, Long> element : first ) {
 				sum += element.f2;
 				id = element.f0;
 			}
 			
-			while(second.hasNext()) {
-				Tuple5<Integer, Long, Integer, String, Long> element = second.next();
+			for ( Tuple5<Integer, Long, Integer, String, Long> element : second ) {
 				sum += element.f2;
 				id = element.f0;
 			}
@@ -336,27 +333,22 @@ public class CoGroupITCase extends JavaProgramTestBase {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void coGroup(Iterator<CustomType> first,
-				Iterator<CustomType> second, Collector<CustomType> out)
-				throws Exception {
+		public void coGroup(Iterable<CustomType> first, Iterable<CustomType> second, Collector<CustomType> out) {
 			
 			CustomType o = new CustomType(0,0,"test");
 			
-			while(first.hasNext()) {
-				CustomType element = first.next();
+			for ( CustomType element : first ) {
 				o.myInt = element.myInt;
 				o.myLong += element.myLong;
 			}
 			
-			while(second.hasNext()) {
-				CustomType element = second.next();
+			for ( CustomType element : second ) {
 				o.myInt = element.myInt;
 				o.myLong += element.myLong;
 			}
 			
 			out.collect(o);
 		}
-		
 	}
 	
 	public static class MixedCoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> {
@@ -365,21 +357,19 @@ public class CoGroupITCase extends JavaProgramTestBase {
 
 		@Override
 		public void coGroup(
-				Iterator<Tuple5<Integer, Long, Integer, String, Long>> first,
-				Iterator<CustomType> second,
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+				Iterable<CustomType> second,
 				Collector<Tuple3<Integer, Long, String>> out) throws Exception {
 			
 			long sum = 0;
 			int id = 0;
 			
-			while(first.hasNext()) {
-				Tuple5<Integer, Long, Integer, String, Long> element = first.next();
+			for ( Tuple5<Integer, Long, Integer, String, Long> element : first ) {
 				sum += element.f0;
 				id = element.f2;
 			}
 			
-			while(second.hasNext()) {
-				CustomType element = second.next();
+			for (CustomType element : second) {
 				id = element.myInt;
 				sum += element.myLong;
 			}
@@ -394,20 +384,18 @@ public class CoGroupITCase extends JavaProgramTestBase {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void coGroup(Iterator<CustomType> first,
-				Iterator<Tuple5<Integer, Long, Integer, String, Long>> second,
-				Collector<CustomType> out) throws Exception {
-			
+		public void coGroup(Iterable<CustomType> first,
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
+				Collector<CustomType> out)
+		{
 			CustomType o = new CustomType(0,0,"test");
 			
-			while(first.hasNext()) {
-				CustomType element = first.next();
+			for (CustomType element : first) {
 				o.myInt = element.myInt;
 				o.myLong += element.myLong;
 			}
 			
-			while(second.hasNext()) {
-				Tuple5<Integer, Long, Integer, String, Long> element = second.next();
+			for (Tuple5<Integer, Long, Integer, String, Long> element : second) {
 				o.myInt = element.f2;
 				o.myLong += element.f0;
 			}
@@ -423,14 +411,14 @@ public class CoGroupITCase extends JavaProgramTestBase {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void coGroup(Iterator<Tuple3<Integer, Long, String>> first,
-				Iterator<Tuple3<Integer, Long, String>> second,
-				Collector<Tuple3<Integer, Long, String>> out) throws Exception {
-
-			while(first.hasNext()) {
-				Tuple3<Integer, Long, String> element = first.next();
-				if(element.f0 < 6)
+		public void coGroup(Iterable<Tuple3<Integer, Long, String>> first,
+				Iterable<Tuple3<Integer, Long, String>> second,
+				Collector<Tuple3<Integer, Long, String>> out)
+		{
+			for (Tuple3<Integer, Long, String> element : first) {
+				if(element.f0 < 6) {
 					out.collect(element);
+				}
 			}
 		}
 	}
@@ -441,20 +429,16 @@ public class CoGroupITCase extends JavaProgramTestBase {
 
 		@Override
 		public void coGroup(
-				Iterator<Tuple5<Integer, Long, Integer, String, Long>> first,
-				Iterator<Tuple5<Integer, Long, Integer, String, Long>> second,
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
 				Collector<Tuple5<Integer, Long, Integer, String, Long>> out)
-				throws Exception {
-
-			while(second.hasNext()) {
-				Tuple5<Integer, Long, Integer, String, Long> element = second.next();
-				if(element.f0 < 4)
+		{
+			for (Tuple5<Integer, Long, Integer, String, Long> element : second) {
+				if(element.f0 < 4) {
 					out.collect(element);
+				}
 			}
-			
 		}
-
-
 	}
 	
 	public static class Tuple5CoGroupBC extends RichCoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
@@ -477,21 +461,19 @@ public class CoGroupITCase extends JavaProgramTestBase {
 
 		@Override
 		public void coGroup(
-				Iterator<Tuple5<Integer, Long, Integer, String, Long>> first,
-				Iterator<Tuple5<Integer, Long, Integer, String, Long>> second,
-				Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
-			
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
+				Collector<Tuple3<Integer, Integer, Integer>> out)
+		{
 			int sum = 0;
 			int id = 0;
 			
-			while(first.hasNext()) {
-				Tuple5<Integer, Long, Integer, String, Long> element = first.next();
+			for (Tuple5<Integer, Long, Integer, String, Long> element : first) {
 				sum += element.f2;
 				id = element.f0;
 			}
 			
-			while(second.hasNext()) {
-				Tuple5<Integer, Long, Integer, String, Long> element = second.next();
+			for (Tuple5<Integer, Long, Integer, String, Long> element : second) {
 				sum += element.f2;
 				id = element.f0;
 			}
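
Unlike reduce groups, either input of a coGroup can be empty for a given key, which is why the dangling-pagerank coGroup above pulls an explicit iterator and checks hasNext() before consuming anything. A minimal illustrative sketch of that pattern with the new Iterable signature (tuple types and class name are assumptions, not from the patch):

    import java.util.Iterator;

    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // Illustrative sketch: sum both sides per key, but emit nothing when the
    // left side has no elements for this key.
    public class LeftSeededSumCoGroup implements
            CoGroupFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>> {

        private static final long serialVersionUID = 1L;

        @Override
        public void coGroup(Iterable<Tuple2<Long, Integer>> first,
                Iterable<Tuple2<Long, Integer>> second,
                Collector<Tuple2<Long, Integer>> out) {

            Iterator<Tuple2<Long, Integer>> left = first.iterator();
            if (!left.hasNext()) {
                return;  // nothing on the left side for this key
            }

            Tuple2<Long, Integer> seed = left.next();
            long key = seed.f0;
            int sum = seed.f1;

            while (left.hasNext()) {
                sum += left.next().f1;
            }
            for (Tuple2<Long, Integer> t : second) {
                sum += t.f1;
            }

            out.collect(new Tuple2<Long, Integer>(key, sum));
        }
    }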

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 7376e86..bd10c5e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -422,16 +422,13 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 	public static class Tuple3GroupReduce implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
 		private static final long serialVersionUID = 1L;
 
-
 		@Override
-		public void reduce(Iterator<Tuple3<Integer, Long, String>> values,
-				Collector<Tuple2<Integer, Long>> out) throws Exception {
+		public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, Long>> out) {
 			
 			int i = 0;
 			long l = 0l;
 			
-			while(values.hasNext()) {
-				Tuple3<Integer, Long, String> t = values.next();
+			for (Tuple3<Integer, Long, String> t : values) {
 				i += t.f0;
 				l = t.f1;
 			}
@@ -446,24 +443,22 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 
 
 		@Override
-		public void reduce(Iterator<Tuple3<Integer, Long, String>> values,
-				Collector<Tuple3<Integer, Long, String>> out) throws Exception {
-			
-			Tuple3<Integer, Long, String> t = values.next();
-			
-			int sum = t.f0;
-			long key = t.f1;
-			String concat = t.f2;
+		public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
+			int sum = 0;
+			long key = 0;
+			StringBuilder concat = new StringBuilder();
 			
-			while(values.hasNext()) {
-				t = values.next();
-				
-				sum += t.f0;
-				concat += "-"+t.f2;
+			for (Tuple3<Integer, Long, String> next : values) {
+				sum += next.f0;
+				key = next.f1;
+				concat.append(next.f2).append("-");
 			}
 			
-			out.collect(new Tuple3<Integer, Long, String>(sum, key, concat));
+			if (concat.length() > 0) {
+				concat.setLength(concat.length() - 1);
+			}
 			
+			out.collect(new Tuple3<Integer, Long, String>(sum, key, concat.toString()));
 		}
 	}
 	
@@ -472,16 +467,14 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 
 		@Override
 		public void reduce(
-				Iterator<Tuple5<Integer, Long, Integer, String, Long>> values,
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> values,
 				Collector<Tuple5<Integer, Long, Integer, String, Long>> out)
-				throws Exception {
-			
+		{
 			int i = 0;
 			long l = 0l;
 			long l2 = 0l;
 			
-			while(values.hasNext()) {
-				Tuple5<Integer, Long, Integer, String, Long> t = values.next();
+			for ( Tuple5<Integer, Long, Integer, String, Long> t : values ) {
 				i = t.f0;
 				l += t.f1;
 				l2 = t.f4;
@@ -496,20 +489,19 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		
 
 		@Override
-		public void reduce(Iterator<CustomType> values,
-				Collector<CustomType> out) throws Exception {
+		public void reduce(Iterable<CustomType> values, Collector<CustomType> out) {
+			final Iterator<CustomType> iter = values.iterator();
 			
 			CustomType o = new CustomType();
-			CustomType c = values.next();
+			CustomType c = iter.next();
 			
 			o.myString = "Hello!";
 			o.myInt = c.myInt;
 			o.myLong = c.myLong;
 			
-			while(values.hasNext()) {
-				c = values.next();
-				o.myLong += c.myLong;
-
+			while (iter.hasNext()) {
+				CustomType next = iter.next();
+				o.myLong += next.myLong;
 			}
 			
 			out.collect(o);
@@ -522,11 +514,9 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void reduce(Iterator<Tuple3<Integer, Long, String>> values,
-				Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+		public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
 
-			while(values.hasNext()) {
-				Tuple3<Integer, Long, String> t = values.next();
+			for ( Tuple3<Integer, Long, String> t : values ) {
 				
 				if(t.f0 < 4) {
 					t.f2 = "Hi!";
@@ -544,14 +534,12 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		private static final long serialVersionUID = 1L;
 		
 		@Override
-		public void reduce(Iterator<Tuple3<Integer, Long, String>> values,
-				Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+		public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
 
 			int i = 0;
 			long l = 0l;
 			
-			while(values.hasNext()) {
-				Tuple3<Integer, Long, String> t = values.next();
+			for ( Tuple3<Integer, Long, String> t : values ) {
 				i += t.f0;
 				l += t.f1;
 			}
@@ -564,21 +552,13 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		private static final long serialVersionUID = 1L;
 		
 		@Override
-		public void reduce(Iterator<CustomType> values,
-				Collector<CustomType> out) throws Exception {
+		public void reduce(Iterable<CustomType> values, Collector<CustomType> out) {
 
-			CustomType o = new CustomType();
-			CustomType c = values.next();
-			
-			o.myString = "Hello!";
-			o.myInt = c.myInt;
-			o.myLong = c.myLong;
-			
+			CustomType o = new CustomType(0, 0, "Hello!");
 			
-			while(values.hasNext()) {
-				c = values.next();
-				o.myInt += c.myInt;
-				o.myLong += c.myLong;
+			for (CustomType next : values) {
+				o.myInt += next.myInt;
+				o.myLong += next.myLong;
 			}
 			
 			out.collect(o);
@@ -602,14 +582,12 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		}
 
 		@Override
-		public void reduce(Iterator<Tuple3<Integer, Long, String>> values,
-				Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+		public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
 				
 			int i = 0;
 			long l = 0l;
 			
-			while(values.hasNext()) {
-				Tuple3<Integer, Long, String> t = values.next();
+			for ( Tuple3<Integer, Long, String> t : values ) {
 				i += t.f0;
 				l = t.f1;
 			}
@@ -624,12 +602,11 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void combine(Iterator<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+		public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
 
 			Tuple3<Integer, Long, String> o = new Tuple3<Integer, Long, String>(0, 0l, "");
 
-			while(values.hasNext()) {
-				Tuple3<Integer, Long, String> t = values.next();
+			for ( Tuple3<Integer, Long, String> t : values ) {
 				o.f0 += t.f0;
 				o.f1 = t.f1;
 				o.f2 = "test"+o.f1;
@@ -639,14 +616,12 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		}
 
 		@Override
-		public void reduce(Iterator<Tuple3<Integer, Long, String>> values,
-				Collector<Tuple2<Integer, String>> out) throws Exception {
+		public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, String>> out) {
 
 			int i = 0;
 			String s = "";
 
-			while(values.hasNext()) {
-				Tuple3<Integer, Long, String> t = values.next();
+			for ( Tuple3<Integer, Long, String> t : values ) {
 				i += t.f0;
 				s = t.f2;
 			}
@@ -661,12 +636,11 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		private static final long serialVersionUID = 1L;
 		
 		@Override
-		public void combine(Iterator<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
+		public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
 			
 			Tuple3<Integer, Long, String> o = new Tuple3<Integer, Long, String>(0, 0l, "");
 			
-			while(values.hasNext()) {
-				Tuple3<Integer, Long, String> t = values.next();
+			for ( Tuple3<Integer, Long, String> t : values ) {
 				o.f0 += t.f0;
 				o.f1 += t.f1;
 				o.f2 += "test";
@@ -676,13 +650,12 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		}
 
 		@Override
-		public void reduce(Iterator<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, String>> out) {
+		public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, String>> out) {
 			
 			int i = 0;
 			String s = "";
 			
-			while(values.hasNext()) {
-				Tuple3<Integer, Long, String> t = values.next();
+			for ( Tuple3<Integer, Long, String> t : values ) {
 				i += t.f0 + t.f1;
 				s += t.f2;
 			}
@@ -697,12 +670,11 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		private static final long serialVersionUID = 1L;
 		
 		@Override
-		public void combine(Iterator<CustomType> values, Collector<CustomType> out) throws Exception {
+		public void combine(Iterable<CustomType> values, Collector<CustomType> out) throws Exception {
 			
 			CustomType o = new CustomType();
 			
-			while(values.hasNext()) {
-				CustomType c = values.next();
+			for ( CustomType c : values ) {
 				o.myInt = c.myInt;
 				o.myLong += c.myLong;
 				o.myString = "test"+c.myInt;
@@ -712,13 +684,11 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		}
 
 		@Override
-		public void reduce(Iterator<CustomType> values,
-				Collector<CustomType> out) throws Exception {
+		public void reduce(Iterable<CustomType> values, Collector<CustomType> out)  {
 			
 			CustomType o = new CustomType(0, 0, "");
 			
-			while(values.hasNext()) {
-				CustomType c = values.next();
+			for (CustomType c : values) {
 				o.myInt = c.myInt;
 				o.myLong += c.myLong;
 				o.myString = c.myString;
@@ -733,6 +703,5 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 
 		@Override
 		public T map(T value) { return value; }
-		
 	}
 }
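
One practical consequence of the Iterable signature, not exercised by this patch, is that such functions can be driven directly from a plain unit test: any java.util collection already is an Iterable, so no runtime harness is needed. A hypothetical driver sketch, assuming the Tuple3GroupReduce defined above is on the classpath:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.test.javaApiOperators.GroupReduceITCase;
    import org.apache.flink.util.Collector;

    // Hypothetical driver, not part of the patch.
    public class Tuple3GroupReduceSketch {

        public static void main(String[] args) {
            final List<Tuple2<Integer, Long>> collected = new ArrayList<Tuple2<Integer, Long>>();

            // minimal Collector that just remembers what the reducer emits
            Collector<Tuple2<Integer, Long>> collector = new Collector<Tuple2<Integer, Long>>() {
                @Override
                public void collect(Tuple2<Integer, Long> record) {
                    collected.add(record);
                }

                @Override
                public void close() {}
            };

            new GroupReduceITCase.Tuple3GroupReduce().reduce(
                    Arrays.asList(
                            new Tuple3<Integer, Long, String>(1, 1L, "Hi"),
                            new Tuple3<Integer, Long, String>(2, 1L, "Hello")),
                    collector);

            System.out.println(collected);  // expected: [(3,1)]
        }
    }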

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
index 5a895d3..865c550 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.operators;
 
 import java.io.FileNotFoundException;
@@ -26,8 +25,6 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.record.functions.CoGroupFunction;
 import org.apache.flink.api.java.record.io.DelimitedInputFormat;
@@ -46,13 +43,9 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-/**
- */
 @RunWith(Parameterized.class)
 public class CoGroupITCase extends RecordAPITestBase {
 
-	private static final Log LOG = LogFactory.getLog(CoGroupITCase.class);
-
 	String leftInPath = null;
 	String rightInPath = null;
 	String resultPath = null;
@@ -89,8 +82,6 @@ public class CoGroupITCase extends RecordAPITestBase {
 			target.setField(0, keyString);
 			target.setField(1, valueString);
 
-			LOG.debug("Read in: [" + keyString.getValue() + "," + valueString.getValue() + "]");
-
 			return target;
 		}
 
@@ -112,9 +103,6 @@ public class CoGroupITCase extends RecordAPITestBase {
 			this.buffer.append('\n');
 			
 			byte[] bytes = this.buffer.toString().getBytes();
-
-			LOG.debug("Writing out: [" + keyString.toString() + "," + valueInteger.getValue() + "]");
-
 			this.stream.write(bytes);
 		}
 	}
@@ -124,33 +112,28 @@ public class CoGroupITCase extends RecordAPITestBase {
 
 		private StringValue keyString = new StringValue();
 		private StringValue valueString = new StringValue();
-		private Record record = new Record();
 		
 		@Override
 		public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) {
-			// TODO Auto-generated method stub
 			
+			Record record = null;
 			int sum = 0;
-			LOG.debug("Start iterating over input1");
+			
 			while (records1.hasNext()) {
 				record = records1.next();
 				keyString = record.getField(0, keyString);
 				valueString = record.getField(1, valueString);
 				sum += Integer.parseInt(valueString.getValue());
-
-				LOG.debug("Processed: [" + keyString.getValue() + "," + valueString.getValue() + "]");
 			}
-			LOG.debug("Start iterating over input2");
+			
+			
 			while (records2.hasNext()) {
 				record = records2.next();
 				keyString = record.getField(0, keyString);
 				valueString = record.getField(1, valueString);
 				sum -= Integer.parseInt(valueString.getValue());
-
-				LOG.debug("Processed: [" + keyString.getValue() + "," + valueString.getValue() + "]");
 			}
 			record.setField(1, new IntValue(sum));
-			LOG.debug("Finished");
 			
 			out.collect(record);
 		}
@@ -197,9 +180,9 @@ public class CoGroupITCase extends RecordAPITestBase {
 
 		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
 
-		String[] localStrategies = { PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE};
+		String[] localStrategies = { PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE };
 
-		String[] shipStrategies = { PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH, };
+		String[] shipStrategies = { PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH };
 
 		for (String localStrategy : localStrategies) {
 			for (String shipStrategy : shipStrategies) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
index b06cf47..2711417 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
@@ -16,11 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.operators;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.record.functions.ReduceFunction;
 import org.apache.flink.api.java.record.io.DelimitedInputFormat;
@@ -53,8 +50,6 @@ import java.util.LinkedList;
 
 @RunWith(Parameterized.class)
 public class ReduceITCase extends RecordAPITestBase {
-	
-	private static final Log LOG = LogFactory.getLog(ReduceITCase.class);
 
 	String inPath = null;
 	String resultPath = null;
@@ -83,17 +78,14 @@ public class ReduceITCase extends RecordAPITestBase {
 		private StringValue combineValue = new StringValue();
 
 		@Override
-		public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
-		
+		public void combine(Iterator<Record> records, Collector<Record> out) {
+			Record record = null;
 			int sum = 0;
-			Record record = new Record();
+			
 			while (records.hasNext()) {
 				record = records.next();
 				combineValue = record.getField(1, combineValue);
 				sum += Integer.parseInt(combineValue.toString());
-
-				LOG.debug("Processed: [" + record.getField(0, StringValue.class).toString() +
-						"," + combineValue.toString() + "]");
 			}
 			combineValue.setValue(sum + "");
 			record.setField(1, combineValue);
@@ -101,17 +93,14 @@ public class ReduceITCase extends RecordAPITestBase {
 		}
 
 		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
-		
+		public void reduce(Iterator<Record> records, Collector<Record> out) {
+			Record record = null;
 			int sum = 0;
-			Record record = new Record();
+			
 			while (records.hasNext()) {
 				record = records.next();
 				reduceValue = record.getField(1, reduceValue);
 				sum += Integer.parseInt(reduceValue.toString());
-
-				LOG.debug("Processed: [" + record.getField(0, StringValue.class).toString() +
-						"," + reduceValue.toString() + "]");
 			}
 			record.setField(1, new IntValue(sum));
 			out.collect(record);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java
index 60b7512..0b84309 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.recordJobTests;
 
 import java.io.Serializable;