You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/30 18:32:21 UTC

[1/2] git commit: [FLINK-1005] Add non-object reusing variants of key-grouped iterator.

Repository: incubator-flink
Updated Branches:
  refs/heads/master 76d4a75e8 -> e7c4c8586


[FLINK-1005] Add non-object reusing variants of key-grouped iterator.

Clean minor javadoc errors.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1d00cff8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1d00cff8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1d00cff8

Branch: refs/heads/master
Commit: 1d00cff8b913e2256c7ec2ce204b01c82d7214d1
Parents: 76d4a75
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 8 11:32:09 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 17:01:37 2014 +0200

----------------------------------------------------------------------
 .../runtime/operators/GroupReduceDriver.java    |   2 +
 .../util/KeyGroupedIteratorImmutable.java       | 202 +++++++++++
 .../util/KeyGroupedIteratorImmutableTest.java   | 353 +++++++++++++++++++
 3 files changed, 557 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1d00cff8/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
index feeceb0..582f280 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
@@ -51,6 +51,8 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction
 
 	private TypeComparator<IT> comparator;
 	
+	private boolean mutableObjectMode = false;
+	
 	private volatile boolean running;
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1d00cff8/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutable.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutable.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutable.java
new file mode 100644
index 0000000..7d093d5
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutable.java
@@ -0,0 +1,202 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.pact.runtime.util;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import eu.stratosphere.api.common.typeutils.TypeComparator;
+import eu.stratosphere.api.common.typeutils.TypeSerializer;
+import eu.stratosphere.util.MutableObjectIterator;
+
+/**
+ * The KeyValueIterator returns a key and all values that belong to the key (share the same key).
+ * 
+ */
+public final class KeyGroupedIteratorImmutable<E> {
+	
+	private final MutableObjectIterator<E> iterator;
+
+	private final TypeSerializer<E> serializer;
+	
+	private final TypeComparator<E> comparator;
+	
+	private ValuesIterator valuesIterator;
+	
+	private E lastKeyRecord;
+	
+	private E lookahead;
+	
+	private boolean done;
+
+	/**
+	 * Initializes the KeyGroupedIterator. It requires an iterator which returns its result
+	 * sorted by the key fields.
+	 * 
+	 * @param iterator An iterator over records, which are sorted by the key fields, in any order.
+	 * @param serializer The serializer for the data type iterated over.
+	 * @param comparator The comparator for the data type iterated over.
+	 */
+	public KeyGroupedIteratorImmutable(MutableObjectIterator<E> iterator,
+			TypeSerializer<E> serializer, TypeComparator<E> comparator)
+	{
+		if (iterator == null || serializer == null || comparator == null) {
+			throw new NullPointerException();
+		}
+		
+		this.iterator = iterator;
+		this.serializer = serializer;
+		this.comparator = comparator;
+	}
+
+	/**
+	 * Moves the iterator to the next key. This method may skip any values that have not yet been returned by the
+	 * iterator created by the {@link #getValues()} method. Hence, if called multiple times it "removes" key groups.
+	 * 
+	 * @return true, if the input iterator has an other group of records with the same key.
+	 */
+	public boolean nextKey() throws IOException {
+		
+		if (lookahead != null) {
+			// common case: whole value-iterator was consumed and a new key group is available.
+			this.comparator.setReference(this.lookahead);
+			this.valuesIterator.next = this.lookahead;
+			this.lastKeyRecord = this.lookahead;
+			this.lookahead = null;
+			return true;
+		}
+		
+		// first element, empty/done, or the values iterator was not entirely consumed
+		if (this.done) {
+			return false;
+		}
+			
+		if (this.valuesIterator != null) {
+			// values was not entirely consumed. move to the next key
+			// Required if user code / reduce() method did not read the whole value iterator.
+			E next = this.serializer.createInstance();
+			while (true) {
+				if ((next = this.iterator.next(next)) != null) {
+					if (!this.comparator.equalToReference(next)) {
+						// the keys do not match, so we have a new group. store the current key
+						this.comparator.setReference(next);
+						this.valuesIterator.next = next;
+						this.lastKeyRecord = next;
+						return true;
+					}
+				}
+				else {
+					// input exhausted
+					this.valuesIterator.next = null;
+					this.valuesIterator = null;
+					this.lastKeyRecord = null;
+					this.done = true;
+					return false;
+				}
+			}
+		}
+		else {
+			// first element
+			// get the next element
+			E first = this.iterator.next(this.serializer.createInstance());
+			if (first != null) {
+				this.comparator.setReference(first);
+				this.valuesIterator = new ValuesIterator(first);
+				this.lastKeyRecord = first;
+				return true;
+			}
+			else {
+				// empty input, set everything null
+				this.done = true;
+				return false;
+			}
+		}
+	}
+	
+	private E advanceToNext() {
+		try {
+			E next = this.iterator.next(serializer.createInstance());
+			if (next != null) {
+				if (comparator.equalToReference(next)) {
+					// same key
+					return next;
+				} else {
+					// moved to the next key, no more values here
+					this.lookahead = next;
+					return null;
+				}
+			}
+			else {
+				// backing iterator is consumed
+				this.done = true;
+				return null;
+			}
+		}
+		catch (IOException e) {
+			throw new RuntimeException("An error occurred while reading the next record.", e);
+		}
+	}
+	
+	public E getCurrent() {
+		return lastKeyRecord;
+	}
+	
+	public TypeComparator<E> getComparatorWithCurrentReference() {
+		return this.comparator;
+	}
+
+	/**
+	 * Returns an iterator over all values that belong to the current key. The iterator is initially <code>null</code>
+	 * (before the first call to {@link #nextKey()} and after all keys are consumed. In general, this method returns
+	 * always a non-null value, if a previous call to {@link #nextKey()} return <code>true</code>.
+	 * 
+	 * @return Iterator over all values that belong to the current key.
+	 */
+	public ValuesIterator getValues() {
+		return this.valuesIterator;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	public final class ValuesIterator implements Iterator<E> {
+		
+		private E next;
+		
+		private ValuesIterator(E first) {
+			this.next = first;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return next != null;
+		}
+
+		@Override
+		public E next() {
+			if (this.next != null) {
+				E current = this.next;
+				this.next = KeyGroupedIteratorImmutable.this.advanceToNext();
+				return current;
+			} else {
+				throw new NoSuchElementException();
+			}
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1d00cff8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutableTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutableTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutableTest.java
new file mode 100644
index 0000000..4b83422
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutableTest.java
@@ -0,0 +1,353 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.pact.runtime.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator;
+import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializer;
+import eu.stratosphere.types.IntValue;
+import eu.stratosphere.types.Record;
+import eu.stratosphere.types.StringValue;
+import eu.stratosphere.util.MutableObjectIterator;
+
+/**
+ * Test for the safe key grouped iterator, which advances in windows containing the same key and provides a sub-iterator
+ * over the records with the same key.
+ */
+public class KeyGroupedIteratorImmutableTest {
+	
+	private MutableObjectIterator<Record> sourceIter;		// the iterator that provides the input
+	
+	private KeyGroupedIteratorImmutable<Record> psi;					// the grouping iterator, progressing in key steps
+	
+	@Before
+	public void setup()
+	{
+		final ArrayList<IntStringPair> source = new ArrayList<IntStringPair>();
+		
+		// add elements to the source
+		source.add(new IntStringPair(new IntValue(1), new StringValue("A")));
+		source.add(new IntStringPair(new IntValue(2), new StringValue("B")));
+		source.add(new IntStringPair(new IntValue(3), new StringValue("C")));
+		source.add(new IntStringPair(new IntValue(3), new StringValue("D")));
+		source.add(new IntStringPair(new IntValue(4), new StringValue("E")));
+		source.add(new IntStringPair(new IntValue(4), new StringValue("F")));
+		source.add(new IntStringPair(new IntValue(4), new StringValue("G")));
+		source.add(new IntStringPair(new IntValue(5), new StringValue("H")));
+		source.add(new IntStringPair(new IntValue(5), new StringValue("I")));
+		source.add(new IntStringPair(new IntValue(5), new StringValue("J")));
+		source.add(new IntStringPair(new IntValue(5), new StringValue("K")));
+		source.add(new IntStringPair(new IntValue(5), new StringValue("L")));
+		
+		
+		this.sourceIter = new MutableObjectIterator<Record>() {
+			final Iterator<IntStringPair> it = source.iterator();
+			
+			@Override
+			public Record next(Record reuse) throws IOException {
+				if (it.hasNext()) {
+					IntStringPair pair = it.next();
+					reuse.setField(0, pair.getInteger());
+					reuse.setField(1, pair.getString());
+					return reuse;
+				}
+				else {
+					return null;
+				}
+			}
+		};
+		
+		final RecordSerializer serializer = RecordSerializer.get();
+		@SuppressWarnings("unchecked")
+		final RecordComparator comparator = new RecordComparator(new int[] {0}, new Class[] {IntValue.class});
+		
+		this.psi = new KeyGroupedIteratorImmutable<Record>(this.sourceIter, serializer, comparator);
+	}
+
+	@Test
+	public void testNextKeyOnly() throws Exception
+	{
+		try {
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+			Assert.assertNull("KeyGroupedIterator must not have another value.", this.psi.getValues());
+			
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test encountered an unexpected exception.");
+		}
+	}
+	
+	@Test
+	public void testFullIterationThroughAllValues() throws IOException
+	{
+		try {
+			// Key 1, Value A
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			// Key 2, Value B
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			// Key 3, Values C, D
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			try {
+				this.psi.getValues().next();
+				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+			}
+			catch (NoSuchElementException nseex) {}
+			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+			try {
+				this.psi.getValues().next();
+				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+			}
+			catch (NoSuchElementException nseex) {}
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			// Key 4, Values E, F, G
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("F"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("G"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			// Key 5, Values H, I, J, K, L
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("J"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("K"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("L"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			try {
+				this.psi.getValues().next();
+				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+			}
+			catch (NoSuchElementException nseex) {}
+			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			try {
+				this.psi.getValues().next();
+				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+			}
+			catch (NoSuchElementException nseex) {}
+			
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test encountered an unexpected exception.");
+		}
+	}
+	
+	@Test
+	public void testMixedProgress() throws Exception
+	{
+		try {
+			// Progression only via nextKey() and hasNext() - Key 1, Value A
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			
+			// Progression only through nextKey() - Key 2, Value B
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			
+			// Progression first though haNext() and next(), then through hasNext() - Key 3, Values C, D
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			// Progression first via next() only, then hasNext() only Key 4, Values E, F, G
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			
+			// Key 5, Values H, I, J, K, L
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			
+			// end
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test encountered an unexpected exception.");
+		}
+	}
+	
+	@Test
+	public void testHasNextDoesNotOverweiteCurrentRecord() throws Exception
+	{
+		try {
+			Iterator<Record> valsIter = null;
+			Record rec = null;
+			
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			valsIter = this.psi.getValues();
+			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
+			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			rec = valsIter.next();
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator must have another value.", valsIter.hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class));			
+			Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
+			
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			valsIter = this.psi.getValues();
+			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
+			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			rec = valsIter.next();
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator must have another value.", valsIter.hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
+			
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			valsIter = this.psi.getValues();
+			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
+			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			rec = valsIter.next();
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class));
+			rec = valsIter.next();
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test encountered an unexpected exception.");
+		}
+	}
+	
+	private static final class IntStringPair
+	{
+		private final IntValue integer;
+		private final StringValue string;
+
+		IntStringPair(IntValue integer, StringValue string) {
+			this.integer = integer;
+			this.string = string;
+		}
+
+		public IntValue getInteger() {
+			return integer;
+		}
+
+		public StringValue getString() {
+			return string;
+		}
+	}
+}


[2/2] git commit: [FLINK-1005] Make GroupReduce configurable to use either mutable or immutable object mode

Posted by se...@apache.org.
[FLINK-1005] Make GroupReduce configurable to use either mutable or immutable object mode


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e7c4c858
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e7c4c858
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e7c4c858

Branch: refs/heads/master
Commit: e7c4c8586e80a768b4a89990bf29b9cb7dd11888
Parents: 1d00cff
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 8 12:19:43 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 17:01:38 2014 +0200

----------------------------------------------------------------------
 .../runtime/operators/GroupReduceDriver.java    |  29 +-
 .../runtime/operators/util/TaskConfig.java      |  10 +
 .../flink/runtime/util/KeyGroupedIterator.java  |   1 -
 .../util/KeyGroupedIteratorImmutable.java       | 222 ++++++++++++
 .../operators/drivers/DriverTestData.java       |   2 +-
 .../drivers/GroupReduceDriverTest.java          | 109 +++++-
 .../operators/drivers/TestTaskContext.java      |   4 +
 .../util/KeyGroupedIteratorImmutableTest.java   | 357 +++++++++++++++++++
 .../util/KeyGroupedIteratorImmutable.java       | 202 -----------
 .../util/KeyGroupedIteratorImmutableTest.java   | 353 ------------------
 10 files changed, 722 insertions(+), 567 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7c4c858/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
index 582f280..2ec9873 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.util.KeyGroupedIterator;
+import org.apache.flink.runtime.util.KeyGroupedIteratorImmutable;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -91,6 +92,12 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction
 		this.serializer = this.taskContext.<IT>getInputSerializer(0).getSerializer();
 		this.comparator = this.taskContext.getDriverComparator(0);
 		this.input = this.taskContext.getInput(0);
+		
+		this.mutableObjectMode = config.getMutableObjectMode();
+		
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("GroupReduceDriver uses " + (this.mutableObjectMode ? "MUTABLE" : "IMMUTABLE") + " object mode.");
+		}
 	}
 
 	@Override
@@ -99,15 +106,23 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction
 			LOG.debug(this.taskContext.formatLogString("GroupReducer preprocessing done. Running GroupReducer code."));
 		}
 
-		final KeyGroupedIterator<IT> iter = new KeyGroupedIterator<IT>(this.input, this.serializer, this.comparator);
-
 		// cache references on the stack
 		final GroupReduceFunction<IT, OT> stub = this.taskContext.getStub();
 		final Collector<OT> output = this.taskContext.getOutputCollector();
-
-		// run stub implementation
-		while (this.running && iter.nextKey()) {
-			stub.reduce(iter.getValues(), output);
+		
+		if (mutableObjectMode) {
+			final KeyGroupedIterator<IT> iter = new KeyGroupedIterator<IT>(this.input, this.serializer, this.comparator);
+			// run stub implementation
+			while (this.running && iter.nextKey()) {
+				stub.reduce(iter.getValues(), output);
+			}
+		}
+		else {
+			final KeyGroupedIteratorImmutable<IT> iter = new KeyGroupedIteratorImmutable<IT>(this.input, this.serializer, this.comparator);
+			// run stub implementation
+			while (this.running && iter.nextKey()) {
+				stub.reduce(iter.getValues(), output);
+			}
 		}
 	}
 
@@ -118,4 +133,4 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction
 	public void cancel() {
 		this.running = false;
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7c4c858/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
index f583235..5f4afe2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
@@ -76,6 +76,8 @@ public class TaskConfig {
 	private static final String DRIVER_COMPARATOR_PARAMETERS_PREFIX = "driver.comp.params.";
 	
 	private static final String DRIVER_PAIR_COMPARATOR_FACTORY = "driver.paircomp";
+	
+	private static final String DRIVER_MUTABLE_OBJECT_MODE = "diver.mutableobjects";
 
 	// -------------------------------------- Inputs ----------------------------------------------
 
@@ -335,6 +337,14 @@ public class TaskConfig {
 		}
 	}
 	
+	public void setMutableObjectMode(boolean mode) {
+		this.config.setBoolean(DRIVER_MUTABLE_OBJECT_MODE, mode);
+	}
+	
+	public boolean getMutableObjectMode() {
+		return this.config.getBoolean(DRIVER_MUTABLE_OBJECT_MODE, true);
+	}
+	
 	public void setDriverComparator(TypeComparatorFactory<?> factory, int inputNum) {
 		setTypeComparatorFactory(factory, DRIVER_COMPARATOR_FACTORY_PREFIX + inputNum,
 			DRIVER_COMPARATOR_PARAMETERS_PREFIX + inputNum + SEPARATOR);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7c4c858/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
index 88048f0..e0d1d99 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.util;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7c4c858/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutable.java
new file mode 100644
index 0000000..80ac231
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutable.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TraversableOnceException;
+/**
+ * The KeyValueIterator returns a key and all values that belong to the key (share the same key).
+ * 
+ */
+public final class KeyGroupedIteratorImmutable<E> {
+	
+	private final MutableObjectIterator<E> iterator;
+
+	private final TypeSerializer<E> serializer;
+	
+	private final TypeComparator<E> comparator;
+	
+	private ValuesIterator valuesIterator;
+	
+	private E lastKeyRecord;
+	
+	private E lookahead;
+	
+	private boolean done;
+
+	/**
+	 * Initializes the KeyGroupedIterator. It requires an iterator which returns its result
+	 * sorted by the key fields.
+	 * 
+	 * @param iterator An iterator over records, which are sorted by the key fields, in any order.
+	 * @param serializer The serializer for the data type iterated over.
+	 * @param comparator The comparator for the data type iterated over.
+	 */
+	public KeyGroupedIteratorImmutable(MutableObjectIterator<E> iterator,
+			TypeSerializer<E> serializer, TypeComparator<E> comparator)
+	{
+		if (iterator == null || serializer == null || comparator == null) {
+			throw new NullPointerException();
+		}
+		
+		this.iterator = iterator;
+		this.serializer = serializer;
+		this.comparator = comparator;
+	}
+
+	/**
+	 * Moves the iterator to the next key. This method may skip any values that have not yet been returned by the
+	 * iterator created by the {@link #getValues()} method. Hence, if called multiple times it "removes" key groups.
+	 * 
+	 * @return true, if the input iterator has an other group of records with the same key.
+	 */
+	public boolean nextKey() throws IOException {
+		
+		if (lookahead != null) {
+			// common case: whole value-iterator was consumed and a new key group is available.
+			this.comparator.setReference(this.lookahead);
+			this.valuesIterator.next = this.lookahead;
+			this.lastKeyRecord = this.lookahead;
+			this.lookahead = null;
+			this.valuesIterator.iteratorAvailable = true;
+			return true;
+		}
+		
+		// first element, empty/done, or the values iterator was not entirely consumed
+		if (this.done) {
+			return false;
+		}
+			
+		if (this.valuesIterator != null) {
+			// values was not entirely consumed. move to the next key
+			// Required if user code / reduce() method did not read the whole value iterator.
+			E next = this.serializer.createInstance();
+			while (true) {
+				if ((next = this.iterator.next(next)) != null) {
+					if (!this.comparator.equalToReference(next)) {
+						// the keys do not match, so we have a new group. store the current key
+						this.comparator.setReference(next);
+						this.valuesIterator.next = next;
+						this.lastKeyRecord = next;
+						this.valuesIterator.iteratorAvailable = true;
+						return true;
+					}
+				}
+				else {
+					// input exhausted
+					this.valuesIterator.next = null;
+					this.valuesIterator = null;
+					this.lastKeyRecord = null;
+					this.done = true;
+					return false;
+				}
+			}
+		}
+		else {
+			// first element
+			// get the next element
+			E first = this.iterator.next(this.serializer.createInstance());
+			if (first != null) {
+				this.comparator.setReference(first);
+				this.valuesIterator = new ValuesIterator(first);
+				this.lastKeyRecord = first;
+				return true;
+			}
+			else {
+				// empty input, set everything null
+				this.done = true;
+				return false;
+			}
+		}
+	}
+	
+	private E advanceToNext() {
+		try {
+			E next = this.iterator.next(serializer.createInstance());
+			if (next != null) {
+				if (comparator.equalToReference(next)) {
+					// same key
+					return next;
+				} else {
+					// moved to the next key, no more values here
+					this.lookahead = next;
+					return null;
+				}
+			}
+			else {
+				// backing iterator is consumed
+				this.done = true;
+				return null;
+			}
+		}
+		catch (IOException e) {
+			throw new RuntimeException("An error occurred while reading the next record.", e);
+		}
+	}
+	
+	public E getCurrent() {
+		return lastKeyRecord;
+	}
+	
+	public TypeComparator<E> getComparatorWithCurrentReference() {
+		return this.comparator;
+	}
+
+	/**
+	 * Returns an iterator over all values that belong to the current key. The iterator is initially <code>null</code>
+	 * (before the first call to {@link #nextKey()} and after all keys are consumed. In general, this method returns
+	 * always a non-null value, if a previous call to {@link #nextKey()} return <code>true</code>.
+	 * 
+	 * @return Iterator over all values that belong to the current key.
+	 */
+	public ValuesIterator getValues() {
+		return this.valuesIterator;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	public final class ValuesIterator implements Iterator<E>, Iterable<E> {
+		
+		private E next;
+		
+		private boolean iteratorAvailable = true;
+		
+		private ValuesIterator(E first) {
+			this.next = first;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return next != null;
+		}
+
+		@Override
+		public E next() {
+			if (this.next != null) {
+				E current = this.next;
+				this.next = KeyGroupedIteratorImmutable.this.advanceToNext();
+				return current;
+			} else {
+				throw new NoSuchElementException();
+			}
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+		
+		@Override
+		public Iterator<E> iterator() {
+			if (iteratorAvailable) {
+				iteratorAvailable = false;
+				return this;
+			}
+			else {
+				throw new TraversableOnceException();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7c4c858/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/DriverTestData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/DriverTestData.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/DriverTestData.java
index e5c655b..cc7e9ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/DriverTestData.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/DriverTestData.java
@@ -100,7 +100,7 @@ public class DriverTestData {
 	
 	public static final void compareTupleArrays(Object[] expected, Object[] found) {
 		if (expected.length != found.length) {
-			throw new IllegalArgumentException();
+			Assert.assertEquals("Length of result is wrong", expected.length, found.length);
 		}
 		
 		for (int i = 0; i < expected.length; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7c4c858/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
index f872b56..ecc5259 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.drivers;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -28,7 +29,6 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.GroupReduceDriver;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
 import org.apache.flink.runtime.util.RegularToMutableObjectIterator;
 import org.apache.flink.types.IntValue;
@@ -53,14 +53,18 @@ public class GroupReduceDriverTest {
 			TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true});
 			context.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 			
+			GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer());
+			
 			context.setInput1(input, typeInfo.createSerializer());
 			context.setComparator1(comparator);
-			context.setCollector(new DiscardingOutputCollector<Tuple2<String, Integer>>());
+			context.setCollector(result);
 			
 			GroupReduceDriver<Tuple2<String, Integer>, Tuple2<String, Integer>> driver = new GroupReduceDriver<Tuple2<String, Integer>, Tuple2<String, Integer>>();
 			driver.setup(context);
 			driver.prepare();
 			driver.run();
+			
+			Assert.assertTrue(result.getList().isEmpty());
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -141,7 +145,84 @@ public class GroupReduceDriverTest {
 		}
 	}
 	
-
+	@Test
+	public void testAllReduceDriverIncorrectlyAccumulatingMutable() {
+		try {
+			TestTaskContext<GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
+					new TestTaskContext<GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
+			
+			List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
+			TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0));
+			MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer());
+			TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true});
+			
+			GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer());
+			
+			context.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
+			context.setInput1(input, typeInfo.createSerializer());
+			context.setComparator1(comparator);
+			context.setCollector(result);
+			context.setUdf(new ConcatSumMutableAccumulatingReducer());
+			
+			GroupReduceDriver<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> driver = new GroupReduceDriver<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>();
+			driver.setup(context);
+			driver.prepare();
+			driver.run();
+			
+			Object[] res = result.getList().toArray();
+			Object[] expected = DriverTestData.createReduceMutableDataGroupedResult().toArray();
+			
+			try {
+				DriverTestData.compareTupleArrays(expected, res);
+				Assert.fail("Accumulationg mutable objects is expected to result in incorrect values.");
+			}
+			catch (AssertionError e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testAllReduceDriverAccumulatingImmutable() {
+		try {
+			TestTaskContext<GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
+					new TestTaskContext<GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
+			
+			List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
+			TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0));
+			MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer());
+			TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true});
+			
+			GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer());
+			
+			context.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
+			context.setInput1(input, typeInfo.createSerializer());
+			context.setComparator1(comparator);
+			context.setCollector(result);
+			context.setUdf(new ConcatSumMutableAccumulatingReducer());
+			context.setMutableObjectMode(false);
+			
+			GroupReduceDriver<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> driver = new GroupReduceDriver<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>();
+			driver.setup(context);
+			driver.prepare();
+			driver.run();
+			
+			Object[] res = result.getList().toArray();
+			Object[] expected = DriverTestData.createReduceMutableDataGroupedResult().toArray();
+			
+			DriverTestData.compareTupleArrays(expected, res);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	//  Test UDFs
@@ -178,4 +259,26 @@ public class GroupReduceDriverTest {
 			out.collect(current);
 		}
 	}
+	
+	public static final class ConcatSumMutableAccumulatingReducer implements GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> {
+
+		@Override
+		public void reduce(Iterable<Tuple2<StringValue, IntValue>> values, Collector<Tuple2<StringValue, IntValue>> out) throws Exception {
+			List<Tuple2<StringValue, IntValue>> all = new ArrayList<Tuple2<StringValue,IntValue>>();
+			
+			for (Tuple2<StringValue, IntValue> t : values) {
+				all.add(t);
+			}
+			
+			Tuple2<StringValue, IntValue> result = all.get(0);
+			
+			for (int i = 1; i < all.size(); i++) {
+				Tuple2<StringValue, IntValue> e = all.get(i);
+				result.f0.append(e.f0);
+				result.f1.setValue(result.f1.getValue() + e.f1.getValue());
+			}
+			
+			out.collect(result);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7c4c858/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index f0f51a6..e5ece3f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -119,6 +119,10 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
 		this.config.setDriverStrategy(strategy);
 	}
 	
+	public void setMutableObjectMode(boolean mutableObjectMode) {
+		this.config.setMutableObjectMode(mutableObjectMode);
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	//  Context Methods
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7c4c858/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java
new file mode 100644
index 0000000..54a1492
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for the safe key grouped iterator, which advances in windows containing the same key and provides a sub-iterator
+ * over the records with the same key.
+ */
+public class KeyGroupedIteratorImmutableTest {
+	
+	private MutableObjectIterator<Record> sourceIter;		// the iterator that provides the input
+	
+	private KeyGroupedIteratorImmutable<Record> psi;					// the grouping iterator, progressing in key steps
+	
+	@Before
+	public void setup()
+	{
+		final ArrayList<IntStringPair> source = new ArrayList<IntStringPair>();
+		
+		// add elements to the source
+		source.add(new IntStringPair(new IntValue(1), new StringValue("A")));
+		source.add(new IntStringPair(new IntValue(2), new StringValue("B")));
+		source.add(new IntStringPair(new IntValue(3), new StringValue("C")));
+		source.add(new IntStringPair(new IntValue(3), new StringValue("D")));
+		source.add(new IntStringPair(new IntValue(4), new StringValue("E")));
+		source.add(new IntStringPair(new IntValue(4), new StringValue("F")));
+		source.add(new IntStringPair(new IntValue(4), new StringValue("G")));
+		source.add(new IntStringPair(new IntValue(5), new StringValue("H")));
+		source.add(new IntStringPair(new IntValue(5), new StringValue("I")));
+		source.add(new IntStringPair(new IntValue(5), new StringValue("J")));
+		source.add(new IntStringPair(new IntValue(5), new StringValue("K")));
+		source.add(new IntStringPair(new IntValue(5), new StringValue("L")));
+		
+		
+		this.sourceIter = new MutableObjectIterator<Record>() {
+			final Iterator<IntStringPair> it = source.iterator();
+			
+			@Override
+			public Record next(Record reuse) throws IOException {
+				if (it.hasNext()) {
+					IntStringPair pair = it.next();
+					reuse.setField(0, pair.getInteger());
+					reuse.setField(1, pair.getString());
+					return reuse;
+				}
+				else {
+					return null;
+				}
+			}
+		};
+		
+		final RecordSerializer serializer = RecordSerializer.get();
+		@SuppressWarnings("unchecked")
+		final RecordComparator comparator = new RecordComparator(new int[] {0}, new Class[] {IntValue.class});
+		
+		this.psi = new KeyGroupedIteratorImmutable<Record>(this.sourceIter, serializer, comparator);
+	}
+
+	@Test
+	public void testNextKeyOnly() throws Exception
+	{
+		try {
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+			Assert.assertNull("KeyGroupedIterator must not have another value.", this.psi.getValues());
+			
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test encountered an unexpected exception.");
+		}
+	}
+	
+	@Test
+	public void testFullIterationThroughAllValues() throws IOException
+	{
+		try {
+			// Key 1, Value A
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			// Key 2, Value B
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			// Key 3, Values C, D
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			try {
+				this.psi.getValues().next();
+				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+			}
+			catch (NoSuchElementException nseex) {}
+			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+			try {
+				this.psi.getValues().next();
+				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+			}
+			catch (NoSuchElementException nseex) {}
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			// Key 4, Values E, F, G
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("F"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("G"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			// Key 5, Values H, I, J, K, L
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("J"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("K"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("L"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			try {
+				this.psi.getValues().next();
+				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+			}
+			catch (NoSuchElementException nseex) {}
+			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			try {
+				this.psi.getValues().next();
+				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+			}
+			catch (NoSuchElementException nseex) {}
+			
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test encountered an unexpected exception.");
+		}
+	}
+	
+	@Test
+	public void testMixedProgress() throws Exception
+	{
+		try {
+			// Progression only via nextKey() and hasNext() - Key 1, Value A
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			
+			// Progression only through nextKey() - Key 2, Value B
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			
+			// Progression first though haNext() and next(), then through hasNext() - Key 3, Values C, D
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			// Progression first via next() only, then hasNext() only Key 4, Values E, F, G
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			
+			// Key 5, Values H, I, J, K, L
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			
+			// end
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test encountered an unexpected exception.");
+		}
+	}
+	
+	@Test
+	public void testHasNextDoesNotOverweiteCurrentRecord() throws Exception
+	{
+		try {
+			Iterator<Record> valsIter = null;
+			Record rec = null;
+			
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			valsIter = this.psi.getValues();
+			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
+			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			rec = valsIter.next();
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator must have another value.", valsIter.hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class));			
+			Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
+			
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			valsIter = this.psi.getValues();
+			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
+			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			rec = valsIter.next();
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator must have another value.", valsIter.hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
+			
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			valsIter = this.psi.getValues();
+			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
+			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			rec = valsIter.next();
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class));
+			rec = valsIter.next();
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test encountered an unexpected exception.");
+		}
+	}
+	
+	private static final class IntStringPair
+	{
+		private final IntValue integer;
+		private final StringValue string;
+
+		IntStringPair(IntValue integer, StringValue string) {
+			this.integer = integer;
+			this.string = string;
+		}
+
+		public IntValue getInteger() {
+			return integer;
+		}
+
+		public StringValue getString() {
+			return string;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7c4c858/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutable.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutable.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutable.java
deleted file mode 100644
index 7d093d5..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutable.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.pact.runtime.util;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import eu.stratosphere.api.common.typeutils.TypeComparator;
-import eu.stratosphere.api.common.typeutils.TypeSerializer;
-import eu.stratosphere.util.MutableObjectIterator;
-
-/**
- * The KeyValueIterator returns a key and all values that belong to the key (share the same key).
- * 
- */
-public final class KeyGroupedIteratorImmutable<E> {
-	
-	private final MutableObjectIterator<E> iterator;
-
-	private final TypeSerializer<E> serializer;
-	
-	private final TypeComparator<E> comparator;
-	
-	private ValuesIterator valuesIterator;
-	
-	private E lastKeyRecord;
-	
-	private E lookahead;
-	
-	private boolean done;
-
-	/**
-	 * Initializes the KeyGroupedIterator. It requires an iterator which returns its result
-	 * sorted by the key fields.
-	 * 
-	 * @param iterator An iterator over records, which are sorted by the key fields, in any order.
-	 * @param serializer The serializer for the data type iterated over.
-	 * @param comparator The comparator for the data type iterated over.
-	 */
-	public KeyGroupedIteratorImmutable(MutableObjectIterator<E> iterator,
-			TypeSerializer<E> serializer, TypeComparator<E> comparator)
-	{
-		if (iterator == null || serializer == null || comparator == null) {
-			throw new NullPointerException();
-		}
-		
-		this.iterator = iterator;
-		this.serializer = serializer;
-		this.comparator = comparator;
-	}
-
-	/**
-	 * Moves the iterator to the next key. This method may skip any values that have not yet been returned by the
-	 * iterator created by the {@link #getValues()} method. Hence, if called multiple times it "removes" key groups.
-	 * 
-	 * @return true, if the input iterator has an other group of records with the same key.
-	 */
-	public boolean nextKey() throws IOException {
-		
-		if (lookahead != null) {
-			// common case: whole value-iterator was consumed and a new key group is available.
-			this.comparator.setReference(this.lookahead);
-			this.valuesIterator.next = this.lookahead;
-			this.lastKeyRecord = this.lookahead;
-			this.lookahead = null;
-			return true;
-		}
-		
-		// first element, empty/done, or the values iterator was not entirely consumed
-		if (this.done) {
-			return false;
-		}
-			
-		if (this.valuesIterator != null) {
-			// values was not entirely consumed. move to the next key
-			// Required if user code / reduce() method did not read the whole value iterator.
-			E next = this.serializer.createInstance();
-			while (true) {
-				if ((next = this.iterator.next(next)) != null) {
-					if (!this.comparator.equalToReference(next)) {
-						// the keys do not match, so we have a new group. store the current key
-						this.comparator.setReference(next);
-						this.valuesIterator.next = next;
-						this.lastKeyRecord = next;
-						return true;
-					}
-				}
-				else {
-					// input exhausted
-					this.valuesIterator.next = null;
-					this.valuesIterator = null;
-					this.lastKeyRecord = null;
-					this.done = true;
-					return false;
-				}
-			}
-		}
-		else {
-			// first element
-			// get the next element
-			E first = this.iterator.next(this.serializer.createInstance());
-			if (first != null) {
-				this.comparator.setReference(first);
-				this.valuesIterator = new ValuesIterator(first);
-				this.lastKeyRecord = first;
-				return true;
-			}
-			else {
-				// empty input, set everything null
-				this.done = true;
-				return false;
-			}
-		}
-	}
-	
-	private E advanceToNext() {
-		try {
-			E next = this.iterator.next(serializer.createInstance());
-			if (next != null) {
-				if (comparator.equalToReference(next)) {
-					// same key
-					return next;
-				} else {
-					// moved to the next key, no more values here
-					this.lookahead = next;
-					return null;
-				}
-			}
-			else {
-				// backing iterator is consumed
-				this.done = true;
-				return null;
-			}
-		}
-		catch (IOException e) {
-			throw new RuntimeException("An error occurred while reading the next record.", e);
-		}
-	}
-	
-	public E getCurrent() {
-		return lastKeyRecord;
-	}
-	
-	public TypeComparator<E> getComparatorWithCurrentReference() {
-		return this.comparator;
-	}
-
-	/**
-	 * Returns an iterator over all values that belong to the current key. The iterator is initially <code>null</code>
-	 * (before the first call to {@link #nextKey()} and after all keys are consumed. In general, this method returns
-	 * always a non-null value, if a previous call to {@link #nextKey()} return <code>true</code>.
-	 * 
-	 * @return Iterator over all values that belong to the current key.
-	 */
-	public ValuesIterator getValues() {
-		return this.valuesIterator;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	public final class ValuesIterator implements Iterator<E> {
-		
-		private E next;
-		
-		private ValuesIterator(E first) {
-			this.next = first;
-		}
-
-		@Override
-		public boolean hasNext() {
-			return next != null;
-		}
-
-		@Override
-		public E next() {
-			if (this.next != null) {
-				E current = this.next;
-				this.next = KeyGroupedIteratorImmutable.this.advanceToNext();
-				return current;
-			} else {
-				throw new NoSuchElementException();
-			}
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7c4c858/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutableTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutableTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutableTest.java
deleted file mode 100644
index 4b83422..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutableTest.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.pact.runtime.util;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator;
-import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializer;
-import eu.stratosphere.types.IntValue;
-import eu.stratosphere.types.Record;
-import eu.stratosphere.types.StringValue;
-import eu.stratosphere.util.MutableObjectIterator;
-
-/**
- * Test for the safe key grouped iterator, which advances in windows containing the same key and provides a sub-iterator
- * over the records with the same key.
- */
-public class KeyGroupedIteratorImmutableTest {
-	
-	private MutableObjectIterator<Record> sourceIter;		// the iterator that provides the input
-	
-	private KeyGroupedIteratorImmutable<Record> psi;					// the grouping iterator, progressing in key steps
-	
-	@Before
-	public void setup()
-	{
-		final ArrayList<IntStringPair> source = new ArrayList<IntStringPair>();
-		
-		// add elements to the source
-		source.add(new IntStringPair(new IntValue(1), new StringValue("A")));
-		source.add(new IntStringPair(new IntValue(2), new StringValue("B")));
-		source.add(new IntStringPair(new IntValue(3), new StringValue("C")));
-		source.add(new IntStringPair(new IntValue(3), new StringValue("D")));
-		source.add(new IntStringPair(new IntValue(4), new StringValue("E")));
-		source.add(new IntStringPair(new IntValue(4), new StringValue("F")));
-		source.add(new IntStringPair(new IntValue(4), new StringValue("G")));
-		source.add(new IntStringPair(new IntValue(5), new StringValue("H")));
-		source.add(new IntStringPair(new IntValue(5), new StringValue("I")));
-		source.add(new IntStringPair(new IntValue(5), new StringValue("J")));
-		source.add(new IntStringPair(new IntValue(5), new StringValue("K")));
-		source.add(new IntStringPair(new IntValue(5), new StringValue("L")));
-		
-		
-		this.sourceIter = new MutableObjectIterator<Record>() {
-			final Iterator<IntStringPair> it = source.iterator();
-			
-			@Override
-			public Record next(Record reuse) throws IOException {
-				if (it.hasNext()) {
-					IntStringPair pair = it.next();
-					reuse.setField(0, pair.getInteger());
-					reuse.setField(1, pair.getString());
-					return reuse;
-				}
-				else {
-					return null;
-				}
-			}
-		};
-		
-		final RecordSerializer serializer = RecordSerializer.get();
-		@SuppressWarnings("unchecked")
-		final RecordComparator comparator = new RecordComparator(new int[] {0}, new Class[] {IntValue.class});
-		
-		this.psi = new KeyGroupedIteratorImmutable<Record>(this.sourceIter, serializer, comparator);
-	}
-
-	@Test
-	public void testNextKeyOnly() throws Exception
-	{
-		try {
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-			Assert.assertNull("KeyGroupedIterator must not have another value.", this.psi.getValues());
-			
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test encountered an unexpected exception.");
-		}
-	}
-	
-	@Test
-	public void testFullIterationThroughAllValues() throws IOException
-	{
-		try {
-			// Key 1, Value A
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			// Key 2, Value B
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			// Key 3, Values C, D
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			try {
-				this.psi.getValues().next();
-				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
-			}
-			catch (NoSuchElementException nseex) {}
-			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
-			try {
-				this.psi.getValues().next();
-				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
-			}
-			catch (NoSuchElementException nseex) {}
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			// Key 4, Values E, F, G
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("F"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("G"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			// Key 5, Values H, I, J, K, L
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("J"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("K"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("L"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			try {
-				this.psi.getValues().next();
-				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
-			}
-			catch (NoSuchElementException nseex) {}
-			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			try {
-				this.psi.getValues().next();
-				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
-			}
-			catch (NoSuchElementException nseex) {}
-			
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test encountered an unexpected exception.");
-		}
-	}
-	
-	@Test
-	public void testMixedProgress() throws Exception
-	{
-		try {
-			// Progression only via nextKey() and hasNext() - Key 1, Value A
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			
-			// Progression only through nextKey() - Key 2, Value B
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			
-			// Progression first though haNext() and next(), then through hasNext() - Key 3, Values C, D
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			// Progression first via next() only, then hasNext() only Key 4, Values E, F, G
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			
-			// Key 5, Values H, I, J, K, L
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			
-			// end
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test encountered an unexpected exception.");
-		}
-	}
-	
-	@Test
-	public void testHasNextDoesNotOverweiteCurrentRecord() throws Exception
-	{
-		try {
-			Iterator<Record> valsIter = null;
-			Record rec = null;
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			valsIter = this.psi.getValues();
-			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
-			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
-			rec = valsIter.next();
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator must have another value.", valsIter.hasNext());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class));			
-			Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			valsIter = this.psi.getValues();
-			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
-			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
-			rec = valsIter.next();
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator must have another value.", valsIter.hasNext());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			valsIter = this.psi.getValues();
-			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
-			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
-			rec = valsIter.next();
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class));
-			rec = valsIter.next();
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test encountered an unexpected exception.");
-		}
-	}
-	
-	private static final class IntStringPair
-	{
-		private final IntValue integer;
-		private final StringValue string;
-
-		IntStringPair(IntValue integer, StringValue string) {
-			this.integer = integer;
-			this.string = string;
-		}
-
-		public IntValue getInteger() {
-			return integer;
-		}
-
-		public StringValue getString() {
-			return string;
-		}
-	}
-}