You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/01 09:29:37 UTC

[21/22] git commit: [FLINK-1023] Switch group-at-a-time function to java.lang.Iterable (from java.util.Iterator) Iterables over transient data throw an TraversableOnceException when iterated over again.

[FLINK-1023] Switch group-at-a-time function to java.lang.Iterable (from java.util.Iterator)
Iterables over transient data throw an TraversableOnceException when iterated over again.

This closes #84


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/72d7b862
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/72d7b862
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/72d7b862

Branch: refs/heads/release-0.6
Commit: 72d7b86274c33d1570ffb22b1fca2081c15d753c
Parents: 934e4e0
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 29 15:58:44 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 1 02:33:49 2014 +0200

----------------------------------------------------------------------
 .../api/avro/AvroWithEmptyArrayITCase.java      |   7 +-
 .../mapred/record/example/WordCount.java        |   2 +-
 .../example/WordCountWithOutputFormat.java      |   2 +-
 .../spargel/java/VertexCentricIteration.java    |  42 ++--
 .../spargel/java/record/SpargelIteration.java   |   2 +-
 .../compiler/CoGroupSolutionSetFirstTest.java   |   7 +-
 .../compiler/GroupReduceCompilationTest.java    |  15 +-
 .../flink/compiler/IterationsCompilerTest.java  |   5 +-
 .../WorksetIterationsJavaApiCompilerTest.java   |  10 +-
 .../testfunctions/IdentityGroupReducer.java     |   8 +-
 .../testfunctions/Top1GroupReducer.java         |   7 +-
 .../flink/compiler/util/DummyCoGroupStub.java   |   1 -
 .../flink/compiler/util/IdentityReduce.java     |   1 -
 .../api/common/functions/CoGroupFunction.java   |   9 +-
 .../api/common/functions/CombineFunction.java   |   3 +-
 .../common/functions/FlatCombineFunction.java   |   3 +-
 .../common/functions/GroupReduceFunction.java   |   3 +-
 .../flink/util/TraversableOnceException.java    |  28 +++
 .../example/java/graph/EnumTrianglesBasic.java  |  10 +-
 .../example/java/graph/EnumTrianglesOpt.java    |  12 +-
 .../flink/example/java/graph/PageRankBasic.java |   6 +-
 .../java/graph/TransitiveClosureNaive.java      |   6 +-
 .../example/java/relational/WebLogAnalysis.java |  10 +-
 .../flink/api/java/ExecutionEnvironment.java    |  21 ++
 .../api/java/functions/GroupReduceIterator.java |   4 +-
 .../api/java/functions/RichCoGroupFunction.java |   4 +-
 .../java/functions/RichFlatCombineFunction.java |   4 +-
 .../java/functions/RichGroupReduceFunction.java |   9 +-
 .../api/java/operators/AggregateOperator.java   |   3 +-
 .../api/java/operators/CoGroupOperator.java     |  10 +-
 .../api/java/operators/DistinctOperator.java    |   6 +-
 .../api/java/operators/GroupReduceOperator.java |   5 -
 .../PlanUnwrappingCoGroupOperator.java          |  44 +---
 .../translation/PlanUnwrappingJoinOperator.java |   8 -
 .../PlanUnwrappingReduceGroupOperator.java      |  14 +-
 .../translation/TupleUnwrappingIterator.java    |  15 +-
 .../operators/translation/WrappingFunction.java |   3 +-
 .../java/record/functions/CoGroupFunction.java  |   4 +-
 .../java/record/functions/ReduceFunction.java   |   7 +-
 .../java/record/operators/CoGroupOperator.java  |  61 ++++-
 .../java/record/operators/ReduceOperator.java   | 107 ++++++--
 .../DeltaIterationTranslationTest.java          |   4 +-
 .../record/CoGroupWrappingFunctionTest.java     | 221 +++++++++++++++++
 .../java/record/ReduceWrappingFunctionTest.java | 246 +++++++++++++++++++
 .../java/type/extractor/TypeExtractorTest.java  |   5 +-
 .../javaApiOperators/lambdas/CoGroupITCase.java |  11 +-
 .../lambdas/GroupReduceITCase.java              |   8 +-
 .../CoGroupWithSolutionSetFirstDriver.java      |   6 +-
 .../CoGroupWithSolutionSetSecondDriver.java     |   5 +-
 .../runtime/operators/RegularPactTask.java      |   1 -
 .../sort/CombiningUnilateralSortMerger.java     |  16 +-
 .../operators/sort/FixedLengthRecordSorter.java |   2 +-
 .../runtime/operators/sort/InMemorySorter.java  |   2 +-
 .../runtime/operators/sort/MergeIterator.java   |  17 +-
 .../operators/sort/MergeMatchIterator.java      |   1 -
 .../sort/SortMergeCoGroupIterator.java          |  20 +-
 .../operators/util/CoGroupTaskIterator.java     |   9 +-
 .../flink/runtime/util/EmptyIterator.java       |  11 +-
 .../flink/runtime/util/KeyGroupedIterator.java  |  23 +-
 .../util/MutableToRegularIteratorWrapper.java   |  16 +-
 .../runtime/util/SingleElementIterator.java     |   7 +-
 .../runtime/operators/CachedMatchTaskTest.java  |  18 +-
 .../operators/CoGroupTaskExternalITCase.java    |  19 +-
 .../runtime/operators/CoGroupTaskTest.java      |  30 +--
 .../runtime/operators/CombineTaskTest.java      |  35 ++-
 .../flink/runtime/operators/MapTaskTest.java    |   1 +
 .../operators/ReduceTaskExternalITCase.java     |  35 ++-
 .../flink/runtime/operators/ReduceTaskTest.java |  50 ++--
 .../operators/chaining/ChainTaskTest.java       |  15 +-
 .../drivers/AllGroupReduceDriverTest.java       |  16 +-
 .../drivers/GroupReduceDriverTest.java          |  16 +-
 .../CombiningUnilateralSortMergerITCase.java    |  24 +-
 .../sort/SortMergeCoGroupIteratorITCase.java    |   4 +-
 .../runtime/util/KeyGroupedIteratorTest.java    |  43 +++-
 .../api/scala/functions/ReduceFunction.scala    |   4 +-
 .../api/scala/operators/CoGroupOperator.scala   |   7 +-
 .../api/scala/operators/ReduceOperator.scala    |   5 +-
 .../test/accumulators/AccumulatorITCase.java    |   1 +
 .../AccumulatorIterativeITCase.java             |   8 +-
 .../KMeansIterativeNepheleITCase.java           |   3 +-
 .../BulkIterationWithAllReducerITCase.java      |   8 +-
 .../CoGroupConnectedComponentsITCase.java       |   4 +-
 .../CoGroupConnectedComponentsSecondITCase.java |  12 +-
 .../DependencyConnectedComponentsITCase.java    |  30 +--
 ...IterationTerminationWithTerminationTail.java |   1 -
 .../IterationTerminationWithTwoTails.java       |   2 +-
 .../IterationWithAllReducerITCase.java          |   1 -
 .../iterative/IterationWithChainingITCase.java  | 118 +++++----
 .../iterative/IterationWithUnionITCase.java     |  81 ++----
 ...nentsWithParametrizableAggregatorITCase.java |  35 ++-
 ...entsWithParametrizableConvergenceITCase.java |  20 +-
 .../ConnectedComponentsNepheleITCase.java       |   5 +-
 .../IterationWithChainingNepheleITCase.java     |  11 +-
 .../CustomCompensatableDotProductCoGroup.java   |  11 +-
 .../CustomCompensatingMap.java                  |   1 +
 .../CustomRankCombiner.java                     |   6 +-
 .../CompensatableDotProductCoGroup.java         |   4 +-
 .../test/javaApiOperators/CoGroupITCase.java    |  94 +++----
 .../javaApiOperators/GroupReduceITCase.java     | 123 ++++------
 .../flink/test/operators/CoGroupITCase.java     |  29 +--
 .../flink/test/operators/ReduceITCase.java      |  23 +-
 .../recordJobTests/GroupOrderReduceITCase.java  |   1 -
 .../graph/WorksetConnectedComponents.java       |   2 +-
 103 files changed, 1295 insertions(+), 820 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
index ea9edff..89db1fa 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
@@ -194,15 +194,14 @@ public class AvroWithEmptyArrayITCase extends RecordAPITestBase {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out)
-				throws Exception {
+		public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) {
 
 			Record r1 = null;
-			if (records1.hasNext()) {
+			while (records1.hasNext()) {
 				r1 = records1.next();
 			}
 			Record r2 = null;
-			if (records2.hasNext()) {
+			while (records2.hasNext()) {
 				r2 = records2.next();
 			}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
index 25caf0c..88b1892 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.hadoopcompatibility.mapred.record.example;
 
 import java.io.Serializable;
@@ -103,6 +102,7 @@ public class WordCount implements Program, ProgramDescription {
 		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
 			Record element = null;
 			int sum = 0;
+			
 			while (records.hasNext()) {
 				element = records.next();
 				int cnt = element.getField(1, IntValue.class).getValue();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
index a3cd3d5..8aaf8e5 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.hadoopcompatibility.mapred.record.example;
 
 import java.io.Serializable;
@@ -101,6 +100,7 @@ public class WordCountWithOutputFormat implements Program, ProgramDescription {
 		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
 			Record element = null;
 			int sum = 0;
+			
 			while (records.hasNext()) {
 				element = records.next();
 				int cnt = element.getField(1, IntValue.class).getValue();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
index 65be2f8..8f9149c 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
@@ -413,24 +413,28 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver
 		}
 
 		@Override
-		public void coGroup(Iterator<Tuple2<VertexKey, Message>> messages, Iterator<Tuple2<VertexKey, VertexValue>> vertex,
+		public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages, Iterable<Tuple2<VertexKey, VertexValue>> vertex,
 				Collector<Tuple2<VertexKey, VertexValue>> out)
 			throws Exception
 		{
-			if (vertex.hasNext()) {
-				Tuple2<VertexKey, VertexValue> vertexState = vertex.next();
+			final Iterator<Tuple2<VertexKey, VertexValue>> vertexIter = vertex.iterator();
+			
+			if (vertexIter.hasNext()) {
+				Tuple2<VertexKey, VertexValue> vertexState = vertexIter.next();
 				
 				@SuppressWarnings("unchecked")
-				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages;
+				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
 				messageIter.setSource(downcastIter);
 				
 				vertexUpdateFunction.setOutput(vertexState, out);
 				vertexUpdateFunction.updateVertex(vertexState.f0, vertexState.f1, messageIter);
-			} else {
-				if (messages.hasNext()) {
+			}
+			else {
+				final Iterator<Tuple2<VertexKey, Message>> messageIter = messages.iterator();
+				if (messageIter.hasNext()) {
 					String message = "Target vertex does not exist!.";
 					try {
-						Tuple2<VertexKey, Message> next = messages.next();
+						Tuple2<VertexKey, Message> next = messageIter.next();
 						message = "Target vertex '" + next.f0 + "' does not exist!.";
 					} catch (Throwable t) {}
 					throw new Exception(message);
@@ -481,13 +485,15 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver
 		}
 		
 		@Override
-		public void coGroup(Iterator<Tuple2<VertexKey, VertexKey>> edges,
-				Iterator<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
+		public void coGroup(Iterable<Tuple2<VertexKey, VertexKey>> edges,
+				Iterable<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
 			throws Exception
 		{
-			if (state.hasNext()) {
-				Tuple2<VertexKey, VertexValue> newVertexState = state.next();
-				messagingFunction.set((Iterator<?>) edges, out);
+			final Iterator<Tuple2<VertexKey, VertexValue>> stateIter = state.iterator();
+			
+			if (stateIter.hasNext()) {
+				Tuple2<VertexKey, VertexValue> newVertexState = stateIter.next();
+				messagingFunction.set((Iterator<?>) edges.iterator(), out);
 				messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
 			}
 		}
@@ -534,13 +540,15 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver
 		}
 
 		@Override
-		public void coGroup(Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> edges,
-				Iterator<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
+		public void coGroup(Iterable<Tuple3<VertexKey, VertexKey, EdgeValue>> edges,
+				Iterable<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
 			throws Exception
 		{
-			if (state.hasNext()) {
-				Tuple2<VertexKey, VertexValue> newVertexState = state.next();
-				messagingFunction.set((Iterator<?>) edges, out);
+			final Iterator<Tuple2<VertexKey, VertexValue>> stateIter = state.iterator();
+			
+			if (stateIter.hasNext()) {
+				Tuple2<VertexKey, VertexValue> newVertexState = stateIter.next();
+				messagingFunction.set((Iterator<?>) edges.iterator(), out);
 				messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
index 3a58afc..96bc799 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
@@ -161,7 +161,7 @@ public class SpargelIteration {
 
 		@Override
 		public void coGroup(Iterator<Record> messages, Iterator<Record> vertex, Collector<Record> out) throws Exception {
-
+			
 			if (vertex.hasNext()) {
 				Record first = vertex.next();
 				first.getFieldInto(0, vertexKey);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
index 3624d86..2a4d6d8 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
@@ -16,11 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.java.functions.RichCoGroupFunction;
 import org.apache.flink.api.java.functions.RichMapFunction;
 import org.junit.Assert;
@@ -30,6 +27,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.plan.DualInputPlanNode;
 import org.apache.flink.compiler.plan.OptimizedPlan;
@@ -44,8 +42,7 @@ public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
 	
 	public static class SimpleCGroup extends RichCoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> {
 		@Override
-		public void coGroup(Iterator<Tuple1<Integer>> first, Iterator<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) throws Exception {
-		}
+		public void coGroup(Iterable<Tuple1<Integer>> first, Iterable<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) {}
 	}
 
 	public static class SimpleMap extends RichMapFunction<Tuple1<Integer>, Tuple1<Integer>> {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
index b9cc769..9f63683 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
@@ -16,11 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.functions.RichGroupReduceFunction;
@@ -51,7 +48,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source");
 			
 			data.reduceGroup(new RichGroupReduceFunction<Double, Double>() {
-				public void reduce(Iterator<Double> values, Collector<Double> out) {}
+				public void reduce(Iterable<Double> values, Collector<Double> out) {}
 			}).name("reducer")
 			.print().name("sink");
 			
@@ -95,7 +92,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
 			
 			GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new RichGroupReduceFunction<Long, Long>() {
-				public void reduce(Iterator<Long> values, Collector<Long> out) {}
+				public void reduce(Iterable<Long> values, Collector<Long> out) {}
 			}).name("reducer");
 			
 			reduced.setCombinable(true);
@@ -148,7 +145,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			data
 				.groupBy(1)
 				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
-				public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
 			}).name("reducer")
 			.print().name("sink");
 			
@@ -197,7 +194,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
 					.groupBy(1)
 					.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
-				public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
 			}).name("reducer");
 			
 			reduced.setCombinable(true);
@@ -256,7 +253,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 					public String getKey(Tuple2<String, Double> value) { return value.f0; }
 				})
 				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
-				public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
 			}).name("reducer")
 			.print().name("sink");
 			
@@ -314,7 +311,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 					public String getKey(Tuple2<String, Double> value) { return value.f0; }
 				})
 				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
-				public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
 			}).name("reducer");
 			
 			reduced.setCombinable(true);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
index 8fc4324..828a635 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler;
 
 import static org.junit.Assert.*;
@@ -24,8 +23,6 @@ import static org.junit.Assert.*;
 import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.junit.Test;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.DeltaIteration;
@@ -287,7 +284,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 	public static final class Reduce101 extends RichGroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
 		
 		@Override
-		public void reduce(Iterator<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {}
+		public void reduce(Iterable<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {}
 	}
 	
 	@ConstantFields("0")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
index 64a4791..62d5d9b 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
@@ -16,17 +16,15 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.DeltaIteration;
@@ -214,7 +212,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 			iter.getWorkset().join(invariantInput)
 				.where(1, 2)
 				.equalTo(1, 2)
-				.with(new RichJoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
+				.with(new JoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
 					public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
 						return first;
 					}
@@ -224,7 +222,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 			result.join(iter.getSolutionSet())
 				.where(1, 0)
 				.equalTo(0, 2)
-				.with(new RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
+				.with(new JoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
 					public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
 						return second;
 					}
@@ -283,7 +281,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 			
 		DataSet<Tuple3<Long, Long, Long>> nextWorkset = joinedWithSolutionSet.groupBy(1, 2)
 			.reduceGroup(new RichGroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() {
-				public void reduce(Iterator<Tuple3<Long, Long, Long>> values, Collector<Tuple3<Long, Long, Long>> out) {}
+				public void reduce(Iterable<Tuple3<Long, Long, Long>> values, Collector<Tuple3<Long, Long, Long>> out) {}
 			})
 			.name(NEXT_WORKSET_REDUCER_NAME)
 			.withConstantSet("1->1","2->2","0->0");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
index 42275af..e06e3ba 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
@@ -16,10 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.testfunctions;
 
-import java.util.Iterator;
 
 import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
@@ -32,9 +30,9 @@ public class IdentityGroupReducer<T> extends RichGroupReduceFunction<T, T> {
 	private static final long serialVersionUID = 1L;
 
 	@Override
-	public void reduce(Iterator<T> values, Collector<T> out) {
-		while (values.hasNext()) {
-			out.collect(values.next());
+	public void reduce(Iterable<T> values, Collector<T> out) {
+		for (T next : values) {
+			out.collect(next);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
index 3f24e65..b1a0e2d 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
@@ -16,11 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.testfunctions;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.util.Collector;
@@ -32,7 +29,7 @@ public class Top1GroupReducer<T> extends RichGroupReduceFunction<T, T> {
 	private static final long serialVersionUID = 1L;
 
 	@Override
-	public void reduce(Iterator<T> values, Collector<T> out) {
-		out.collect(values.next());
+	public void reduce(Iterable<T> values, Collector<T> out) {
+		out.collect(values.iterator().next());
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCoGroupStub.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCoGroupStub.java b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCoGroupStub.java
index 6ef1651..13cd37b 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCoGroupStub.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCoGroupStub.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.util;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/util/IdentityReduce.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/util/IdentityReduce.java b/flink-compiler/src/test/java/org/apache/flink/compiler/util/IdentityReduce.java
index 3f32423..b78a850 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/util/IdentityReduce.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/util/IdentityReduce.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.util;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
index cc8fe78..1a0abeb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
-import java.util.Iterator;
 
 import org.apache.flink.util.Collector;
 
@@ -41,11 +40,11 @@ import org.apache.flink.util.Collector;
  * Some keys may only be contained in one of the two original data sets. In that case, the CoGroup function is invoked
  * with in empty input for the side of the data set that did not contain elements with that specific key.
  * 
- * @param <V1> The data type of the first input data set.
- * @param <V2> The data type of the second input data set.
+ * @param <IN1> The data type of the first input data set.
+ * @param <IN2> The data type of the second input data set.
  * @param <O> The data type of the returned elements.
  */
-public interface CoGroupFunction<V1, V2, O> extends Function, Serializable {
+public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {
 	
 	/**
 	 * This method must be implemented to provide a user implementation of a
@@ -59,5 +58,5 @@ public interface CoGroupFunction<V1, V2, O> extends Function, Serializable {
 	 * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
 	 *                   and may trigger the recovery logic.
 	 */
-	void coGroup(Iterator<V1> first, Iterator<V2> second, Collector<O> out) throws Exception;
+	public void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
index aed7fae..0588526 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
-import java.util.Iterator;
 
 /**
  * Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction}
@@ -45,5 +44,5 @@ public interface CombineFunction<T> extends Function, Serializable {
 	 * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
 	 *                   and may trigger the recovery logic.
 	 */
-	T combine(Iterator<T> values) throws Exception;
+	T combine(Iterable<T> values) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
index edf1614..bbcbd0a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
-import java.util.Iterator;
 
 import org.apache.flink.util.Collector;
 
@@ -47,5 +46,5 @@ public interface FlatCombineFunction<T> extends Function, Serializable {
 	 * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
 	 *                   and may trigger the recovery logic.
 	 */
-	void combine(Iterator<T> values, Collector<T> out) throws Exception;
+	void combine(Iterable<T> values, Collector<T> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
index 5fa959c..54cc5c7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
-import java.util.Iterator;
 
 import org.apache.flink.util.Collector;
 
@@ -53,5 +52,5 @@ public interface GroupReduceFunction<T, O> extends Function, Serializable {
 	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
 	 *                   to fail and may trigger recovery.
 	 */
-	void reduce(Iterator<T> values, Collector<O> out) throws Exception;
+	void reduce(Iterable<T> values, Collector<O> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java b/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java
new file mode 100644
index 0000000..a15c31c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+public class TraversableOnceException extends RuntimeException {
+
+	private static final long serialVersionUID = 7636881584773577290L;
+
+	public TraversableOnceException() {
+		super("The Iterable can be iterated over only once. Only the first call to 'iterator()' will succeed.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
index 2d794bd..3d68e99 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
@@ -35,7 +35,7 @@ import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Edge;
 import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Triad;
 
 /**
- * Triangle enumeration is a preprocessing step to find closely connected parts in graphs.
+ * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
  * A triangle consists of three edges that connect three vertices with each other.
  * 
  * <p>
@@ -154,7 +154,9 @@ public class EnumTrianglesBasic {
 		private final Triad outTriad = new Triad();
 		
 		@Override
-		public void reduce(Iterator<Edge> edges, Collector<Triad> out) throws Exception {
+		public void reduce(Iterable<Edge> edgesIter, Collector<Triad> out) throws Exception {
+			
+			final Iterator<Edge> edges = edgesIter.iterator();
 			
 			// clear vertex list
 			vertices.clear();
@@ -165,11 +167,11 @@ public class EnumTrianglesBasic {
 			vertices.add(firstEdge.getSecondVertex());
 			
 			// build and emit triads
-			while(edges.hasNext()) {
+			while (edges.hasNext()) {
 				Integer higherVertexId = edges.next().getSecondVertex();
 				
 				// combine vertex with all previously read vertices
-				for(Integer lowerVertexId : vertices) {
+				for (Integer lowerVertexId : vertices) {
 					outTriad.setSecondVertex(lowerVertexId);
 					outTriad.setThirdVertex(higherVertexId);
 					out.collect(outTriad);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java
index c0ea26a..efccb59 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java
@@ -38,7 +38,7 @@ import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.EdgeWithD
 import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Triad;
 
 /**
- * Triangle enumeration is a preprocessing step to find closely connected parts in graphs.
+ * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
  * A triangle consists of three edges that connect three vertices with each other.
  * 
  * <p>
@@ -166,8 +166,9 @@ public class EnumTrianglesOpt {
 		final EdgeWithDegrees outputEdge = new EdgeWithDegrees();
 		
 		@Override
-		public void reduce(Iterator<Edge> edges, Collector<EdgeWithDegrees> out) throws Exception {
+		public void reduce(Iterable<Edge> edgesIter, Collector<EdgeWithDegrees> out) {
 			
+			Iterator<Edge> edges = edgesIter.iterator();
 			otherVertices.clear();
 			
 			// get first edge
@@ -176,7 +177,7 @@ public class EnumTrianglesOpt {
 			this.otherVertices.add(edge.getSecondVertex());
 			
 			// get all other edges (assumes edges are sorted by second vertex)
-			while(edges.hasNext()) {
+			while (edges.hasNext()) {
 				edge = edges.next();
 				Integer otherVertex = edge.getSecondVertex();
 				// collect unique vertices
@@ -274,7 +275,8 @@ public class EnumTrianglesOpt {
 		private final Triad outTriad = new Triad();
 		
 		@Override
-		public void reduce(Iterator<Edge> edges, Collector<Triad> out) throws Exception {
+		public void reduce(Iterable<Edge> edgesIter, Collector<Triad> out) throws Exception {
+			final Iterator<Edge> edges = edgesIter.iterator();
 			
 			// clear vertex list
 			vertices.clear();
@@ -285,7 +287,7 @@ public class EnumTrianglesOpt {
 			vertices.add(firstEdge.getSecondVertex());
 			
 			// build and emit triads
-			while(edges.hasNext()) {
+			while (edges.hasNext()) {
 				Integer higherVertexId = edges.next().getSecondVertex();
 				
 				// combine vertex with all previously read vertices

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
index ba9754f..e6a9272 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
@@ -21,7 +21,6 @@ package org.apache.flink.example.java.graph;
 import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -163,12 +162,11 @@ public class PageRankBasic {
 		private final ArrayList<Long> neighbors = new ArrayList<Long>();
 		
 		@Override
-		public void reduce(Iterator<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) {
+		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) {
 			neighbors.clear();
 			Long id = 0L;
 			
-			while (values.hasNext()) {
-				Tuple2<Long, Long> n = values.next();
+			for (Tuple2<Long, Long> n : values) {
 				id = n.f0;
 				neighbors.add(n.f1);
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
index 22054da..e9ba406 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
@@ -25,8 +25,6 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.IterativeDataSet;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.example.java.graph.util.ConnectedComponentsData;
@@ -70,8 +68,8 @@ public class TransitiveClosureNaive implements ProgramDescription {
 				.groupBy(0, 1)
 				.reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
 					@Override
-					public void reduce(Iterator<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
-						out.collect(values.next());
+					public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
+						out.collect(values.iterator().next());
 					}
 				});
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
index 3033c0d..9ca6ea9 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
@@ -16,10 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.example.java.relational;
 
-import java.util.Iterator;
 
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.FilterFunction;
@@ -249,12 +247,12 @@ public class WebLogAnalysis {
 		 * 2: AVG_DURATION
 		 */
 		@Override
-		public void coGroup(Iterator<Tuple3<Integer, String, Integer>> ranks, Iterator<Tuple1<String>> visits, Collector<Tuple3<Integer, String, Integer>> out) {
+		public void coGroup(Iterable<Tuple3<Integer, String, Integer>> ranks, Iterable<Tuple1<String>> visits, Collector<Tuple3<Integer, String, Integer>> out) {
 			// Check if there is a entry in the visits relation
-			if (!visits.hasNext()) {
-				while (ranks.hasNext()) {
+			if (!visits.iterator().hasNext()) {
+				for (Tuple3<Integer, String, Integer> next : ranks) {
 					// Emit all rank pairs
-					out.collect(ranks.next());
+					out.collect(next);
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index af0ea52..bcc523d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
+import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.api.java.io.CsvReader;
@@ -253,6 +254,26 @@ public abstract class ExecutionEnvironment {
 	public CsvReader readCsvFile(String filePath) {
 		return new CsvReader(filePath, this);
 	}
+
+	// ------------------------------------ File Input Format -----------------------------------------
+	
+	public <X> DataSource<X> readFile(FileInputFormat<X> inputFormat, String filePath) {
+		if (inputFormat == null) {
+			throw new IllegalArgumentException("InputFormat must not be null.");
+		}
+		if (filePath == null) {
+			throw new IllegalArgumentException("The file path must not be null.");
+		}
+		
+		inputFormat.setFilePath(new Path(filePath));
+		try {
+			return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat));
+		}
+		catch (Exception e) {
+			throw new InvalidProgramException("The type returned by the input format could not be automatically determined. " +
+					"Please specify the TypeInformation of the produced type explicitly.");
+		}
+	}
 	
 	// ----------------------------------- Generic Input Format ---------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
index b363606..087808d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
@@ -28,13 +28,13 @@ public abstract class GroupReduceIterator<IN, OUT> extends RichGroupReduceFuncti
 	private static final long serialVersionUID = 1L;
 
 
-	public abstract Iterator<OUT> reduceGroup(Iterator<IN> values) throws Exception;
+	public abstract Iterator<OUT> reduceGroup(Iterable<IN> values) throws Exception;
 	
 	
 	// -------------------------------------------------------------------------------------------
 	
 	@Override
-	public final void reduce(Iterator<IN> values, Collector<OUT> out) throws Exception {
+	public final void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception {
 		for (Iterator<OUT> iter = reduceGroup(values); iter.hasNext(); ) {
 			out.collect(iter.next());
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
index e78c31e..3169622 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.java.functions;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.RichFunction;
@@ -40,6 +38,6 @@ public abstract class RichCoGroupFunction<IN1, IN2, OUT> extends AbstractRichFun
 	private static final long serialVersionUID = 1L;
 	
 	@Override
-	public abstract void coGroup(Iterator<IN1> first, Iterator<IN2> second, Collector<OUT> out) throws Exception;
+	public abstract void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<OUT> out) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
index 97c15ff..a5d45aa 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
@@ -24,8 +24,6 @@ import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.util.Collector;
 
-import java.util.Iterator;
-
 /**
  * Rich variant of the {@link FlatCombineFunction}. As a {@link RichFunction}, it gives access to the
  * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
@@ -39,5 +37,5 @@ public abstract class RichFlatCombineFunction<T> extends AbstractRichFunction im
 	private static final long serialVersionUID = 1L;
 
 	@Override
-	public abstract void combine(Iterator<T> values, Collector<T> out) throws Exception;
+	public abstract void combine(Iterable<T> values, Collector<T> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
index 801f592..9198aeb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
@@ -22,7 +22,6 @@ import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
-import java.util.Iterator;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.FlatCombineFunction;
@@ -44,7 +43,7 @@ public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunct
 	private static final long serialVersionUID = 1L;
 
 	@Override
-	public abstract void reduce(Iterator<IN> values, Collector<OUT> out) throws Exception;
+	public abstract void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception;
 	
 	/**
 	 * The combine methods pre-reduces elements. It may be called on subsets of the data
@@ -59,7 +58,7 @@ public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunct
 	 * <p>
 	 * Since the reduce function will be called on the result of this method, it is important that this
 	 * method returns the same data type as it consumes. By default, this method only calls the
-	 * {@link #reduce(Iterator, Collector)} method. If the behavior in the pre-reducing is different
+	 * {@link #reduce(Iterable, Collector)} method. If the behavior in the pre-reducing is different
 	 * from the final reduce function (for example because the reduce function changes the data type),
 	 * this method must be overwritten, or the execution will fail.
 	 * 
@@ -70,7 +69,7 @@ public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunct
 	 *                   to fail and may trigger recovery.
 	 */
 	@Override
-	public void combine(Iterator<IN> values, Collector<IN> out) throws Exception {
+	public void combine(Iterable<IN> values, Collector<IN> out) throws Exception {
 		@SuppressWarnings("unchecked")
 		Collector<OUT> c = (Collector<OUT>) out;
 		reduce(values, c);
@@ -80,7 +79,7 @@ public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunct
 	
 	/**
 	 * This annotation can be added to classes that extend {@link RichGroupReduceFunction}, in oder to mark
-	 * them as "combinable". The system may call the {@link RichGroupReduceFunction#combine(Iterator, Collector)}
+	 * them as "combinable". The system may call the {@link RichGroupReduceFunction#combine(Iterable, Collector)}
 	 * method on such functions, to pre-reduce the data before transferring it over the network to
 	 * the actual group reduce operation.
 	 * <p>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
index 80a5fa0..1ceb8c8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
@@ -271,13 +271,14 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate
 		}
 		
 		@Override
-		public void reduce(Iterator<T> values, Collector<T> out) {
+		public void reduce(Iterable<T> records, Collector<T> out) {
 			final AggregationFunction<Object>[] aggFunctions = this.aggFunctions;
 			final int[] fieldPositions = this.fieldPositions;
 
 			// aggregators are initialized from before
 			
 			T current = null;
+			final Iterator<T> values = records.iterator();
 			while (values.hasNext()) {
 				current = values.next();
 				

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 744893b..a890b32 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -301,7 +301,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 	/**
 	 * Intermediate step of a CoGroup transformation. <br/>
 	 * To continue the CoGroup transformation, select the grouping key of the first input {@link DataSet} by calling 
-	 * {@link CoGroupOperatorSets#where()} or {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets#where(KeySelector)}.
+	 * {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets#where(int, int...)} or {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets#where(KeySelector)}.
 	 *
 	 * @param <I1> The type of the first input DataSet of the CoGroup transformation.
 	 * @param <I2> The type of the second input DataSet of the CoGroup transformation.
@@ -328,7 +328,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 		 * @param field0 The first index of the Tuple fields of the first co-grouped DataSets that should be used as key
 		 * @param fields The indexes of the Tuple fields of the first co-grouped DataSets that should be used as keys.
 		 * @return An incomplete CoGroup transformation. 
-		 *           Call {@link CoGroupOperatorSetsPredicate#equalTo()} to continue the CoGroup.
+		 *           Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int, int...)} to continue the CoGroup.
 		 * 
 		 * @see Tuple
 		 * @see DataSet
@@ -348,7 +348,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 		 * @param field0 The first field of the Tuple fields of the first co-grouped DataSets that should be used as key
 		 * @param fields The  fields of the first co-grouped DataSets that should be used as keys.
 		 * @return An incomplete CoGroup transformation.
-		 *           Call {@link CoGroupOperatorSetsPredicate#equalTo()} to continue the CoGroup.
+		 *           Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int, int...)} to continue the CoGroup.
 		 *
 		 * @see Tuple
 		 * @see DataSet
@@ -367,7 +367,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 		 * 
 		 * @param keyExtractor The KeySelector function which extracts the key values from the DataSet on which it is grouped.
 		 * @return An incomplete CoGroup transformation. 
-		 *           Call {@link CoGroupOperatorSetsPredicate#equalTo()} to continue the CoGroup. 
+		 *           Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int, int...)} to continue the CoGroup. 
 		 * 
 		 * @see KeySelector
 		 * @see DataSet
@@ -381,7 +381,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 		/**
 		 * Intermediate step of a CoGroup transformation. <br/>
 		 * To continue the CoGroup transformation, select the grouping key of the second input {@link DataSet} by calling 
-		 * {@link CoGroupOperatorSetsPredicate#equalTo(int...)} or {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(KeySelector)}.
+		 * {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int, int...)} or {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(KeySelector)}.
 		 *
 		 */
 		public final class CoGroupOperatorSetsPredicate {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index 7646fa0..fd35773 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.java.operators;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -148,8 +146,8 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void reduce(Iterator<T> values, Collector<T> out) {
-			out.collect(values.next());
+		public void reduce(Iterable<T> values, Collector<T> out) {
+			out.collect(values.iterator().next());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index e1424ad..7b0ad4d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -37,7 +37,6 @@ import org.apache.flink.types.TypeInformation;
 
 import org.apache.flink.api.java.DataSet;
 
-
 /**
  * This operator represents the application of a "reduceGroup" function on a data set, and the
  * result data set produced by the function.
@@ -185,7 +184,6 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 		else {
 			throw new UnsupportedOperationException("Unrecognized key type.");
 		}
-		
 	}
 	
 	
@@ -215,7 +213,4 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 		
 		return reducer;
 	}
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
index 89290f0..36bae6b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
@@ -78,47 +76,11 @@ public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>
 
 
 		@Override
-		public void coGroup(Iterator<Tuple2<K, I1>> records1, Iterator<Tuple2<K, I2>> records2, Collector<OUT> out) throws Exception {
-			iter1.set(records1);
-			iter2.set(records2);
+		public void coGroup(Iterable<Tuple2<K, I1>> records1, Iterable<Tuple2<K, I2>> records2, Collector<OUT> out) throws Exception {
+			iter1.set(records1.iterator());
+			iter2.set(records2.iterator());
 			this.wrappedFunction.coGroup(iter1, iter2, out);
 		}
 		
 	}
-	
-	public static class UnwrappingKeyIterator<K, I1> implements Iterator<I1> {
-
-		private Iterator<Tuple2<K, I1>> outerIterator;
-		I1 firstValue;
-		
-		public UnwrappingKeyIterator(Iterator<Tuple2<K, I1>> records1) {
-			this.outerIterator = records1;
-			this.firstValue = null;
-		}
-		
-		public UnwrappingKeyIterator(Iterator<Tuple2<K, I1>> records1, I1 firstValue ) {
-			this.outerIterator = records1;
-			this.firstValue = firstValue;
-		}
-		
-		@Override
-		public boolean hasNext() {
-			return firstValue != null || outerIterator.hasNext();
-		}
-
-		@Override
-		public I1 next() {
-			if(firstValue != null) {
-				firstValue = null;
-				return firstValue;
-			}
-			return outerIterator.next().getField(1);
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException();
-		}
-		
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
index 73ea004..efd52d5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
@@ -68,18 +68,10 @@ public class PlanUnwrappingJoinOperator<I1, I2, OUT, K>
 			super(wrapped);
 		}
 
-		//@SuppressWarnings("unchecked")
-		//@Override
-		//public OUT join(Tuple2<K, I1> value1, Tuple2<K, I2> value2) throws Exception {
-		//	return wrappedFunction.join((I1)(value1.getField(1)), (I2)(value2.getField(1)));
-		//}
-
 		@SuppressWarnings("unchecked")
 		@Override
 		public void join (Tuple2<K, I1> value1, Tuple2<K, I2> value2, Collector<OUT> collector) throws Exception {
 			wrappedFunction.join ((I1)(value1.getField(1)), (I2)(value2.getField(1)), collector);
 		}
-		
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index 29eb5ed..7e3f0e5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
@@ -65,14 +63,14 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
 
 
 		@Override
-		public void reduce(Iterator<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
-			iter.set(values);
+		public void reduce(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
+			iter.set(values.iterator());
 			this.wrappedFunction.reduce(iter, out);
 		}
 
 		@Override
-		public void combine(Iterator<Tuple2<K, IN>> values, Collector<Tuple2<K, IN>> out) throws Exception {
-				iter.set(values);
+		public void combine(Iterable<Tuple2<K, IN>> values, Collector<Tuple2<K, IN>> out) throws Exception {
+				iter.set(values.iterator());
 				coll.set(out);
 				this.wrappedFunction.combine(iter, coll);
 		}
@@ -98,8 +96,8 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
 	
 	
 		@Override
-		public void reduce(Iterator<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
-			iter.set(values);
+		public void reduce(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
+			iter.set(values.iterator());
 			this.wrappedFunction.reduce(iter, out);
 		}
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
index 1f2c208..c09f3a8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
@@ -21,20 +21,23 @@ package org.apache.flink.api.java.operators.translation;
 import java.util.Iterator;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.TraversableOnceException;
 
 /**
  * An iterator that reads 2-tuples (key value pairs) and returns only the values (second field).
  * The iterator also tracks the keys, as the pairs flow though it.
  */
-public class TupleUnwrappingIterator<T, K> implements Iterator<T>, java.io.Serializable {
+public class TupleUnwrappingIterator<T, K> implements Iterator<T>, Iterable<T>, java.io.Serializable {
 
 	private static final long serialVersionUID = 1L;
 	
 	private K lastKey; 
 	private Iterator<Tuple2<K, T>> iterator;
+	private boolean iteratorAvailable;
 	
 	public void set(Iterator<Tuple2<K, T>> iterator) {
 		this.iterator = iterator;
+		this.iteratorAvailable = true;
 	}
 	
 	public K getLastKey() {
@@ -57,4 +60,14 @@ public class TupleUnwrappingIterator<T, K> implements Iterator<T>, java.io.Seria
 	public void remove() {
 		throw new UnsupportedOperationException();
 	}
+
+	@Override
+	public Iterator<T> iterator() {
+		if (iteratorAvailable) {
+			iteratorAvailable = false;
+			return this;
+		} else {
+			throw new TraversableOnceException();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
index 267d879..748e3f3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
@@ -42,7 +42,7 @@ public abstract class WrappingFunction<T extends Function> extends AbstractRichF
 	
 	private static final long serialVersionUID = 1L;
 
-	protected final T wrappedFunction;
+	protected T wrappedFunction;
 
 	protected WrappingFunction(T wrappedFunction) {
 		this.wrappedFunction = wrappedFunction;
@@ -174,6 +174,5 @@ public abstract class WrappingFunction<T extends Function> extends AbstractRichF
 		public <T extends Value> T getPreviousIterationAggregate(String name) {
 			return ((IterationRuntimeContext) context).<T>getPreviousIterationAggregate(name);
 		}
-		
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
index e9b5c25..fc7ef49 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
@@ -28,7 +28,7 @@ import org.apache.flink.util.Collector;
 /**
  * The CoGroupFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.record.operators.CoGroupOperator}.
  */
-public abstract class CoGroupFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> {
+public abstract class CoGroupFunction extends AbstractRichFunction {
 
 	private static final long serialVersionUID = 1L;
 
@@ -45,7 +45,5 @@ public abstract class CoGroupFunction extends AbstractRichFunction implements or
 	 *                   runtime catches an exception, it aborts the task and lets the fail-over logic
 	 *                   decide whether to retry the task execution.
 	 */
-	@Override
 	public abstract void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) throws Exception;
-	
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
index a1e6369..9848499 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
@@ -16,14 +16,11 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.record.functions;
 
 import java.util.Iterator;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
@@ -31,7 +28,7 @@ import org.apache.flink.util.Collector;
  * The ReduceFunction must be extended to provide a reducer implementation, as invoked by a
  * {@link org.apache.flink.api.java.record.operators.ReduceOperator}.
  */
-public abstract class ReduceFunction extends AbstractRichFunction implements GroupReduceFunction<Record, Record>, FlatCombineFunction<Record> {
+public abstract class ReduceFunction extends AbstractRichFunction {
 	
 	private static final long serialVersionUID = 1L;
 	
@@ -47,7 +44,6 @@ public abstract class ReduceFunction extends AbstractRichFunction implements Gro
 	 *                   runtime catches an exception, it aborts the reduce task and lets the fail-over logic
 	 *                   decide whether to retry the reduce execution.
 	 */
-	@Override
 	public abstract void reduce(Iterator<Record> records, Collector<Record> out) throws Exception;
 
 	/**
@@ -71,7 +67,6 @@ public abstract class ReduceFunction extends AbstractRichFunction implements Gro
 	 *                   runtime catches an exception, it aborts the combine task and lets the fail-over logic
 	 *                   decide whether to retry the combiner execution.
 	 */
-	@Override
 	public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
 		// to be implemented, if the reducer should use a combiner. Note that the combining method
 		// is only used, if the stub class is further annotated with the annotation

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
index a8cedeb..c958b84 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
@@ -16,9 +16,11 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.record.operators;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -29,13 +31,15 @@ import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.RecordOperator;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.api.java.record.functions.CoGroupFunction;
 import org.apache.flink.api.java.record.functions.FunctionAnnotation;
 import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
 
 /**
  * CoGroupOperator that applies a {@link CoGroupFunction} to groups of records sharing
@@ -43,7 +47,7 @@ import org.apache.flink.types.Record;
  * 
  * @see CoGroupFunction
  */
-public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record, CoGroupFunction> implements RecordOperator {
+public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record, org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> implements RecordOperator {
 	
 	/**
 	 * The types of the keys that the operator groups on.
@@ -61,7 +65,8 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record,
 	 * @param keyColumn2 The position of the key in the second input's records.
 	 */
 	public static Builder builder(CoGroupFunction udf, Class<? extends Key<?>> keyClass, int keyColumn1, int keyColumn2) {
-		return new Builder(new UserCodeObjectWrapper<CoGroupFunction>(udf), keyClass, keyColumn1, keyColumn2);
+		WrappingCoGroupFunction wrapper = new WrappingCoGroupFunction(udf);
+		return new Builder(new UserCodeObjectWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>>(wrapper), keyClass, keyColumn1, keyColumn2);
 	}
 	
 	/**
@@ -75,7 +80,8 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record,
 	public static Builder builder(Class<? extends CoGroupFunction> udf, Class<? extends Key<?>> keyClass,
 			int keyColumn1, int keyColumn2)
 	{
-		return new Builder(new UserCodeClassWrapper<CoGroupFunction>(udf), keyClass, keyColumn1, keyColumn2);
+		WrappingCoGroupFunction wrapper = new WrappingClassCoGroupFunction(udf);
+		return new Builder(new UserCodeObjectWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>>(wrapper), keyClass, keyColumn1, keyColumn2);
 	}
 	
 	/**
@@ -96,7 +102,9 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record,
 		setBroadcastVariables(builder.broadcastInputs);
 		setGroupOrderForInputOne(builder.secondaryOrder1);
 		setGroupOrderForInputTwo(builder.secondaryOrder2);
-		setSemanticProperties(FunctionAnnotation.readDualConstantAnnotations(builder.udf));
+		
+		CoGroupFunction function = ((WrappingCoGroupFunction) builder.udf.getUserCodeObject()).getWrappedFunction();
+		setSemanticProperties(FunctionAnnotation.readDualConstantAnnotations(new UserCodeObjectWrapper<CoGroupFunction>(function)));
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -115,7 +123,7 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record,
 	public static class Builder {
 		
 		/* The required parameters */
-		private final UserCodeWrapper<CoGroupFunction> udf;
+		private final UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> udf;
 		private final List<Class<? extends Key<?>>> keyClasses;
 		private final List<Integer> keyColumns1;
 		private final List<Integer> keyColumns2;
@@ -136,7 +144,7 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record,
 		 * @param keyColumn1 The position of the key in the first input's records.
 		 * @param keyColumn2 The position of the key in the second input's records.
 		 */
-		protected Builder(UserCodeWrapper<CoGroupFunction> udf, Class<? extends Key<?>> keyClass,
+		protected Builder(UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> udf, Class<? extends Key<?>> keyClass,
 				int keyColumn1, int keyColumn2)
 		{
 			this.udf = udf;
@@ -157,7 +165,7 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record,
 		 * 
 		 * @param udf The {@link CoGroupFunction} implementation for this CoGroup operator.
 		 */
-		protected Builder(UserCodeWrapper<CoGroupFunction> udf) {
+		protected Builder(UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> udf) {
 			this.udf = udf;
 			this.keyClasses = new ArrayList<Class<? extends Key<?>>>();
 			this.keyColumns1 = new ArrayList<Integer>();
@@ -338,4 +346,39 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record,
 			return new CoGroupOperator(this);
 		}
 	}
+	
+	// ============================================================================================
+	
+	public static class WrappingCoGroupFunction extends WrappingFunction<CoGroupFunction> 
+			implements org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> {
+		
+		private static final long serialVersionUID = 1L;
+		
+		public WrappingCoGroupFunction(CoGroupFunction coGrouper) {
+			super(coGrouper);
+		}
+		
+		@Override
+		public void coGroup(Iterable<Record> records1, Iterable<Record> records2, Collector<Record> out) throws Exception {
+			this.wrappedFunction.coGroup(records1.iterator(), records2.iterator(), out);
+		}
+	}
+	
+	public static final class WrappingClassCoGroupFunction extends WrappingCoGroupFunction {
+		
+		private static final long serialVersionUID = 1L;
+		
+		public WrappingClassCoGroupFunction(Class<? extends CoGroupFunction> reducer) {
+			super(InstantiationUtil.instantiate(reducer));
+		}
+		
+		private void writeObject(ObjectOutputStream out) throws IOException {
+			out.writeObject(wrappedFunction.getClass());
+		}
+
+		private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+			Class<?> clazz = (Class<?>) in.readObject();
+			this.wrappedFunction = (CoGroupFunction) InstantiationUtil.instantiate(clazz);
+		}
+	}
 }