You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/30 18:32:21 UTC
[1/2] git commit: [FLINK-1005] Add non-object reusing variants of
key-grouped iterator.
Repository: incubator-flink
Updated Branches:
refs/heads/master 76d4a75e8 -> e7c4c8586
[FLINK-1005] Add non-object reusing variants of key-grouped iterator.
Clean minor javadoc errors.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1d00cff8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1d00cff8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1d00cff8
Branch: refs/heads/master
Commit: 1d00cff8b913e2256c7ec2ce204b01c82d7214d1
Parents: 76d4a75
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 8 11:32:09 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 17:01:37 2014 +0200
----------------------------------------------------------------------
.../runtime/operators/GroupReduceDriver.java | 2 +
.../util/KeyGroupedIteratorImmutable.java | 202 +++++++++++
.../util/KeyGroupedIteratorImmutableTest.java | 353 +++++++++++++++++++
3 files changed, 557 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1d00cff8/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
index feeceb0..582f280 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
@@ -51,6 +51,8 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction
private TypeComparator<IT> comparator;
+ private boolean mutableObjectMode = false;
+
private volatile boolean running;
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1d00cff8/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutable.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutable.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutable.java
new file mode 100644
index 0000000..7d093d5
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutable.java
@@ -0,0 +1,202 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.pact.runtime.util;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import eu.stratosphere.api.common.typeutils.TypeComparator;
+import eu.stratosphere.api.common.typeutils.TypeSerializer;
+import eu.stratosphere.util.MutableObjectIterator;
+
+/**
+ * The KeyGroupedIteratorImmutable splits a key-sorted input into groups of records that share the same key.
+ * The values of the current group are exposed through one {@link ValuesIterator} instance that is re-armed for each new key.
+ */
+public final class KeyGroupedIteratorImmutable<E> {
+
+ private final MutableObjectIterator<E> iterator;
+
+ private final TypeSerializer<E> serializer;
+
+ private final TypeComparator<E> comparator;
+
+ private ValuesIterator valuesIterator;
+
+ private E lastKeyRecord; // the record that started the current key group
+
+ private E lookahead; // first record of the following key group, read ahead by the values iterator
+
+ private boolean done; // set once the input iterator has returned null
+
+ /**
+ * Initializes the KeyGroupedIteratorImmutable. It requires an iterator which returns its result
+ * sorted by the key fields.
+ *
+ * @param iterator An iterator over records, which are sorted by the key fields, in any order.
+ * @param serializer The serializer for the data type iterated over.
+ * @param comparator The comparator for the data type iterated over.
+ */
+ public KeyGroupedIteratorImmutable(MutableObjectIterator<E> iterator,
+ TypeSerializer<E> serializer, TypeComparator<E> comparator)
+ {
+ if (iterator == null || serializer == null || comparator == null) {
+ throw new NullPointerException();
+ }
+
+ this.iterator = iterator;
+ this.serializer = serializer;
+ this.comparator = comparator;
+ }
+
+ /**
+ * Moves the iterator to the next key. This method may skip any values that have not yet been returned by the
+ * iterator created by the {@link #getValues()} method. Hence, if called multiple times it "removes" key groups.
+ *
+ * @return true, if the input iterator has another group of records that share a key; false if the input is exhausted.
+ */
+ public boolean nextKey() throws IOException {
+
+ if (lookahead != null) {
+ // common case: whole value-iterator was consumed and a new key group is available.
+ this.comparator.setReference(this.lookahead);
+ this.valuesIterator.next = this.lookahead;
+ this.lastKeyRecord = this.lookahead;
+ this.lookahead = null;
+ return true;
+ }
+
+ // first element, empty/done, or the values iterator was not entirely consumed
+ if (this.done) {
+ return false;
+ }
+
+ if (this.valuesIterator != null) {
+ // values was not entirely consumed. move to the next key
+ // Required if user code / reduce() method did not read the whole value iterator.
+ E next = this.serializer.createInstance();
+ while (true) {
+ if ((next = this.iterator.next(next)) != null) {
+ if (!this.comparator.equalToReference(next)) {
+ // the keys do not match, so we have a new group. store the current key
+ this.comparator.setReference(next);
+ this.valuesIterator.next = next;
+ this.lastKeyRecord = next;
+ return true;
+ }
+ }
+ else {
+ // input exhausted: drop the values iterator, so getValues() returns null from now on
+ this.valuesIterator.next = null;
+ this.valuesIterator = null;
+ this.lastKeyRecord = null;
+ this.done = true;
+ return false;
+ }
+ }
+ }
+ else {
+ // first element
+ // fetch the first record into a newly created instance; the values iterator is created here, on first use
+ E first = this.iterator.next(this.serializer.createInstance());
+ if (first != null) {
+ this.comparator.setReference(first);
+ this.valuesIterator = new ValuesIterator(first);
+ this.lastKeyRecord = first;
+ return true;
+ }
+ else {
+ // empty input: the other fields were never assigned, so only mark the iterator as done
+ this.done = true;
+ return false;
+ }
+ }
+ }
+
+ private E advanceToNext() { // returns the next value of the current group, or null when the group or the input ends
+ try {
+ E next = this.iterator.next(serializer.createInstance());
+ if (next != null) {
+ if (comparator.equalToReference(next)) {
+ // same key
+ return next;
+ } else {
+ // moved to the next key, no more values here
+ this.lookahead = next;
+ return null;
+ }
+ }
+ else {
+ // backing iterator is consumed
+ this.done = true;
+ return null;
+ }
+ }
+ catch (IOException e) {
+ throw new RuntimeException("An error occurred while reading the next record.", e);
+ }
+ }
+
+ public E getCurrent() { // the record that started the current key group
+ return lastKeyRecord;
+ }
+
+ public TypeComparator<E> getComparatorWithCurrentReference() { // reference was set to the record that started the current key group
+ return this.comparator;
+ }
+
+ /**
+ * Returns an iterator over all values that belong to the current key. The iterator is initially <code>null</code>
+ * (before the first call to {@link #nextKey()}) and is reset to <code>null</code> when {@link #nextKey()} reaches the end of the
+ * input while skipping unread values. It is always non-null if the previous call to {@link #nextKey()} returned <code>true</code>.
+ *
+ * @return Iterator over all values that belong to the current key.
+ */
+ public ValuesIterator getValues() {
+ return this.valuesIterator;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public final class ValuesIterator implements Iterator<E> {
+
+ private E next; // value handed out by the next call to next(); null when the current group is exhausted
+
+ private ValuesIterator(E first) {
+ this.next = first;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return next != null;
+ }
+
+ @Override
+ public E next() {
+ if (this.next != null) {
+ E current = this.next;
+ this.next = KeyGroupedIteratorImmutable.this.advanceToNext(); // pre-fetch the successor, so hasNext() is a plain null check
+ return current;
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1d00cff8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutableTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutableTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutableTest.java
new file mode 100644
index 0000000..4b83422
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutableTest.java
@@ -0,0 +1,353 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.pact.runtime.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator;
+import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializer;
+import eu.stratosphere.types.IntValue;
+import eu.stratosphere.types.Record;
+import eu.stratosphere.types.StringValue;
+import eu.stratosphere.util.MutableObjectIterator;
+
+/**
+ * Test for the safe key grouped iterator, which advances in windows containing the same key and provides a sub-iterator
+ * over the records with the same key.
+ */
+public class KeyGroupedIteratorImmutableTest {
+
+ private MutableObjectIterator<Record> sourceIter; // the iterator that provides the input
+
+ private KeyGroupedIteratorImmutable<Record> psi; // the grouping iterator, progressing in key steps
+
+ @Before
+ public void setup()
+ {
+ final ArrayList<IntStringPair> source = new ArrayList<IntStringPair>();
+
+ // add elements to the source: sorted by key, with key groups of size 1, 1, 2, 3 and 5
+ source.add(new IntStringPair(new IntValue(1), new StringValue("A")));
+ source.add(new IntStringPair(new IntValue(2), new StringValue("B")));
+ source.add(new IntStringPair(new IntValue(3), new StringValue("C")));
+ source.add(new IntStringPair(new IntValue(3), new StringValue("D")));
+ source.add(new IntStringPair(new IntValue(4), new StringValue("E")));
+ source.add(new IntStringPair(new IntValue(4), new StringValue("F")));
+ source.add(new IntStringPair(new IntValue(4), new StringValue("G")));
+ source.add(new IntStringPair(new IntValue(5), new StringValue("H")));
+ source.add(new IntStringPair(new IntValue(5), new StringValue("I")));
+ source.add(new IntStringPair(new IntValue(5), new StringValue("J")));
+ source.add(new IntStringPair(new IntValue(5), new StringValue("K")));
+ source.add(new IntStringPair(new IntValue(5), new StringValue("L")));
+
+
+ this.sourceIter = new MutableObjectIterator<Record>() { // adapter that writes each pair into the record passed in by the caller
+ final Iterator<IntStringPair> it = source.iterator();
+
+ @Override
+ public Record next(Record reuse) throws IOException {
+ if (it.hasNext()) {
+ IntStringPair pair = it.next();
+ reuse.setField(0, pair.getInteger());
+ reuse.setField(1, pair.getString());
+ return reuse;
+ }
+ else {
+ return null;
+ }
+ }
+ };
+
+ final RecordSerializer serializer = RecordSerializer.get();
+ @SuppressWarnings("unchecked")
+ final RecordComparator comparator = new RecordComparator(new int[] {0}, new Class[] {IntValue.class});
+
+ this.psi = new KeyGroupedIteratorImmutable<Record>(this.sourceIter, serializer, comparator);
+ }
+
+ @Test
+ public void testNextKeyOnly() throws Exception
+ { // advances purely through nextKey(), without reading any values of the groups
+ try {
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+
+ Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+ Assert.assertNull("KeyGroupedIterator must not have another value.", this.psi.getValues()); // the values iterator is dropped once nextKey() exhausts the input
+
+ Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+ Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("The test encountered an unexpected exception.");
+ }
+ }
+
+ @Test
+ public void testFullIterationThroughAllValues() throws IOException
+ { // reads every value of every key group through the values iterator
+ try {
+ // Key 1, Value A
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+
+ // Key 2, Value B
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+
+ // Key 3, Values C, D
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ try {
+ this.psi.getValues().next();
+ Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+ }
+ catch (NoSuchElementException nseex) {} // expected: the key group is exhausted
+ Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+ try {
+ this.psi.getValues().next();
+ Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+ }
+ catch (NoSuchElementException nseex) {} // expected: the key group is still exhausted
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+
+ // Key 4, Values E, F, G
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("F"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("G"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+
+ // Key 5, Values H, I, J, K, L
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("J"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("K"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("L"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ try {
+ this.psi.getValues().next();
+ Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+ }
+ catch (NoSuchElementException nseex) {} // expected: the last key group is exhausted
+ Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ try {
+ this.psi.getValues().next();
+ Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+ }
+ catch (NoSuchElementException nseex) {} // expected: the last key group is still exhausted
+
+ Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+ Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("The test encountered an unexpected exception.");
+ }
+ }
+
+ @Test
+ public void testMixedProgress() throws Exception
+ { // leaves key groups partially read, so nextKey() has to skip the unread values
+ try {
+ // Progression only via nextKey() and hasNext() - Key 1, Value A
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+
+ // Progression only through nextKey() - Key 2, Value B
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+
+ // Progression first through hasNext() and next(), then through hasNext() - Key 3, Values C, D
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+
+ // Progression first via next() only, then hasNext() only - Key 4, Values E, F, G
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+
+ // Key 5, Values H, I, J, K, L
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+
+ // end: the unread values of key 5 (J, K, L) are skipped and the input is exhausted
+ Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+ Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("The test encountered an unexpected exception.");
+ }
+ }
+
+ @Test
+ public void testHasNextDoesNotOverweiteCurrentRecord() throws Exception
+ { // a record obtained from next() must keep its contents across later hasNext() calls
+ try {
+ Iterator<Record> valsIter = null;
+ Record rec = null;
+
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ valsIter = this.psi.getValues();
+ Assert.assertNotNull("Returned Iterator must not be null", valsIter);
+ Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+ rec = valsIter.next();
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class));
+ Assert.assertFalse("KeyGroupedIterator must not have another value.", valsIter.hasNext());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class));
+ Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
+
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ valsIter = this.psi.getValues();
+ Assert.assertNotNull("Returned Iterator must not be null", valsIter);
+ Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+ rec = valsIter.next();
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class));
+ Assert.assertFalse("KeyGroupedIterator must not have another value.", valsIter.hasNext());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class));
+ Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
+
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ valsIter = this.psi.getValues();
+ Assert.assertNotNull("Returned Iterator must not be null", valsIter);
+ Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+ rec = valsIter.next();
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class));
+ rec = valsIter.next();
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
+ Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
+ Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("The test encountered an unexpected exception.");
+ }
+ }
+
+	/**
+	 * Simple immutable holder that pairs an {@link IntValue} with a {@link StringValue}.
+	 */
+	private static final class IntStringPair {
+
+		private final IntValue integer;
+
+		private final StringValue string;
+
+		IntStringPair(IntValue key, StringValue value) {
+			integer = key;
+			string = value;
+		}
+
+		/** @return The integer part of the pair. */
+		public IntValue getInteger() {
+			return this.integer;
+		}
+
+		/** @return The string part of the pair. */
+		public StringValue getString() {
+			return this.string;
+		}
+	}
+}
[2/2] git commit: [FLINK-1005] Make GroupReduce configurable to use
either mutable or immutable object mode
Posted by se...@apache.org.
[FLINK-1005] Make GroupReduce configurable to use either mutable or immutable object mode
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e7c4c858
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e7c4c858
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e7c4c858
Branch: refs/heads/master
Commit: e7c4c8586e80a768b4a89990bf29b9cb7dd11888
Parents: 1d00cff
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 8 12:19:43 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 17:01:38 2014 +0200
----------------------------------------------------------------------
.../runtime/operators/GroupReduceDriver.java | 29 +-
.../runtime/operators/util/TaskConfig.java | 10 +
.../flink/runtime/util/KeyGroupedIterator.java | 1 -
.../util/KeyGroupedIteratorImmutable.java | 222 ++++++++++++
.../operators/drivers/DriverTestData.java | 2 +-
.../drivers/GroupReduceDriverTest.java | 109 +++++-
.../operators/drivers/TestTaskContext.java | 4 +
.../util/KeyGroupedIteratorImmutableTest.java | 357 +++++++++++++++++++
.../util/KeyGroupedIteratorImmutable.java | 202 -----------
.../util/KeyGroupedIteratorImmutableTest.java | 353 ------------------
10 files changed, 722 insertions(+), 567 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7c4c858/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
index 582f280..2ec9873 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.util.KeyGroupedIterator;
+import org.apache.flink.runtime.util.KeyGroupedIteratorImmutable;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
@@ -91,6 +92,12 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction
this.serializer = this.taskContext.<IT>getInputSerializer(0).getSerializer();
this.comparator = this.taskContext.getDriverComparator(0);
this.input = this.taskContext.getInput(0);
+
+ this.mutableObjectMode = config.getMutableObjectMode();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("GroupReduceDriver uses " + (this.mutableObjectMode ? "MUTABLE" : "IMMUTABLE") + " object mode.");
+ }
}
@Override
@@ -99,15 +106,23 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction
LOG.debug(this.taskContext.formatLogString("GroupReducer preprocessing done. Running GroupReducer code."));
}
- final KeyGroupedIterator<IT> iter = new KeyGroupedIterator<IT>(this.input, this.serializer, this.comparator);
-
// cache references on the stack
final GroupReduceFunction<IT, OT> stub = this.taskContext.getStub();
final Collector<OT> output = this.taskContext.getOutputCollector();
-
- // run stub implementation
- while (this.running && iter.nextKey()) {
- stub.reduce(iter.getValues(), output);
+
+ if (mutableObjectMode) {
+ final KeyGroupedIterator<IT> iter = new KeyGroupedIterator<IT>(this.input, this.serializer, this.comparator);
+ // run stub implementation
+ while (this.running && iter.nextKey()) {
+ stub.reduce(iter.getValues(), output);
+ }
+ }
+ else {
+ final KeyGroupedIteratorImmutable<IT> iter = new KeyGroupedIteratorImmutable<IT>(this.input, this.serializer, this.comparator);
+ // run stub implementation
+ while (this.running && iter.nextKey()) {
+ stub.reduce(iter.getValues(), output);
+ }
}
}
@@ -118,4 +133,4 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction
public void cancel() {
this.running = false;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7c4c858/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
index f583235..5f4afe2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
@@ -76,6 +76,8 @@ public class TaskConfig {
private static final String DRIVER_COMPARATOR_PARAMETERS_PREFIX = "driver.comp.params.";
private static final String DRIVER_PAIR_COMPARATOR_FACTORY = "driver.paircomp";
+
+	/** Key under which the driver's mutable object mode flag is stored (see set/getMutableObjectMode). */
+	private static final String DRIVER_MUTABLE_OBJECT_MODE = "driver.mutableobjects";
// -------------------------------------- Inputs ----------------------------------------------
@@ -335,6 +337,14 @@ public class TaskConfig {
}
}
+	/**
+	 * Stores the mutable object mode flag in the underlying configuration.
+	 *
+	 * @param mode The flag value to write under the mutable-object-mode key.
+	 */
+	public void setMutableObjectMode(boolean mode) {
+		config.setBoolean(DRIVER_MUTABLE_OBJECT_MODE, mode);
+	}
+
+	/**
+	 * Reads the mutable object mode flag from the underlying configuration.
+	 *
+	 * @return The stored flag; <code>true</code> is passed to the configuration lookup as the default.
+	 */
+	public boolean getMutableObjectMode() {
+		final boolean mutableObjects = config.getBoolean(DRIVER_MUTABLE_OBJECT_MODE, true);
+		return mutableObjects;
+	}
+
public void setDriverComparator(TypeComparatorFactory<?> factory, int inputNum) {
setTypeComparatorFactory(factory, DRIVER_COMPARATOR_FACTORY_PREFIX + inputNum,
DRIVER_COMPARATOR_PARAMETERS_PREFIX + inputNum + SEPARATOR);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7c4c858/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
index 88048f0..e0d1d99 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.util;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7c4c858/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutable.java
new file mode 100644
index 0000000..80ac231
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutable.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TraversableOnceException;
+/**
+ * The KeyGroupedIteratorImmutable walks over a key-sorted input and returns, for each key, an iterator
+ * over all values that belong to that key (share the same key).
+ * <p>
+ * Each record that is handed out is read into a new instance created via
+ * {@link TypeSerializer#createInstance()}, rather than into a single object that is reused across calls.
+ */
+public final class KeyGroupedIteratorImmutable<E> {
+
+	private final MutableObjectIterator<E> iterator;
+
+	private final TypeSerializer<E> serializer;
+
+	private final TypeComparator<E> comparator;
+
+	/** Value iterator of the current key group; null before the first group and after the input is exhausted. */
+	private ValuesIterator valuesIterator;
+
+	/** The first record of the current key group. */
+	private E lastKeyRecord;
+
+	/** The first record of the following key group, if the value iterator already read past the current group. */
+	private E lookahead;
+
+	/** Set once the backing iterator has returned null. */
+	private boolean done;
+
+	/**
+	 * Initializes the KeyGroupedIterator. It requires an iterator which returns its result
+	 * sorted by the key fields.
+	 *
+	 * @param iterator An iterator over records, which are sorted by the key fields, in any order.
+	 * @param serializer The serializer for the data type iterated over.
+	 * @param comparator The comparator for the data type iterated over.
+	 * @throws NullPointerException Thrown, if any of the arguments is null.
+	 */
+	public KeyGroupedIteratorImmutable(MutableObjectIterator<E> iterator,
+			TypeSerializer<E> serializer, TypeComparator<E> comparator)
+	{
+		if (iterator == null || serializer == null || comparator == null) {
+			throw new NullPointerException();
+		}
+
+		this.iterator = iterator;
+		this.serializer = serializer;
+		this.comparator = comparator;
+	}
+
+	/**
+	 * Moves the iterator to the next key. This method may skip any values that have not yet been returned by the
+	 * iterator created by the {@link #getValues()} method. Hence, if called multiple times it "removes" key groups.
+	 *
+	 * @return true, if the input iterator has another group of records with the same key.
+	 * @throws IOException Forwarded from the underlying input iterator.
+	 */
+	public boolean nextKey() throws IOException {
+
+		if (lookahead != null) {
+			// common case: whole value-iterator was consumed and a new key group is available.
+			this.comparator.setReference(this.lookahead);
+			this.valuesIterator.next = this.lookahead;
+			this.lastKeyRecord = this.lookahead;
+			this.lookahead = null;
+			this.valuesIterator.iteratorAvailable = true;
+			return true;
+		}
+
+		// first element, empty/done, or the values iterator was not entirely consumed
+		if (this.done) {
+			// The end of the input may have been detected by advanceToNext() while the last group was
+			// being consumed. Drop the state of that group here as well, so that getValues() and
+			// getCurrent() return null after all keys are consumed, exactly as in the skip path below.
+			this.valuesIterator = null;
+			this.lastKeyRecord = null;
+			return false;
+		}
+
+		if (this.valuesIterator != null) {
+			// values was not entirely consumed. move to the next key
+			// Required if user code / reduce() method did not read the whole value iterator.
+			E next = this.serializer.createInstance();
+			while (true) {
+				if ((next = this.iterator.next(next)) != null) {
+					if (!this.comparator.equalToReference(next)) {
+						// the keys do not match, so we have a new group. store the current key
+						this.comparator.setReference(next);
+						this.valuesIterator.next = next;
+						this.lastKeyRecord = next;
+						this.valuesIterator.iteratorAvailable = true;
+						return true;
+					}
+				}
+				else {
+					// input exhausted
+					this.valuesIterator.next = null;
+					this.valuesIterator = null;
+					this.lastKeyRecord = null;
+					this.done = true;
+					return false;
+				}
+			}
+		}
+		else {
+			// first element
+			// get the next element
+			E first = this.iterator.next(this.serializer.createInstance());
+			if (first != null) {
+				this.comparator.setReference(first);
+				this.valuesIterator = new ValuesIterator(first);
+				this.lastKeyRecord = first;
+				return true;
+			}
+			else {
+				// empty input, set everything null
+				this.done = true;
+				return false;
+			}
+		}
+	}
+
+	/**
+	 * Reads the next record from the input into a new instance.
+	 *
+	 * @return The record, if it has the same key as the current group. Otherwise null: the record is then
+	 *         kept as the lookahead for the next group, or the input is marked as done if it returned null.
+	 * @throws RuntimeException Wraps an {@link IOException} thrown by the input iterator.
+	 */
+	private E advanceToNext() {
+		try {
+			E next = this.iterator.next(serializer.createInstance());
+			if (next != null) {
+				if (comparator.equalToReference(next)) {
+					// same key
+					return next;
+				} else {
+					// moved to the next key, no more values here
+					this.lookahead = next;
+					return null;
+				}
+			}
+			else {
+				// backing iterator is consumed
+				this.done = true;
+				return null;
+			}
+		}
+		catch (IOException e) {
+			throw new RuntimeException("An error occurred while reading the next record.", e);
+		}
+	}
+
+	/**
+	 * @return The first record of the current key group, or null if there is no current group.
+	 */
+	public E getCurrent() {
+		return lastKeyRecord;
+	}
+
+	/**
+	 * @return The comparator used by this iterator; its reference is set to the first record of the
+	 *         current key group by {@link #nextKey()}.
+	 */
+	public TypeComparator<E> getComparatorWithCurrentReference() {
+		return this.comparator;
+	}
+
+	/**
+	 * Returns an iterator over all values that belong to the current key. The iterator is <code>null</code>
+	 * before the first call to {@link #nextKey()} and after all keys are consumed. In general, this method
+	 * always returns a non-null value, if a previous call to {@link #nextKey()} returned <code>true</code>.
+	 *
+	 * @return Iterator over all values that belong to the current key.
+	 */
+	public ValuesIterator getValues() {
+		return this.valuesIterator;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Iterator over the values of one key group. It can be obtained as an {@link Iterator} only once per
+	 * key group through {@link #iterator()}; {@link #nextKey()} re-arms it for the next group.
+	 */
+	public final class ValuesIterator implements Iterator<E>, Iterable<E> {
+
+		/** The record returned by the next call to next(); null when the group has no more values. */
+		private E next;
+
+		private boolean iteratorAvailable = true;
+
+		private ValuesIterator(E first) {
+			this.next = first;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return next != null;
+		}
+
+		@Override
+		public E next() {
+			if (this.next != null) {
+				E current = this.next;
+				// eagerly fetch the following record, so that hasNext() needs no I/O
+				this.next = KeyGroupedIteratorImmutable.this.advanceToNext();
+				return current;
+			} else {
+				throw new NoSuchElementException();
+			}
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public Iterator<E> iterator() {
+			if (iteratorAvailable) {
+				iteratorAvailable = false;
+				return this;
+			}
+			else {
+				throw new TraversableOnceException();
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7c4c858/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/DriverTestData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/DriverTestData.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/DriverTestData.java
index e5c655b..cc7e9ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/DriverTestData.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/DriverTestData.java
@@ -100,7 +100,7 @@ public class DriverTestData {
public static final void compareTupleArrays(Object[] expected, Object[] found) {
if (expected.length != found.length) {
- throw new IllegalArgumentException();
+ Assert.assertEquals("Length of result is wrong", expected.length, found.length);
}
for (int i = 0; i < expected.length; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7c4c858/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
index f872b56..ecc5259 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.operators.drivers;
+import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -28,7 +29,6 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.GroupReduceDriver;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
import org.apache.flink.runtime.util.RegularToMutableObjectIterator;
import org.apache.flink.types.IntValue;
@@ -53,14 +53,18 @@ public class GroupReduceDriverTest {
TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true});
context.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
+ GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer());
+
context.setInput1(input, typeInfo.createSerializer());
context.setComparator1(comparator);
- context.setCollector(new DiscardingOutputCollector<Tuple2<String, Integer>>());
+ context.setCollector(result);
GroupReduceDriver<Tuple2<String, Integer>, Tuple2<String, Integer>> driver = new GroupReduceDriver<Tuple2<String, Integer>, Tuple2<String, Integer>>();
driver.setup(context);
driver.prepare();
driver.run();
+
+ Assert.assertTrue(result.getList().isEmpty());
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -141,7 +145,84 @@ public class GroupReduceDriverTest {
}
}
-
+	/**
+	 * Runs the group reduce driver without changing the object mode of the test context, using a reducer
+	 * that buffers the records it receives, and verifies that the output does NOT match the correct
+	 * grouped result.
+	 */
+	@Test
+	public void testAllReduceDriverIncorrectlyAccumulatingMutable() {
+		try {
+			TestTaskContext<GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
+					new TestTaskContext<GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
+
+			List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
+			TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0));
+			MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer());
+			TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true});
+
+			GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer());
+
+			context.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
+			context.setInput1(input, typeInfo.createSerializer());
+			context.setComparator1(comparator);
+			context.setCollector(result);
+			context.setUdf(new ConcatSumMutableAccumulatingReducer());
+
+			GroupReduceDriver<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> driver = new GroupReduceDriver<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>();
+			driver.setup(context);
+			driver.prepare();
+			driver.run();
+
+			Object[] res = result.getList().toArray();
+			Object[] expected = DriverTestData.createReduceMutableDataGroupedResult().toArray();
+
+			// Record the mismatch in a flag instead of calling Assert.fail() inside the try block:
+			// Assert.fail() itself throws an AssertionError, which the catch clause below would
+			// swallow, so the test could never fail.
+			boolean mismatchDetected = false;
+			try {
+				DriverTestData.compareTupleArrays(expected, res);
+			}
+			catch (AssertionError e) {
+				// expected
+				mismatchDetected = true;
+			}
+			Assert.assertTrue("Accumulating mutable objects is expected to result in incorrect values.", mismatchDetected);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Runs the group reduce driver with the mutable object mode switched off, using a reducer that buffers
+	 * the records it receives, and verifies that the output matches the expected grouped result.
+	 */
+	@Test
+	public void testAllReduceDriverAccumulatingImmutable() {
+		try {
+			final TestTaskContext<GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> ctx =
+					new TestTaskContext<GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
+
+			// input data with its type information, iterator and key comparator (field 0, ascending)
+			final List<Tuple2<StringValue, IntValue>> records = DriverTestData.createReduceMutableData();
+			final TupleTypeInfo<Tuple2<StringValue, IntValue>> tupleType =
+					(TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(records.get(0));
+			final MutableObjectIterator<Tuple2<StringValue, IntValue>> source =
+					new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(records.iterator(), tupleType.createSerializer());
+			final TypeComparator<Tuple2<StringValue, IntValue>> keyComparator =
+					tupleType.createComparator(new int[]{0}, new boolean[] {true});
+
+			// collector that gathers everything the driver emits
+			final GatheringCollector<Tuple2<StringValue, IntValue>> gathered =
+					new GatheringCollector<Tuple2<StringValue, IntValue>>(tupleType.createSerializer());
+
+			// wire up the task context
+			ctx.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
+			ctx.setInput1(source, tupleType.createSerializer());
+			ctx.setComparator1(keyComparator);
+			ctx.setCollector(gathered);
+			ctx.setUdf(new ConcatSumMutableAccumulatingReducer());
+			ctx.setMutableObjectMode(false);
+
+			// execute the driver life cycle
+			final GroupReduceDriver<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> reduceDriver =
+					new GroupReduceDriver<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>();
+			reduceDriver.setup(ctx);
+			reduceDriver.prepare();
+			reduceDriver.run();
+
+			final Object[] actual = gathered.getList().toArray();
+			final Object[] wanted = DriverTestData.createReduceMutableDataGroupedResult().toArray();
+
+			DriverTestData.compareTupleArrays(wanted, actual);
+		}
+		catch (Exception ex) {
+			System.err.println(ex.getMessage());
+			ex.printStackTrace();
+			Assert.fail(ex.getMessage());
+		}
+	}
// --------------------------------------------------------------------------------------------
// Test UDFs
@@ -178,4 +259,26 @@ public class GroupReduceDriverTest {
out.collect(current);
}
}
+
+ public static final class ConcatSumMutableAccumulatingReducer implements GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> {
+
+ @Override
+ public void reduce(Iterable<Tuple2<StringValue, IntValue>> values, Collector<Tuple2<StringValue, IntValue>> out) throws Exception {
+ List<Tuple2<StringValue, IntValue>> all = new ArrayList<Tuple2<StringValue,IntValue>>();
+
+ for (Tuple2<StringValue, IntValue> t : values) {
+ all.add(t);
+ }
+
+ Tuple2<StringValue, IntValue> result = all.get(0);
+
+ for (int i = 1; i < all.size(); i++) {
+ Tuple2<StringValue, IntValue> e = all.get(i);
+ result.f0.append(e.f0);
+ result.f1.setValue(result.f1.getValue() + e.f1.getValue());
+ }
+
+ out.collect(result);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7c4c858/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index f0f51a6..e5ece3f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -119,6 +119,10 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
this.config.setDriverStrategy(strategy);
}
+	/**
+	 * Forwards the mutable object mode flag to the task configuration of this test context.
+	 *
+	 * @param mutableObjectMode The flag value handed to the configuration.
+	 */
+	public void setMutableObjectMode(boolean mutableObjectMode) {
+		config.setMutableObjectMode(mutableObjectMode);
+	}
+
// --------------------------------------------------------------------------------------------
// Context Methods
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7c4c858/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java
new file mode 100644
index 0000000..54a1492
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for the safe key grouped iterator, which advances in windows containing the same key and provides a sub-iterator
+ * over the records with the same key.
+ */
+public class KeyGroupedIteratorImmutableTest {
+
+ private MutableObjectIterator<Record> sourceIter; // the iterator that provides the input
+
+ private KeyGroupedIteratorImmutable<Record> psi; // the grouping iterator, progressing in key steps
+
+ @Before
+ public void setup()
+ {
+ final ArrayList<IntStringPair> source = new ArrayList<IntStringPair>();
+
+ // add elements to the source
+ source.add(new IntStringPair(new IntValue(1), new StringValue("A")));
+ source.add(new IntStringPair(new IntValue(2), new StringValue("B")));
+ source.add(new IntStringPair(new IntValue(3), new StringValue("C")));
+ source.add(new IntStringPair(new IntValue(3), new StringValue("D")));
+ source.add(new IntStringPair(new IntValue(4), new StringValue("E")));
+ source.add(new IntStringPair(new IntValue(4), new StringValue("F")));
+ source.add(new IntStringPair(new IntValue(4), new StringValue("G")));
+ source.add(new IntStringPair(new IntValue(5), new StringValue("H")));
+ source.add(new IntStringPair(new IntValue(5), new StringValue("I")));
+ source.add(new IntStringPair(new IntValue(5), new StringValue("J")));
+ source.add(new IntStringPair(new IntValue(5), new StringValue("K")));
+ source.add(new IntStringPair(new IntValue(5), new StringValue("L")));
+
+
+ this.sourceIter = new MutableObjectIterator<Record>() {
+ final Iterator<IntStringPair> it = source.iterator();
+
+ @Override
+ public Record next(Record reuse) throws IOException {
+ if (it.hasNext()) {
+ IntStringPair pair = it.next();
+ reuse.setField(0, pair.getInteger());
+ reuse.setField(1, pair.getString());
+ return reuse;
+ }
+ else {
+ return null;
+ }
+ }
+ };
+
+ final RecordSerializer serializer = RecordSerializer.get();
+ @SuppressWarnings("unchecked")
+ final RecordComparator comparator = new RecordComparator(new int[] {0}, new Class[] {IntValue.class});
+
+ this.psi = new KeyGroupedIteratorImmutable<Record>(this.sourceIter, serializer, comparator);
+ }
+
+ @Test
+ public void testNextKeyOnly() throws Exception
+ {
+ try {
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+
+ Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+ Assert.assertNull("KeyGroupedIterator must not have another value.", this.psi.getValues());
+
+ Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+ Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("The test encountered an unexpected exception.");
+ }
+ }
+
+ @Test // Consumes every value of every key group in input order, checking keys, values and end-of-group behavior.
+ public void testFullIterationThroughAllValues() throws IOException
+ {
+ try {
+ // Key 1, Value A (the key must stay readable after the group's only value was consumed)
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+
+ // Key 2, Value B
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+
+ // Key 3, Values C, D (key and comparator reference are re-checked after every step)
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ try {
+ this.psi.getValues().next();
+ Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+ }
+ catch (NoSuchElementException nseex) {} // expected: the group for key 3 has no further values
+ Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+ try {
+ this.psi.getValues().next();
+ Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+ }
+ catch (NoSuchElementException nseex) {} // expected again: a repeated next() must keep failing
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+
+ // Key 4, Values E, F, G
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("F"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("G"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+
+ // Key 5, Values H, I, J, K, L (last group of the input)
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("J"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("K"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("L"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ try {
+ this.psi.getValues().next();
+ Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+ }
+ catch (NoSuchElementException nseex) {} // expected: the group for key 5 has no further values
+ Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ try {
+ this.psi.getValues().next();
+ Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+ }
+ catch (NoSuchElementException nseex) {} // expected again: a repeated next() must keep failing
+
+ Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+ Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("The test encountered an unexpected exception.");
+ }
+ }
+
+ @Test // Mixes nextKey(), hasNext() and next() so that several key groups are left only partially consumed.
+ public void testMixedProgress() throws Exception
+ {
+ try {
+ // Progression only via nextKey() and hasNext() - Key 1, Value A
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+
+ // Progression only through nextKey() - Key 2, Value B
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+
+ // Progression first through hasNext() and next(), then through hasNext() - Key 3, Values C, D
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+
+ // Progression first via next() only, then hasNext() only - Key 4, Values E, F, G
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+
+ // Key 5, Values H, I, J, K, L (only H and I are consumed here)
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+
+ // end: the unread values of key 5 are skipped and no further key may be reported
+ Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+ Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("The test encountered an unexpected exception.");
+ }
+ }
+
+ @Test // Verifies that hasNext() on the values iterator leaves the record returned by the preceding next() untouched.
+ public void testHasNextDoesNotOverweiteCurrentRecord() throws Exception
+ {
+ try {
+ Iterator<Record> valsIter = null;
+ Record rec = null;
+
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); // group of key 1: A
+ valsIter = this.psi.getValues();
+ Assert.assertNotNull("Returned Iterator must not be null", valsIter);
+ Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+ rec = valsIter.next();
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class));
+ Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue()); // rec unchanged by hasNext()
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class));
+ Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
+
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); // group of key 2: B
+ valsIter = this.psi.getValues();
+ Assert.assertNotNull("Returned Iterator must not be null", valsIter);
+ Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+ rec = valsIter.next();
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class));
+ Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue()); // rec unchanged by hasNext()
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class));
+ Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
+
+ Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); // group of key 3: C, D
+ valsIter = this.psi.getValues();
+ Assert.assertNotNull("Returned Iterator must not be null", valsIter);
+ Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+ rec = valsIter.next();
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class));
+ Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue()); // rec must still hold C
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class));
+ rec = valsIter.next();
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
+ Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue()); // rec must still hold D
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
+ Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("The test encountered an unexpected exception.");
+ }
+ }
+
+ private static final class IntStringPair // Immutable holder pairing an IntValue with a StringValue.
+ {
+ private final IntValue integer; // returned by getInteger()
+ private final StringValue string; // returned by getString()
+
+ IntStringPair(IntValue integer, StringValue string) { // stores both references as given, without copying
+ this.integer = integer;
+ this.string = string;
+ }
+
+ public IntValue getInteger() {
+ return integer;
+ }
+
+ public StringValue getString() {
+ return string;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7c4c858/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutable.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutable.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutable.java
deleted file mode 100644
index 7d093d5..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutable.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.pact.runtime.util;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import eu.stratosphere.api.common.typeutils.TypeComparator;
-import eu.stratosphere.api.common.typeutils.TypeSerializer;
-import eu.stratosphere.util.MutableObjectIterator;
-
-/**
- * The KeyGroupedIteratorImmutable steps through a key-sorted input one key group at a time and exposes, for the
- * current key, an iterator over all values that belong to the key (share the same key).
- */
-public final class KeyGroupedIteratorImmutable<E> {
-
- private final MutableObjectIterator<E> iterator; // the input, expected to be sorted by key
-
- private final TypeSerializer<E> serializer; // supplies the instances that records are read into
-
- private final TypeComparator<E> comparator; // its reference is set to the first record of the current key group
-
- private ValuesIterator valuesIterator; // created by the first successful nextKey(); reset to null when nextKey() drains the input
-
- private E lastKeyRecord; // first record of the current key group, handed out by getCurrent()
-
- private E lookahead; // first record of the following key group, read ahead by advanceToNext()
-
- private boolean done; // set once the input iterator has returned null
-
- /**
- * Initializes the KeyGroupedIterator. It requires an iterator which returns its result
- * sorted by the key fields. Throws a NullPointerException if any of the arguments is null.
- *
- * @param iterator An iterator over records, which are sorted by the key fields, in any order.
- * @param serializer The serializer for the data type iterated over.
- * @param comparator The comparator for the data type iterated over.
- */
- public KeyGroupedIteratorImmutable(MutableObjectIterator<E> iterator,
- TypeSerializer<E> serializer, TypeComparator<E> comparator)
- {
- if (iterator == null || serializer == null || comparator == null) {
- throw new NullPointerException();
- }
-
- this.iterator = iterator;
- this.serializer = serializer;
- this.comparator = comparator;
- }
-
- /**
- * Moves the iterator to the next key. This method may skip any values that have not yet been returned by the
- * iterator created by the {@link #getValues()} method. Hence, if called multiple times it "removes" key groups.
- *
- * @return true, if the input iterator has another group of records that share a key; false once it is exhausted.
- */
- public boolean nextKey() throws IOException {
-
- if (lookahead != null) {
- // common case: whole value-iterator was consumed and a new key group is available.
- this.comparator.setReference(this.lookahead);
- this.valuesIterator.next = this.lookahead;
- this.lastKeyRecord = this.lookahead;
- this.lookahead = null;
- return true;
- }
-
- // first element, empty/done, or the values iterator was not entirely consumed
- if (this.done) {
- return false;
- }
-
- if (this.valuesIterator != null) {
- // values was not entirely consumed. move to the next key
- // Required if user code / reduce() method did not read the whole value iterator.
- E next = this.serializer.createInstance();
- while (true) {
- if ((next = this.iterator.next(next)) != null) {
- if (!this.comparator.equalToReference(next)) {
- // the keys do not match, so we have a new group. store the current key
- this.comparator.setReference(next);
- this.valuesIterator.next = next;
- this.lastKeyRecord = next;
- return true;
- }
- }
- else {
- // input exhausted: drop the values iterator and the current record
- this.valuesIterator.next = null;
- this.valuesIterator = null;
- this.lastKeyRecord = null;
- this.done = true;
- return false;
- }
- }
- }
- else {
- // first element
- // get the next element
- E first = this.iterator.next(this.serializer.createInstance());
- if (first != null) {
- this.comparator.setReference(first);
- this.valuesIterator = new ValuesIterator(first);
- this.lastKeyRecord = first;
- return true;
- }
- else {
- // empty input: there is nothing to group, only mark the iterator as done
- this.done = true;
- return false;
- }
- }
- }
-
- private E advanceToNext() { // returns the next record of the current group, or null (remembering a lookahead or marking done)
- try {
- E next = this.iterator.next(serializer.createInstance());
- if (next != null) {
- if (comparator.equalToReference(next)) {
- // same key
- return next;
- } else {
- // moved to the next key, no more values here
- this.lookahead = next;
- return null;
- }
- }
- else {
- // backing iterator is consumed
- this.done = true;
- return null;
- }
- }
- catch (IOException e) {
- throw new RuntimeException("An error occurred while reading the next record.", e);
- }
- }
-
- public E getCurrent() { // first record of the current key group; null before the first nextKey() call
- return lastKeyRecord;
- }
-
- public TypeComparator<E> getComparatorWithCurrentReference() { // the comparator passed to the constructor; nextKey() keeps its reference at the current key
- return this.comparator;
- }
-
- /**
- * Returns an iterator over all values that belong to the current key. The result is <code>null</code> before the
- * first call to {@link #nextKey()} and after {@link #nextKey()} itself reached the end of the input. If the end was
- * reached while reading the last group's values, the exhausted iterator of that group is still returned.
- * After a call to {@link #nextKey()} that returned <code>true</code>, the result is never <code>null</code>.
- * @return Iterator over all values that belong to the current key.
- */
- public ValuesIterator getValues() {
- return this.valuesIterator;
- }
-
- // --------------------------------------------------------------------------------------------
-
- public final class ValuesIterator implements Iterator<E> { // one instance is reused for all groups; nextKey() re-points its next field
-
- private E next; // the record that the following call to next() returns; null when the group is exhausted
-
- private ValuesIterator(E first) {
- this.next = first;
- }
-
- @Override
- public boolean hasNext() {
- return next != null;
- }
-
- @Override
- public E next() {
- if (this.next != null) {
- E current = this.next;
- this.next = KeyGroupedIteratorImmutable.this.advanceToNext(); // read ahead so that hasNext() needs no input access
- return current;
- } else {
- throw new NoSuchElementException();
- }
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7c4c858/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutableTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutableTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutableTest.java
deleted file mode 100644
index 4b83422..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutableTest.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.pact.runtime.util;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator;
-import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializer;
-import eu.stratosphere.types.IntValue;
-import eu.stratosphere.types.Record;
-import eu.stratosphere.types.StringValue;
-import eu.stratosphere.util.MutableObjectIterator;
-
-/**
- * Test for the safe key grouped iterator, which advances in windows containing the same key and provides a sub-iterator
- * over the records with the same key.
- */
-public class KeyGroupedIteratorImmutableTest {
-
- private MutableObjectIterator<Record> sourceIter; // the iterator that provides the input
-
- private KeyGroupedIteratorImmutable<Record> psi; // the grouping iterator, progressing in key steps
-
- @Before // Builds the input (keys 1..5 carrying 1, 1, 2, 3 and 5 values) and the grouping iterator under test.
- public void setup()
- {
- final ArrayList<IntStringPair> source = new ArrayList<IntStringPair>();
-
- // add elements to the source, already grouped by ascending key
- source.add(new IntStringPair(new IntValue(1), new StringValue("A")));
- source.add(new IntStringPair(new IntValue(2), new StringValue("B")));
- source.add(new IntStringPair(new IntValue(3), new StringValue("C")));
- source.add(new IntStringPair(new IntValue(3), new StringValue("D")));
- source.add(new IntStringPair(new IntValue(4), new StringValue("E")));
- source.add(new IntStringPair(new IntValue(4), new StringValue("F")));
- source.add(new IntStringPair(new IntValue(4), new StringValue("G")));
- source.add(new IntStringPair(new IntValue(5), new StringValue("H")));
- source.add(new IntStringPair(new IntValue(5), new StringValue("I")));
- source.add(new IntStringPair(new IntValue(5), new StringValue("J")));
- source.add(new IntStringPair(new IntValue(5), new StringValue("K")));
- source.add(new IntStringPair(new IntValue(5), new StringValue("L")));
-
-
- this.sourceIter = new MutableObjectIterator<Record>() {
- final Iterator<IntStringPair> it = source.iterator();
-
- @Override
- public Record next(Record reuse) throws IOException { // fills the passed record from the next pair; null once the list is drained
- if (it.hasNext()) {
- IntStringPair pair = it.next();
- reuse.setField(0, pair.getInteger());
- reuse.setField(1, pair.getString());
- return reuse;
- }
- else {
- return null;
- }
- }
- };
-
- final RecordSerializer serializer = RecordSerializer.get();
- @SuppressWarnings("unchecked")
- final RecordComparator comparator = new RecordComparator(new int[] {0}, new Class[] {IntValue.class}); // presumably: key is field 0, typed IntValue - confirm against RecordComparator
-
- this.psi = new KeyGroupedIteratorImmutable<Record>(this.sourceIter, serializer, comparator);
- }
-
- @Test // Advances with nextKey() only and checks that the keys 1..5 are each reported once, followed by exhaustion.
- public void testNextKeyOnly() throws Exception
- {
- try {
- Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
- Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
- Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-
- Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
- Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
- Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-
- Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
- Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
- Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-
- Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
- Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
- Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-
- Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
- Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
- Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-
- Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
- Assert.assertNull("KeyGroupedIterator must not have another value.", this.psi.getValues()); // no values iterator is exposed once nextKey() drained the input
-
- Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey()); // repeated calls must keep returning false
- Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("The test encountered an unexpected exception.");
- }
- }
-
- /**
-  * Iterates over every key group and consumes every value of each group. After each step the
-  * test checks that the key reported by the iterator still matches the group being read.
-  *
-  * <p>The assertions expect the grouped input 1:[A], 2:[B], 3:[C, D], 4:[E, F, G],
-  * 5:[H, I, J, K, L]. The statements are order-sensitive because they all operate on the
-  * shared iterator held in {@code this.psi}.
-  */
- @Test
- public void testFullIterationThroughAllValues() throws IOException
- {
-     // Failure text for the case that next() succeeds although the current group is used up.
-     final String exhaustedMsg = "KeyGroupedIterator's value iterator must throw an exception on next() once the values of the current key are exhausted.";
-
-     try {
-         // Key 1, Value A
-         Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-         Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-         Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), this.psi.getValues().next().getField(1, StringValue.class));
-         Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-
-         // Key 2, Value B
-         Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-         Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-         Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), this.psi.getValues().next().getField(1, StringValue.class));
-         Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-
-         // Key 3, Values C, D
-         Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-         Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-         Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class));
-         Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-         Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), this.psi.getValues().next().getField(1, StringValue.class));
-         Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-         // next() without a preceding hasNext() must fail once C and D have been consumed
-         try {
-             this.psi.getValues().next();
-             Assert.fail(exhaustedMsg);
-         }
-         catch (NoSuchElementException expected) {
-             // expected: the value group of key 3 is exhausted
-         }
-         Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
-         // next() must keep failing after hasNext() has reported the end of the group
-         try {
-             this.psi.getValues().next();
-             Assert.fail(exhaustedMsg);
-         }
-         catch (NoSuchElementException expected) {
-             // expected: the value group of key 3 is exhausted
-         }
-         Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-
-         // Key 4, Values E, F, G
-         Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-         Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-         Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
-         Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-         Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("F"), this.psi.getValues().next().getField(1, StringValue.class));
-         Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-         Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("G"), this.psi.getValues().next().getField(1, StringValue.class));
-         Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-         Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
-         Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-
-         // Key 5, Values H, I, J, K, L
-         Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-         Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-         Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class));
-         Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-         Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
-         Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-         Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("J"), this.psi.getValues().next().getField(1, StringValue.class));
-         Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-         Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("K"), this.psi.getValues().next().getField(1, StringValue.class));
-         Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-         Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("L"), this.psi.getValues().next().getField(1, StringValue.class));
-         Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-         // next() without a preceding hasNext() must fail once H..L have been consumed
-         try {
-             this.psi.getValues().next();
-             Assert.fail(exhaustedMsg);
-         }
-         catch (NoSuchElementException expected) {
-             // expected: the value group of key 5 is exhausted
-         }
-         Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
-         Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-         // next() must keep failing after hasNext() has reported the end of the group
-         try {
-             this.psi.getValues().next();
-             Assert.fail(exhaustedMsg);
-         }
-         catch (NoSuchElementException expected) {
-             // expected: the value group of key 5 is exhausted
-         }
-
-         // End of input: nextKey() must report false, also when asked repeatedly
-         Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-         Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-     } catch (Exception e) {
-         // Any exception other than the expected NoSuchElementExceptions above fails the test.
-         e.printStackTrace();
-         Assert.fail("The test encountered an unexpected exception.");
-     }
- }
-
- @Test
- public void testMixedProgress() throws Exception
- { // Purpose: move on to the next key while values of the current key are still unread, using a different mix of hasNext()/next() calls per key.
- try { // every call below works on the shared iterator this.psi, so the statement order matters
- // Key 1, Value A - progression only via nextKey() and hasNext(); A itself is never fetched
- Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
- Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-
- // Key 2, Value B - progression only through nextKey(); the value iterator is not touched at all
- Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-
- // Key 3, Values C, D - progression first through hasNext() and next(), then through hasNext(); D is seen by hasNext() but left unread
- Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
- Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
- Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
- Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
- Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class));
- Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
- Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
- Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-
- // Key 4, Values E, F, G - progression first via next() only, then hasNext() only; F and G are left unread
- Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); // the first value expected afterwards is E, i.e. the unread D of key 3 must have been skipped
- Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
- Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-
- // Key 5, Values H, I, J, K, L - only H and I are read; J, K and L are left unread
- Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
- Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class));
- Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
- Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
- Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
- Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
- Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-
- // End of input - no further key may be reported, also when nextKey() is called a second time
- Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
- Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
- } catch (Exception e) { // any exception thrown by the iterator calls above is turned into a test failure
- e.printStackTrace();
- Assert.fail("The test encountered an unexpected exception.");
- }
- }
-
- /**
-  * Checks that calling {@code hasNext()} on the value iterator does not change the record that
-  * the preceding {@code next()} call returned. After each {@code hasNext()} call the fields of
-  * the previously returned record are asserted again.
-  *
-  * <p>Only the first three key groups are visited; the assertions expect them to be
-  * 1:[A], 2:[B] and 3:[C, D].
-  */
- @Test
- public void testHasNextDoesNotOverweiteCurrentRecord() throws Exception
- {
-     try {
-         // Key 1, Value A
-         Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-         Iterator<Record> valsIter = this.psi.getValues();
-         Assert.assertNotNull("Returned Iterator must not be null", valsIter);
-         Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
-         Record rec = valsIter.next();
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class));
-         Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
-         // the record handed out before hasNext() must still carry key 1 / value A
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class));
-         Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
-
-         // Key 2, Value B
-         Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-         valsIter = this.psi.getValues();
-         Assert.assertNotNull("Returned Iterator must not be null", valsIter);
-         Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
-         rec = valsIter.next();
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class));
-         Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
-         // the record handed out before hasNext() must still carry key 2 / value B
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class));
-         Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
-
-         // Key 3, Values C, D
-         Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-         valsIter = this.psi.getValues();
-         Assert.assertNotNull("Returned Iterator must not be null", valsIter);
-         Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
-         rec = valsIter.next();
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class));
-         Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
-         // hasNext() has looked ahead to D, yet the record handed out before must still be C
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class));
-         rec = valsIter.next();
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
-         Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
-         // reaching the end of the group must not change the last record either
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
-         Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
-         Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
-     } catch (Exception e) {
-         // Any exception thrown by the iterator calls above fails the test.
-         e.printStackTrace();
-         Assert.fail("The test encountered an unexpected exception.");
-     }
- }
-
- /**
-  * Immutable holder that pairs one {@link IntValue} with one {@link StringValue}.
-  */
- private static final class IntStringPair {
-
-     /** The integer half of the pair. */
-     private final IntValue integer;
-
-     /** The string half of the pair. */
-     private final StringValue string;
-
-     /**
-      * Creates a pair from the two given values; the references are stored as passed in.
-      *
-      * @param intPart the integer half
-      * @param stringPart the string half
-      */
-     IntStringPair(IntValue intPart, StringValue stringPart) {
-         this.integer = intPart;
-         this.string = stringPart;
-     }
-
-     /** @return the integer half handed to the constructor */
-     public IntValue getInteger() {
-         return this.integer;
-     }
-
-     /** @return the string half handed to the constructor */
-     public StringValue getString() {
-         return this.string;
-     }
- }
-}