You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/01 09:29:35 UTC
[19/22] [FLINK-1023] Switch group-at-a-time functions to
java.lang.Iterable (from java.util.Iterator). Iterables over transient data
throw a TraversableOnceException when iterated over again.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
index a325e33..a29d4e0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
@@ -16,10 +16,8 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators.drivers;
-import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -152,11 +150,10 @@ public class GroupReduceDriverTest {
public static final class ConcatSumReducer extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
@Override
- public void reduce(Iterator<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
- Tuple2<String, Integer> current = values.next();
+ public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
+ Tuple2<String, Integer> current = new Tuple2<String, Integer>("", 0);
- while (values.hasNext()) {
- Tuple2<String, Integer> next = values.next();
+ for (Tuple2<String, Integer> next : values) {
next.f0 = current.f0 + next.f0;
next.f1 = current.f1 + next.f1;
current = next;
@@ -169,11 +166,10 @@ public class GroupReduceDriverTest {
public static final class ConcatSumMutableReducer extends RichGroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> {
@Override
- public void reduce(Iterator<Tuple2<StringValue, IntValue>> values, Collector<Tuple2<StringValue, IntValue>> out) {
- Tuple2<StringValue, IntValue> current = values.next();
+ public void reduce(Iterable<Tuple2<StringValue, IntValue>> values, Collector<Tuple2<StringValue, IntValue>> out) {
+ Tuple2<StringValue, IntValue> current = new Tuple2<StringValue, IntValue>(new StringValue(""), new IntValue(0));
- while (values.hasNext()) {
- Tuple2<StringValue, IntValue> next = values.next();
+ for (Tuple2<StringValue, IntValue> next : values) {
next.f0.append(current.f0);
next.f1.setValue(current.f1.getValue() + next.f1.getValue());
current = next;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index 7020f89..4b3c197 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators.sort;
import java.io.IOException;
@@ -26,13 +25,12 @@ import java.util.Iterator;
import java.util.NoSuchElementException;
import org.junit.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
import org.apache.flink.configuration.Configuration;
@@ -273,7 +271,7 @@ public class CombiningUnilateralSortMergerITCase {
// --------------------------------------------------------------------------------------------
- public static class TestCountCombiner extends ReduceFunction {
+ public static class TestCountCombiner extends RichGroupReduceFunction<Record, Record> {
private static final long serialVersionUID = 1L;
private final IntValue count = new IntValue();
@@ -284,11 +282,11 @@ public class CombiningUnilateralSortMergerITCase {
@Override
- public void combine(Iterator<Record> values, Collector<Record> out) {
+ public void combine(Iterable<Record> values, Collector<Record> out) {
Record rec = null;
int cnt = 0;
- while (values.hasNext()) {
- rec = values.next();
+ for (Record next : values) {
+ rec = next;
cnt += rec.getField(1, IntValue.class).getValue();
}
@@ -298,7 +296,7 @@ public class CombiningUnilateralSortMergerITCase {
}
@Override
- public void reduce(Iterator<Record> values, Collector<Record> out) {}
+ public void reduce(Iterable<Record> values, Collector<Record> out) {}
@Override
public void open(Configuration parameters) throws Exception {
@@ -311,7 +309,7 @@ public class CombiningUnilateralSortMergerITCase {
}
}
- public static class TestCountCombiner2 extends ReduceFunction {
+ public static class TestCountCombiner2 extends RichGroupReduceFunction<Record, Record> {
private static final long serialVersionUID = 1L;
public volatile boolean opened = false;
@@ -319,11 +317,11 @@ public class CombiningUnilateralSortMergerITCase {
public volatile boolean closed = false;
@Override
- public void combine(Iterator<Record> values, Collector<Record> out) {
+ public void combine(Iterable<Record> values, Collector<Record> out) {
Record rec = null;
int cnt = 0;
- while (values.hasNext()) {
- rec = values.next();
+ for (Record next : values) {
+ rec = next;
cnt += Integer.parseInt(rec.getField(1, TestData.Value.class).toString());
}
@@ -331,7 +329,7 @@ public class CombiningUnilateralSortMergerITCase {
}
@Override
- public void reduce(Iterator<Record> values, Collector<Record> out) {
+ public void reduce(Iterable<Record> values, Collector<Record> out) {
// yo, nothing, mon
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java
index a66fb5d..48a4b91 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java
@@ -116,8 +116,8 @@ public class SortMergeCoGroupIteratorITCase
final TestData.Key key = new TestData.Key();
while (iterator.next())
{
- Iterator<Record> iter1 = iterator.getValues1();
- Iterator<Record> iter2 = iterator.getValues2();
+ Iterator<Record> iter1 = iterator.getValues1().iterator();
+ Iterator<Record> iter2 = iterator.getValues2().iterator();
TestData.Value v1 = null;
TestData.Value v2 = null;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
index e9010d2..27fa540 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TraversableOnceException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -38,17 +39,15 @@ import org.junit.Test;
/**
* Test for the key grouped iterator, which advances in windows containing the same key and provides a sub-iterator
* over the records with the same key.
- *
*/
-public class KeyGroupedIteratorTest
-{
+public class KeyGroupedIteratorTest {
+
private MutableObjectIterator<Record> sourceIter; // the iterator that provides the input
private KeyGroupedIterator<Record> psi; // the grouping iterator, progressing in key steps
@Before
- public void setup()
- {
+ public void setup() {
final ArrayList<IntStringPair> source = new ArrayList<IntStringPair>();
// add elements to the source
@@ -91,8 +90,7 @@ public class KeyGroupedIteratorTest
}
@Test
- public void testNextKeyOnly() throws Exception
- {
+ public void testNextKeyOnly() throws Exception {
try {
Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
@@ -131,6 +129,8 @@ public class KeyGroupedIteratorTest
try {
// Key 1, Value A
Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue(hasIterator(this.psi.getValues()));
+ Assert.assertFalse(hasIterator(this.psi.getValues()));
Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
@@ -139,6 +139,8 @@ public class KeyGroupedIteratorTest
// Key 2, Value B
Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue(hasIterator(this.psi.getValues()));
+ Assert.assertFalse(hasIterator(this.psi.getValues()));
Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
@@ -147,6 +149,8 @@ public class KeyGroupedIteratorTest
// Key 3, Values C, D
Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue(hasIterator(this.psi.getValues()));
+ Assert.assertFalse(hasIterator(this.psi.getValues()));
Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
@@ -173,6 +177,8 @@ public class KeyGroupedIteratorTest
// Key 4, Values E, F, G
Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue(hasIterator(this.psi.getValues()));
+ Assert.assertFalse(hasIterator(this.psi.getValues()));
Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
@@ -193,6 +199,8 @@ public class KeyGroupedIteratorTest
// Key 5, Values H, I, J, K, L
Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue(hasIterator(this.psi.getValues()));
+ Assert.assertFalse(hasIterator(this.psi.getValues()));
Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
@@ -231,6 +239,7 @@ public class KeyGroupedIteratorTest
Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+ Assert.assertNull(this.psi.getValues());
} catch (Exception e) {
e.printStackTrace();
Assert.fail("The test encountered an unexpected exception.");
@@ -244,12 +253,18 @@ public class KeyGroupedIteratorTest
// Progression only via nextKey() and hasNext() - Key 1, Value A
Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+ Assert.assertTrue(hasIterator(this.psi.getValues()));
+ Assert.assertFalse(hasIterator(this.psi.getValues()));
// Progression only through nextKey() - Key 2, Value B
Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue(hasIterator(this.psi.getValues()));
+ Assert.assertFalse(hasIterator(this.psi.getValues()));
// Progression first though haNext() and next(), then through hasNext() - Key 3, Values C, D
Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue(hasIterator(this.psi.getValues()));
+ Assert.assertFalse(hasIterator(this.psi.getValues()));
Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
@@ -260,6 +275,8 @@ public class KeyGroupedIteratorTest
// Progression first via next() only, then hasNext() only Key 4, Values E, F, G
Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+ Assert.assertTrue(hasIterator(this.psi.getValues()));
+ Assert.assertFalse(hasIterator(this.psi.getValues()));
Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
@@ -270,6 +287,8 @@ public class KeyGroupedIteratorTest
Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
+ Assert.assertTrue(hasIterator(this.psi.getValues()));
+ Assert.assertFalse(hasIterator(this.psi.getValues()));
Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
// end
@@ -355,4 +374,14 @@ public class KeyGroupedIteratorTest
return string;
}
}
+
+ public boolean hasIterator(Iterable<?> iterable) {
+ try {
+ iterable.iterator();
+ return true;
+ }
+ catch (TraversableOnceException e) {
+ return false;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/ReduceFunction.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/ReduceFunction.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/ReduceFunction.scala
index 6ea8cbf..de7b7d1 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/ReduceFunction.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/ReduceFunction.scala
@@ -19,10 +19,10 @@
package org.apache.flink.api.scala.functions
-import java.util.{ Iterator => JIterator }
-
import scala.Iterator
+import java.util.{Iterator => JIterator}
+
import org.apache.flink.api.scala.analysis.{UDTSerializer, FieldSelector, UDT}
import org.apache.flink.api.scala.analysis.UDF1
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CoGroupOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CoGroupOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CoGroupOperator.scala
index 4c26307..6a71bb7 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CoGroupOperator.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CoGroupOperator.scala
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.scala.operators
import language.experimental.macros
@@ -54,7 +53,7 @@ class CoGroupDataSetWithWhereAndEqual[LeftIn, RightIn](val leftKeySelection: Lis
def flatMap[Out](fun: (Iterator[LeftIn], Iterator[RightIn]) => Iterator[Out]): DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out] = macro CoGroupMacros.flatMap[LeftIn, RightIn, Out]
}
-class NoKeyCoGroupBuilder(s: JCoGroupFunction) extends CoGroupOperator.Builder(new UserCodeObjectWrapper(s))
+class NoKeyCoGroupBuilder(s: JCoGroupFunction) extends CoGroupOperator.Builder(new UserCodeObjectWrapper(new CoGroupOperator.WrappingCoGroupFunction(s)))
object CoGroupMacros {
@@ -106,7 +105,9 @@ object CoGroupMacros {
implicit val leftInputUDT: UDT[LeftIn] = c.Expr[UDT[LeftIn]](createUdtLeftIn).splice
implicit val rightInputUDT: UDT[RightIn] = c.Expr[UDT[RightIn]](createUdtRightIn).splice
implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
+
new CoGroupFunctionBase[LeftIn, RightIn, Out] {
+
override def coGroup(leftRecords: JIterator[Record], rightRecords: JIterator[Record], out: Collector[Record]) = {
val firstLeftRecord = leftIterator.initialize(leftRecords)
@@ -177,7 +178,9 @@ object CoGroupMacros {
implicit val leftInputUDT: UDT[LeftIn] = c.Expr[UDT[LeftIn]](createUdtLeftIn).splice
implicit val rightInputUDT: UDT[RightIn] = c.Expr[UDT[RightIn]](createUdtRightIn).splice
implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
+
new CoGroupFunctionBase[LeftIn, RightIn, Out] {
+
override def coGroup(leftRecords: JIterator[Record], rightRecords: JIterator[Record], out: Collector[Record]) = {
val firstLeftRecord = leftIterator.initialize(leftRecords)
val firstRightRecord = rightIterator.initialize(rightRecords)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala
index cf3a96c..f25b5a3 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.scala.operators
import language.experimental.macros
@@ -191,7 +190,9 @@ object ReduceMacros {
implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
new ReduceFunctionBase[In, Out] {
- override def reduce(records: JIterator[Record], out: Collector[Record]) = {
+ override def reduce(recordsIterable: JIterator[Record], out: Collector[Record]) = {
+ val records: JIterator[Record] = recordsIterable
+
if (records.hasNext) {
val firstRecord = reduceIterator.initialize(records)
reduceRecord.copyFrom(firstRecord, reduceForwardFrom, reduceForwardTo)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
index e76f86d..5497004 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
@@ -270,6 +270,7 @@ public class AccumulatorITCase extends RecordAPITestBase {
private void reduceInternal(Iterator<Record> records, Collector<Record> out) {
Record element = null;
int sum = 0;
+
while (records.hasNext()) {
element = records.next();
IntValue i = element.getField(1, IntValue.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
index bf6c37d..197a745 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
@@ -124,11 +124,13 @@ public class AccumulatorIterativeITCase extends RecordAPITestBase {
}
@Override
- public void reduce(Iterator<Record> it, Collector<Record> out) {
+ public void reduce(Iterator<Record> records, Collector<Record> out) {
// Compute the sum
int sum = 0;
- while (it.hasNext()) {
- Integer value = Integer.parseInt(it.next().getField(0, StringValue.class).getValue());
+
+ while (records.hasNext()) {
+ Record r = records.next();
+ Integer value = Integer.parseInt(r.getField(0, StringValue.class).getValue());
sum += value;
testCounter.add(value);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
index 4b3a4bf..2217679 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.java.record.io.CsvInputFormat;
+import org.apache.flink.api.java.record.operators.ReduceOperator.WrappingReduceFunction;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparatorFactory;
import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
import org.apache.flink.configuration.Configuration;
@@ -252,7 +253,7 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
tailConfig.setOutputSerializer(outputSerializer);
// the udf
- tailConfig.setStubWrapper(new UserCodeObjectWrapper<RecomputeClusterCenter>(new RecomputeClusterCenter()));
+ tailConfig.setStubWrapper(new UserCodeObjectWrapper<WrappingReduceFunction>(new WrappingReduceFunction(new RecomputeClusterCenter())));
return tail;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
index 35aea4b..b442b33 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
@@ -16,12 +16,10 @@
* limitations under the License.
*/
-
package org.apache.flink.test.iterative;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.java.functions.RichGroupReduceFunction;
@@ -72,15 +70,13 @@ public class BulkIterationWithAllReducerITCase extends JavaProgramTestBase {
}
@Override
- public void reduce(Iterator<Integer> records, Collector<Integer> out) {
+ public void reduce(Iterable<Integer> records, Collector<Integer> out) {
if (bcValue == null) {
return;
}
final int x = bcValue;
- while (records.hasNext()) {
- int y = records.next();
-
+ for (Integer y : records) {
if (y > x) {
out.collect(y);
return;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
index 37695b2..f2a43a8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.test.iterative;
import java.io.BufferedReader;
@@ -104,7 +103,8 @@ public class CoGroupConnectedComponentsITCase extends RecordAPITestBase {
long minimumComponentID = Long.MAX_VALUE;
while (candidates.hasNext()) {
- long candidateComponentID = candidates.next().getField(1, LongValue.class).getValue();
+ Record candidate = candidates.next();
+ long candidateComponentID = candidate.getField(1, LongValue.class).getValue();
if (candidateComponentID < minimumComponentID) {
minimumComponentID = candidateComponentID;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
index 49bdb26..c6175b2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
@@ -16,11 +16,9 @@
* limitations under the License.
*/
-
package org.apache.flink.test.iterative;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.java.functions.RichCoGroupFunction;
@@ -116,17 +114,17 @@ public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase
public static final class MinIdAndUpdate extends RichCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
@Override
- public void coGroup(Iterator<Tuple2<Long, Long>> candidates, Iterator<Tuple2<Long, Long>> current, Collector<Tuple2<Long, Long>> out) {
- if (!current.hasNext()) {
+ public void coGroup(Iterable<Tuple2<Long, Long>> candidates, Iterable<Tuple2<Long, Long>> current, Collector<Tuple2<Long, Long>> out) {
+ if (!current.iterator().hasNext()) {
throw new RuntimeException("Error: Id not encountered before.");
}
- Tuple2<Long, Long> old = current.next();
+ Tuple2<Long, Long> old = current.iterator().next();
long minimumComponentID = Long.MAX_VALUE;
- while (candidates.hasNext()) {
- long candidateComponentID = candidates.next().f1;
+ for (Tuple2<Long, Long> candidate : candidates) {
+ long candidateComponentID = candidate.f1;
if (candidateComponentID < minimumComponentID) {
minimumComponentID = candidateComponentID;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
index d2669aa..fdfb321 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
@@ -16,11 +16,9 @@
* limitations under the License.
*/
-
package org.apache.flink.test.iterative;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.java.functions.RichFlatMapFunction;
@@ -170,8 +168,8 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
private static final long serialVersionUID = 1L;
@Override
- public void reduce(Iterator<Long> values, Collector<Long> out) throws Exception {
- out.collect(values.next());
+ public void reduce(Iterable<Long> values, Collector<Long> out) {
+ out.collect(values.iterator().next());
}
}
@@ -204,21 +202,19 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>();
@Override
- public void reduce(Iterator<Tuple2<Long, Long>> values,
- Collector<Tuple2<Long, Long>> out) throws Exception {
+ public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
+ Long vertexId = 0L;
+ Long minimumCompId = Long.MAX_VALUE;
- final Tuple2<Long, Long> first = values.next();
- final Long vertexId = first.f0;
- Long minimumCompId = first.f1;
-
- while ( values.hasNext() ) {
- Long candidateCompId = values.next().f1;
- if ( candidateCompId < minimumCompId ) {
+ for (Tuple2<Long, Long> value: values) {
+ vertexId = value.f0;
+ Long candidateCompId = value.f1;
+ if (candidateCompId < minimumCompId) {
minimumCompId = candidateCompId;
}
}
- resultVertex.setField(vertexId, 0);
- resultVertex.setField(minimumCompId, 1);
+ resultVertex.f0 = vertexId;
+ resultVertex.f1 = minimumCompId;
out.collect(resultVertex);
}
@@ -231,8 +227,8 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
@Override
public void flatMap(
Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> vertexWithNewAndOldId,
- Collector<Tuple2<Long, Long>> out) throws Exception {
-
+ Collector<Tuple2<Long, Long>> out)
+ {
if ( vertexWithNewAndOldId.f0.f1 < vertexWithNewAndOldId.f1.f1 ) {
out.collect(vertexWithNewAndOldId.f0);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
index cb25019..3dc0bdf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.test.iterative;
import java.io.Serializable;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
index 6a487c4..2c777f5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.test.iterative;
import java.io.Serializable;
@@ -110,6 +109,7 @@ public class IterationTerminationWithTwoTails extends RecordAPITestBase {
public void reduce(Iterator<Record> it, Collector<Record> out) {
// Compute the sum
int sum = 0;
+
while (it.hasNext()) {
sum += Integer.parseInt(it.next().getField(0, StringValue.class).getValue()) + 1;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
index c81b32a..b3145dc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.test.iterative;
import java.io.Serializable;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
index b62d85a..d00602f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
@@ -44,85 +44,81 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class IterationWithChainingITCase extends RecordAPITestBase {
- private static final String DATA_POINTS = "0|50.90|16.20|72.08|\n" + "1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n";
+ private static final String DATA_POINTS = "0|50.90|16.20|72.08|\n" + "1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n";
- private String dataPath;
- private String resultPath;
+ private String dataPath;
+ private String resultPath;
- public IterationWithChainingITCase(Configuration config) {
- super(config);
+ public IterationWithChainingITCase(Configuration config) {
+ super(config);
setTaskManagerNumSlots(DOP);
- }
+ }
- @Override
- protected void preSubmit() throws Exception {
- dataPath = createTempFile("data_points.txt", DATA_POINTS);
- resultPath = getTempFilePath("result");
- }
+ @Override
+ protected void preSubmit() throws Exception {
+ dataPath = createTempFile("data_points.txt", DATA_POINTS);
+ resultPath = getTempFilePath("result");
+ }
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(DATA_POINTS, resultPath);
- }
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(DATA_POINTS, resultPath);
+ }
+ @Override
+ protected Plan getTestJob() {
+ Plan plan = getTestPlan(config.getInteger("ChainedMapperITCase#NoSubtasks", 1), dataPath, resultPath);
+ return plan;
+ }
- @Override
- protected Plan getTestJob() {
- Plan plan = getTestPlan(config.getInteger("ChainedMapperITCase#NoSubtasks", 1), dataPath, resultPath);
- return plan;
- }
+ @Parameters
+ public static Collection<Object[]> getConfigurations() {
+ Configuration config1 = new Configuration();
+ config1.setInteger("ChainedMapperITCase#NoSubtasks", DOP);
+ return toParameterList(config1);
+ }
- @Parameters
- public static Collection<Object[]> getConfigurations() {
- Configuration config1 = new Configuration();
- config1.setInteger("ChainedMapperITCase#NoSubtasks", DOP);
- return toParameterList(config1);
- }
+ public static final class IdentityMapper extends MapFunction implements Serializable {
- public static final class IdentityMapper extends MapFunction implements Serializable {
+ private static final long serialVersionUID = 1L;
- private static final long serialVersionUID = 1L;
+ @Override
+ public void map(Record rec, Collector<Record> out) {
+ out.collect(rec);
+ }
+ }
- @Override
- public void map(Record rec, Collector<Record> out) {
- out.collect(rec);
- }
- }
+ public static final class DummyReducer extends ReduceFunction implements Serializable {
- public static final class DummyReducer extends ReduceFunction implements Serializable {
+ private static final long serialVersionUID = 1L;
- private static final long serialVersionUID = 1L;
+ @Override
+ public void reduce(Iterator<Record> it, Collector<Record> out) {
+ while (it.hasNext()) {
+ out.collect(it.next());
+ }
+ }
+ }
- @Override
- public void reduce(Iterator<Record> it, Collector<Record> out) {
- while (it.hasNext()) {
- out.collect(it.next());
- }
- }
- }
+ static Plan getTestPlan(int numSubTasks, String input, String output) {
- static Plan getTestPlan(int numSubTasks, String input, String output) {
+ FileDataSource initialInput = new FileDataSource(new PointInFormat(), input, "Input");
+ initialInput.setDegreeOfParallelism(1);
- FileDataSource initialInput = new FileDataSource(new PointInFormat(), input, "Input");
- initialInput.setDegreeOfParallelism(1);
+ BulkIteration iteration = new BulkIteration("Loop");
+ iteration.setInput(initialInput);
+ iteration.setMaximumNumberOfIterations(2);
- BulkIteration iteration = new BulkIteration("Loop");
- iteration.setInput(initialInput);
- iteration.setMaximumNumberOfIterations(2);
+ ReduceOperator dummyReduce = ReduceOperator.builder(new DummyReducer(), IntValue.class, 0).input(iteration.getPartialSolution())
+ .name("Reduce something").build();
- ReduceOperator dummyReduce = ReduceOperator.builder(new DummyReducer(), IntValue.class, 0)
- .input(iteration.getPartialSolution())
- .name("Reduce something")
- .build();
+ MapOperator dummyMap = MapOperator.builder(new IdentityMapper()).input(dummyReduce).build();
+ iteration.setNextPartialSolution(dummyMap);
+ FileDataSink finalResult = new FileDataSink(new PointOutFormat(), output, iteration, "Output");
- MapOperator dummyMap = MapOperator.builder(new IdentityMapper()).input(dummyReduce).build();
- iteration.setNextPartialSolution(dummyMap);
-
- FileDataSink finalResult = new FileDataSink(new PointOutFormat(), output, iteration, "Output");
-
- Plan plan = new Plan(finalResult, "Iteration with chained map test");
- plan.setDefaultParallelism(numSubTasks);
- return plan;
- }
+ Plan plan = new Plan(finalResult, "Iteration with chained map test");
+ plan.setDefaultParallelism(numSubTasks);
+ return plan;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
index aa83e9b..2a130ed 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
@@ -16,43 +16,28 @@
* limitations under the License.
*/
-
package org.apache.flink.test.iterative;
import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.IterativeDataSet;
import org.apache.flink.test.recordJobs.kmeans.udfs.PointInFormat;
import org.apache.flink.test.recordJobs.kmeans.udfs.PointOutFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
+import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-@RunWith(Parameterized.class)
-public class IterationWithUnionITCase extends RecordAPITestBase {
+public class IterationWithUnionITCase extends JavaProgramTestBase {
private static final String DATAPOINTS = "0|50.90|16.20|72.08|\n" + "1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n";
protected String dataPath;
protected String resultPath;
-
- public IterationWithUnionITCase(Configuration config) {
- super(config);
- setTaskManagerNumSlots(DOP);
- }
@Override
protected void preSubmit() throws Exception {
@@ -66,54 +51,36 @@ public class IterationWithUnionITCase extends RecordAPITestBase {
}
@Override
- protected Plan getTestJob() {
- return getPlan(config.getInteger("IterationWithUnionITCase#NumSubtasks", 1), dataPath, resultPath);
- }
-
- @Parameters
- public static Collection<Object[]> getConfigurations() {
- Configuration config1 = new Configuration();
- config1.setInteger("IterationWithUnionITCase#NumSubtasks", DOP);
-
- return toParameterList(config1);
- }
-
- private static Plan getPlan(int numSubTasks, String input, String output) {
- FileDataSource initialInput = new FileDataSource(new PointInFormat(), input, "Input");
- initialInput.setDegreeOfParallelism(1);
+ protected void testProgram() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BulkIteration iteration = new BulkIteration("Loop");
- iteration.setInput(initialInput);
- iteration.setMaximumNumberOfIterations(2);
-
- @SuppressWarnings("unchecked")
- MapOperator map2 = MapOperator.builder(new IdentityMapper()).input(iteration.getPartialSolution(), iteration.getPartialSolution()).name("map").build();
+ DataSet<Record> initialInput = env.readFile(new PointInFormat(), this.dataPath).setParallelism(1);
- iteration.setNextPartialSolution(map2);
-
- FileDataSink finalResult = new FileDataSink(new PointOutFormat(), output, iteration, "Output");
-
- Plan plan = new Plan(finalResult, "Iteration with union test");
- plan.setDefaultParallelism(numSubTasks);
- return plan;
+ IterativeDataSet<Record> iteration = initialInput.iterate(2);
+
+ DataSet<Record> result = iteration.union(iteration).map(new IdentityMapper());
+
+ iteration.closeWith(result).write(new PointOutFormat(), this.resultPath);
+
+ env.execute();
}
- static final class IdentityMapper extends MapFunction implements Serializable {
+ static final class IdentityMapper implements MapFunction<Record, Record>, Serializable {
private static final long serialVersionUID = 1L;
@Override
- public void map(Record rec, Collector<Record> out) {
- out.collect(rec);
+ public Record map(Record rec) {
+ return rec;
}
}
- static final class DummyReducer extends ReduceFunction implements Serializable {
+ static final class DummyReducer implements GroupReduceFunction<Record, Record>, Serializable {
private static final long serialVersionUID = 1L;
@Override
- public void reduce(Iterator<Record> it, Collector<Record> out) {
- while (it.hasNext()) {
- out.collect(it.next());
+ public void reduce(Iterable<Record> it, Collector<Record> out) {
+ for (Record r : it) {
+ out.collect(r);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
index 4c8177a..9382708 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
@@ -16,34 +16,27 @@
* limitations under the License.
*/
-
package org.apache.flink.test.iterative.aggregators;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.functions.RichJoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
-
import org.junit.Assert;
-
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.IterativeDataSet;
-
/**
- *
- * Connected Components test case that uses a parametrizable aggregator
- *
+ * Connected Components test case that uses a parameterizable aggregator
*/
public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaProgramTestBase {
@@ -147,7 +140,8 @@ public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP
}
}
- public static final class NeighborWithComponentIDJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ public static final class NeighborWithComponentIDJoin implements JoinFunction
+ <Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
@@ -163,24 +157,23 @@ public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP
public static final class MinimumReduce extends RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
- final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>();
+
+ private final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>();
@Override
- public void reduce(Iterator<Tuple2<Long, Long>> values,
- Collector<Tuple2<Long, Long>> out) throws Exception {
-
- final Tuple2<Long, Long> first = values.next();
- final Long vertexId = first.f0;
- Long minimumCompId = first.f1;
+ public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
+ Long vertexId = 0L;
+ Long minimumCompId = Long.MAX_VALUE;
- while (values.hasNext()) {
- Long candidateCompId = values.next().f1;
+ for (Tuple2<Long, Long> value: values) {
+ vertexId = value.f0;
+ Long candidateCompId = value.f1;
if (candidateCompId < minimumCompId) {
minimumCompId = candidateCompId;
}
}
- resultVertex.setField(vertexId, 0);
- resultVertex.setField(minimumCompId, 1);
+ resultVertex.f0 = vertexId;
+ resultVertex.f1 = minimumCompId;
out.collect(resultVertex);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
index 104c3df..039d64e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
@@ -16,11 +16,9 @@
* limitations under the License.
*/
-
package org.apache.flink.test.iterative.aggregators;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
@@ -158,21 +156,19 @@ public class ConnectedComponentsWithParametrizableConvergenceITCase extends Java
final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>();
@Override
- public void reduce(Iterator<Tuple2<Long, Long>> values,
- Collector<Tuple2<Long, Long>> out) throws Exception {
-
- final Tuple2<Long, Long> first = values.next();
- final Long vertexId = first.f0;
- Long minimumCompId = first.f1;
+ public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
+ Long vertexId = 0L;
+ Long minimumCompId = Long.MAX_VALUE;
- while (values.hasNext()) {
- Long candidateCompId = values.next().f1;
+ for (Tuple2<Long, Long> value: values) {
+ vertexId = value.f0;
+ Long candidateCompId = value.f1;
if (candidateCompId < minimumCompId) {
minimumCompId = candidateCompId;
}
}
- resultVertex.setField(vertexId, 0);
- resultVertex.setField(minimumCompId, 1);
+ resultVertex.f0 = vertexId;
+ resultVertex.f1 = minimumCompId;
out.collect(resultVertex);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
index 3060b68..4fd22a3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -31,6 +32,8 @@ import org.apache.flink.api.java.record.functions.MapFunction;
import org.apache.flink.api.java.record.io.CsvInputFormat;
import org.apache.flink.api.java.record.io.CsvOutputFormat;
import org.apache.flink.api.java.record.io.FileOutputFormat;
+import org.apache.flink.api.java.record.operators.ReduceOperator.WrappingReduceFunction;
+import org.apache.flink.api.java.record.operators.ReduceOperator.WrappingClassReduceFunction;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparatorFactory;
import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
@@ -317,7 +320,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
intermediateConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
intermediateConfig.setDriverComparator(comparator, 0);
intermediateConfig.setStubWrapper(
- new UserCodeClassWrapper<MinimumComponentIDReduce>(MinimumComponentIDReduce.class));
+ new UserCodeObjectWrapper<WrappingReduceFunction>(new WrappingClassReduceFunction(MinimumComponentIDReduce.class)));
}
return intermediate;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
index 448b7bd..5a6e4f5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
@@ -19,13 +19,12 @@
package org.apache.flink.test.iterative.nephele;
import java.util.Collection;
-import java.util.Iterator;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.io.FileOutputFormat;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparatorFactory;
import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
@@ -273,14 +272,14 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
}
}
- public static final class DummyReducer extends ReduceFunction {
+ public static final class DummyReducer implements GroupReduceFunction<Record, Record> {
private static final long serialVersionUID = 1L;
@Override
- public void reduce(Iterator<Record> it, Collector<Record> out) {
- while (it.hasNext()) {
- out.collect(it.next());
+ public void reduce(Iterable<Record> it, Collector<Record> out) {
+ for (Record r :it) {
+ out.collect(r);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
index 1ec0eb4..cd6a89d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
import java.util.Iterator;
@@ -83,11 +82,13 @@ public class CustomCompensatableDotProductCoGroup extends AbstractRichFunction i
}
@Override
- public void coGroup(Iterator<VertexWithRankAndDangling> currentPageRankIterator, Iterator<VertexWithRank> partialRanks,
+ public void coGroup(Iterable<VertexWithRankAndDangling> currentPageRankIterable, Iterable<VertexWithRank> partialRanks,
Collector<VertexWithRankAndDangling> collector)
{
+ final Iterator<VertexWithRankAndDangling> currentPageRankIterator = currentPageRankIterable.iterator();
+
if (!currentPageRankIterator.hasNext()) {
- long missingVertex = partialRanks.next().getVertexID();
+ long missingVertex = partialRanks.iterator().next().getVertexID();
throw new IllegalStateException("No current page rank for vertex [" + missingVertex + "]!");
}
@@ -95,8 +96,8 @@ public class CustomCompensatableDotProductCoGroup extends AbstractRichFunction i
long edges = 0;
double summedRank = 0;
- while (partialRanks.hasNext()) {
- summedRank += partialRanks.next().getRank();
+ for (VertexWithRank pr :partialRanks) {
+ summedRank += pr.getRank();
edges++;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
index d83b33b..8d92c59 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
@@ -29,6 +29,7 @@ import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.Vert
import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStats;
import org.apache.flink.util.Collector;
+@SuppressWarnings("deprecation")
public class CustomCompensatingMap extends AbstractRichFunction implements GenericCollectorMap<VertexWithRankAndDangling, VertexWithRankAndDangling> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
index 1e08a9f..8ec8403 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
@@ -35,12 +35,14 @@ public class CustomRankCombiner extends AbstractRichFunction implements GroupRed
private final VertexWithRank accumulator = new VertexWithRank();
@Override
- public void reduce(Iterator<VertexWithRank> records, Collector<VertexWithRank> out) throws Exception {
+ public void reduce(Iterable<VertexWithRank> records, Collector<VertexWithRank> out) throws Exception {
throw new UnsupportedOperationException();
}
@Override
- public void combine(Iterator<VertexWithRank> records, Collector<VertexWithRank> out) throws Exception {
+ public void combine(Iterable<VertexWithRank> recordsIterable, Collector<VertexWithRank> out) throws Exception {
+ final Iterator<VertexWithRank> records = recordsIterable.iterator();
+
VertexWithRank next = records.next();
this.accumulator.setVertexID(next.getVertexID());
double rank = next.getRank();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
index ab3dea9..c51cf96 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.test.iterative.nephele.danglingpagerank;
import java.util.Iterator;
@@ -102,7 +101,8 @@ public class CompensatableDotProductCoGroup extends CoGroupFunction {
long edges = 0;
double summedRank = 0;
while (partialRanks.hasNext()) {
- summedRank += partialRanks.next().getField(1, doubleInstance).getValue();
+ Record pr = partialRanks.next();
+ summedRank += pr.getField(1, doubleInstance).getValue();
edges++;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index 3749c1d..d59d721 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.test.javaApiOperators;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
-import java.util.Iterator;
import java.util.LinkedList;
import org.apache.flink.api.common.functions.CoGroupFunction;
@@ -308,21 +307,19 @@ public class CoGroupITCase extends JavaProgramTestBase {
@Override
public void coGroup(
- Iterator<Tuple5<Integer, Long, Integer, String, Long>> first,
- Iterator<Tuple5<Integer, Long, Integer, String, Long>> second,
- Collector<Tuple2<Integer, Integer>> out) throws Exception {
-
+ Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+ Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
+ Collector<Tuple2<Integer, Integer>> out)
+ {
int sum = 0;
int id = 0;
- while(first.hasNext()) {
- Tuple5<Integer, Long, Integer, String, Long> element = first.next();
+ for ( Tuple5<Integer, Long, Integer, String, Long> element : first ) {
sum += element.f2;
id = element.f0;
}
- while(second.hasNext()) {
- Tuple5<Integer, Long, Integer, String, Long> element = second.next();
+ for ( Tuple5<Integer, Long, Integer, String, Long> element : second ) {
sum += element.f2;
id = element.f0;
}
@@ -336,27 +333,22 @@ public class CoGroupITCase extends JavaProgramTestBase {
private static final long serialVersionUID = 1L;
@Override
- public void coGroup(Iterator<CustomType> first,
- Iterator<CustomType> second, Collector<CustomType> out)
- throws Exception {
+ public void coGroup(Iterable<CustomType> first, Iterable<CustomType> second, Collector<CustomType> out) {
CustomType o = new CustomType(0,0,"test");
- while(first.hasNext()) {
- CustomType element = first.next();
+ for ( CustomType element : first ) {
o.myInt = element.myInt;
o.myLong += element.myLong;
}
- while(second.hasNext()) {
- CustomType element = second.next();
+ for ( CustomType element : second ) {
o.myInt = element.myInt;
o.myLong += element.myLong;
}
out.collect(o);
}
-
}
public static class MixedCoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> {
@@ -365,21 +357,19 @@ public class CoGroupITCase extends JavaProgramTestBase {
@Override
public void coGroup(
- Iterator<Tuple5<Integer, Long, Integer, String, Long>> first,
- Iterator<CustomType> second,
+ Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+ Iterable<CustomType> second,
Collector<Tuple3<Integer, Long, String>> out) throws Exception {
long sum = 0;
int id = 0;
- while(first.hasNext()) {
- Tuple5<Integer, Long, Integer, String, Long> element = first.next();
+ for ( Tuple5<Integer, Long, Integer, String, Long> element : first ) {
sum += element.f0;
id = element.f2;
}
- while(second.hasNext()) {
- CustomType element = second.next();
+ for (CustomType element : second) {
id = element.myInt;
sum += element.myLong;
}
@@ -394,20 +384,18 @@ public class CoGroupITCase extends JavaProgramTestBase {
private static final long serialVersionUID = 1L;
@Override
- public void coGroup(Iterator<CustomType> first,
- Iterator<Tuple5<Integer, Long, Integer, String, Long>> second,
- Collector<CustomType> out) throws Exception {
-
+ public void coGroup(Iterable<CustomType> first,
+ Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
+ Collector<CustomType> out)
+ {
CustomType o = new CustomType(0,0,"test");
- while(first.hasNext()) {
- CustomType element = first.next();
+ for (CustomType element : first) {
o.myInt = element.myInt;
o.myLong += element.myLong;
}
- while(second.hasNext()) {
- Tuple5<Integer, Long, Integer, String, Long> element = second.next();
+ for (Tuple5<Integer, Long, Integer, String, Long> element : second) {
o.myInt = element.f2;
o.myLong += element.f0;
}
@@ -423,14 +411,14 @@ public class CoGroupITCase extends JavaProgramTestBase {
private static final long serialVersionUID = 1L;
@Override
- public void coGroup(Iterator<Tuple3<Integer, Long, String>> first,
- Iterator<Tuple3<Integer, Long, String>> second,
- Collector<Tuple3<Integer, Long, String>> out) throws Exception {
-
- while(first.hasNext()) {
- Tuple3<Integer, Long, String> element = first.next();
- if(element.f0 < 6)
+ public void coGroup(Iterable<Tuple3<Integer, Long, String>> first,
+ Iterable<Tuple3<Integer, Long, String>> second,
+ Collector<Tuple3<Integer, Long, String>> out)
+ {
+ for (Tuple3<Integer, Long, String> element : first) {
+ if(element.f0 < 6) {
out.collect(element);
+ }
}
}
}
@@ -441,20 +429,16 @@ public class CoGroupITCase extends JavaProgramTestBase {
@Override
public void coGroup(
- Iterator<Tuple5<Integer, Long, Integer, String, Long>> first,
- Iterator<Tuple5<Integer, Long, Integer, String, Long>> second,
+ Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+ Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
Collector<Tuple5<Integer, Long, Integer, String, Long>> out)
- throws Exception {
-
- while(second.hasNext()) {
- Tuple5<Integer, Long, Integer, String, Long> element = second.next();
- if(element.f0 < 4)
+ {
+ for (Tuple5<Integer, Long, Integer, String, Long> element : second) {
+ if(element.f0 < 4) {
out.collect(element);
+ }
}
-
}
-
-
}
public static class Tuple5CoGroupBC extends RichCoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
@@ -477,21 +461,19 @@ public class CoGroupITCase extends JavaProgramTestBase {
@Override
public void coGroup(
- Iterator<Tuple5<Integer, Long, Integer, String, Long>> first,
- Iterator<Tuple5<Integer, Long, Integer, String, Long>> second,
- Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
-
+ Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+ Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
+ Collector<Tuple3<Integer, Integer, Integer>> out)
+ {
int sum = 0;
int id = 0;
- while(first.hasNext()) {
- Tuple5<Integer, Long, Integer, String, Long> element = first.next();
+ for (Tuple5<Integer, Long, Integer, String, Long> element : first) {
sum += element.f2;
id = element.f0;
}
- while(second.hasNext()) {
- Tuple5<Integer, Long, Integer, String, Long> element = second.next();
+ for (Tuple5<Integer, Long, Integer, String, Long> element : second) {
sum += element.f2;
id = element.f0;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 7376e86..bd10c5e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -422,16 +422,13 @@ public class GroupReduceITCase extends JavaProgramTestBase {
public static class Tuple3GroupReduce implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
private static final long serialVersionUID = 1L;
-
@Override
- public void reduce(Iterator<Tuple3<Integer, Long, String>> values,
- Collector<Tuple2<Integer, Long>> out) throws Exception {
+ public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, Long>> out) {
int i = 0;
long l = 0l;
- while(values.hasNext()) {
- Tuple3<Integer, Long, String> t = values.next();
+ for (Tuple3<Integer, Long, String> t : values) {
i += t.f0;
l = t.f1;
}
@@ -446,24 +443,22 @@ public class GroupReduceITCase extends JavaProgramTestBase {
@Override
- public void reduce(Iterator<Tuple3<Integer, Long, String>> values,
- Collector<Tuple3<Integer, Long, String>> out) throws Exception {
-
- Tuple3<Integer, Long, String> t = values.next();
-
- int sum = t.f0;
- long key = t.f1;
- String concat = t.f2;
+ public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
+ int sum = 0;
+ long key = 0;
+ StringBuilder concat = new StringBuilder();
- while(values.hasNext()) {
- t = values.next();
-
- sum += t.f0;
- concat += "-"+t.f2;
+ for (Tuple3<Integer, Long, String> next : values) {
+ sum += next.f0;
+ key = next.f1;
+ concat.append(next.f2).append("-");
}
- out.collect(new Tuple3<Integer, Long, String>(sum, key, concat));
+ if (concat.length() > 0) {
+ concat.setLength(concat.length() - 1);
+ }
+ out.collect(new Tuple3<Integer, Long, String>(sum, key, concat.toString()));
}
}
@@ -472,16 +467,14 @@ public class GroupReduceITCase extends JavaProgramTestBase {
@Override
public void reduce(
- Iterator<Tuple5<Integer, Long, Integer, String, Long>> values,
+ Iterable<Tuple5<Integer, Long, Integer, String, Long>> values,
Collector<Tuple5<Integer, Long, Integer, String, Long>> out)
- throws Exception {
-
+ {
int i = 0;
long l = 0l;
long l2 = 0l;
- while(values.hasNext()) {
- Tuple5<Integer, Long, Integer, String, Long> t = values.next();
+ for ( Tuple5<Integer, Long, Integer, String, Long> t : values ) {
i = t.f0;
l += t.f1;
l2 = t.f4;
@@ -496,20 +489,19 @@ public class GroupReduceITCase extends JavaProgramTestBase {
@Override
- public void reduce(Iterator<CustomType> values,
- Collector<CustomType> out) throws Exception {
+ public void reduce(Iterable<CustomType> values, Collector<CustomType> out) {
+ final Iterator<CustomType> iter = values.iterator();
CustomType o = new CustomType();
- CustomType c = values.next();
+ CustomType c = iter.next();
o.myString = "Hello!";
o.myInt = c.myInt;
o.myLong = c.myLong;
- while(values.hasNext()) {
- c = values.next();
- o.myLong += c.myLong;
-
+ while (iter.hasNext()) {
+ CustomType next = iter.next();
+ o.myLong += next.myLong;
}
out.collect(o);
@@ -522,11 +514,9 @@ public class GroupReduceITCase extends JavaProgramTestBase {
private static final long serialVersionUID = 1L;
@Override
- public void reduce(Iterator<Tuple3<Integer, Long, String>> values,
- Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+ public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
- while(values.hasNext()) {
- Tuple3<Integer, Long, String> t = values.next();
+ for ( Tuple3<Integer, Long, String> t : values ) {
if(t.f0 < 4) {
t.f2 = "Hi!";
@@ -544,14 +534,12 @@ public class GroupReduceITCase extends JavaProgramTestBase {
private static final long serialVersionUID = 1L;
@Override
- public void reduce(Iterator<Tuple3<Integer, Long, String>> values,
- Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+ public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
int i = 0;
long l = 0l;
- while(values.hasNext()) {
- Tuple3<Integer, Long, String> t = values.next();
+ for ( Tuple3<Integer, Long, String> t : values ) {
i += t.f0;
l += t.f1;
}
@@ -564,21 +552,13 @@ public class GroupReduceITCase extends JavaProgramTestBase {
private static final long serialVersionUID = 1L;
@Override
- public void reduce(Iterator<CustomType> values,
- Collector<CustomType> out) throws Exception {
+ public void reduce(Iterable<CustomType> values, Collector<CustomType> out) {
- CustomType o = new CustomType();
- CustomType c = values.next();
-
- o.myString = "Hello!";
- o.myInt = c.myInt;
- o.myLong = c.myLong;
-
+ CustomType o = new CustomType(0, 0, "Hello!");
- while(values.hasNext()) {
- c = values.next();
- o.myInt += c.myInt;
- o.myLong += c.myLong;
+ for (CustomType next : values) {
+ o.myInt += next.myInt;
+ o.myLong += next.myLong;
}
out.collect(o);
@@ -602,14 +582,12 @@ public class GroupReduceITCase extends JavaProgramTestBase {
}
@Override
- public void reduce(Iterator<Tuple3<Integer, Long, String>> values,
- Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+ public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
int i = 0;
long l = 0l;
- while(values.hasNext()) {
- Tuple3<Integer, Long, String> t = values.next();
+ for ( Tuple3<Integer, Long, String> t : values ) {
i += t.f0;
l = t.f1;
}
@@ -624,12 +602,11 @@ public class GroupReduceITCase extends JavaProgramTestBase {
private static final long serialVersionUID = 1L;
@Override
- public void combine(Iterator<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+ public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
Tuple3<Integer, Long, String> o = new Tuple3<Integer, Long, String>(0, 0l, "");
- while(values.hasNext()) {
- Tuple3<Integer, Long, String> t = values.next();
+ for ( Tuple3<Integer, Long, String> t : values ) {
o.f0 += t.f0;
o.f1 = t.f1;
o.f2 = "test"+o.f1;
@@ -639,14 +616,12 @@ public class GroupReduceITCase extends JavaProgramTestBase {
}
@Override
- public void reduce(Iterator<Tuple3<Integer, Long, String>> values,
- Collector<Tuple2<Integer, String>> out) throws Exception {
+ public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, String>> out) {
int i = 0;
String s = "";
- while(values.hasNext()) {
- Tuple3<Integer, Long, String> t = values.next();
+ for ( Tuple3<Integer, Long, String> t : values ) {
i += t.f0;
s = t.f2;
}
@@ -661,12 +636,11 @@ public class GroupReduceITCase extends JavaProgramTestBase {
private static final long serialVersionUID = 1L;
@Override
- public void combine(Iterator<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
+ public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
Tuple3<Integer, Long, String> o = new Tuple3<Integer, Long, String>(0, 0l, "");
- while(values.hasNext()) {
- Tuple3<Integer, Long, String> t = values.next();
+ for ( Tuple3<Integer, Long, String> t : values ) {
o.f0 += t.f0;
o.f1 += t.f1;
o.f2 += "test";
@@ -676,13 +650,12 @@ public class GroupReduceITCase extends JavaProgramTestBase {
}
@Override
- public void reduce(Iterator<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, String>> out) {
+ public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, String>> out) {
int i = 0;
String s = "";
- while(values.hasNext()) {
- Tuple3<Integer, Long, String> t = values.next();
+ for ( Tuple3<Integer, Long, String> t : values ) {
i += t.f0 + t.f1;
s += t.f2;
}
@@ -697,12 +670,11 @@ public class GroupReduceITCase extends JavaProgramTestBase {
private static final long serialVersionUID = 1L;
@Override
- public void combine(Iterator<CustomType> values, Collector<CustomType> out) throws Exception {
+ public void combine(Iterable<CustomType> values, Collector<CustomType> out) throws Exception {
CustomType o = new CustomType();
- while(values.hasNext()) {
- CustomType c = values.next();
+ for ( CustomType c : values ) {
o.myInt = c.myInt;
o.myLong += c.myLong;
o.myString = "test"+c.myInt;
@@ -712,13 +684,11 @@ public class GroupReduceITCase extends JavaProgramTestBase {
}
@Override
- public void reduce(Iterator<CustomType> values,
- Collector<CustomType> out) throws Exception {
+ public void reduce(Iterable<CustomType> values, Collector<CustomType> out) {
CustomType o = new CustomType(0, 0, "");
- while(values.hasNext()) {
- CustomType c = values.next();
+ for ( CustomType c : values) {
o.myInt = c.myInt;
o.myLong += c.myLong;
o.myString = c.myString;
@@ -733,6 +703,5 @@ public class GroupReduceITCase extends JavaProgramTestBase {
@Override
public T map(T value) { return value; }
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
index 5a895d3..865c550 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.test.operators;
import java.io.FileNotFoundException;
@@ -26,8 +25,6 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.record.functions.CoGroupFunction;
import org.apache.flink.api.java.record.io.DelimitedInputFormat;
@@ -46,13 +43,9 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-/**
- */
@RunWith(Parameterized.class)
public class CoGroupITCase extends RecordAPITestBase {
- private static final Log LOG = LogFactory.getLog(CoGroupITCase.class);
-
String leftInPath = null;
String rightInPath = null;
String resultPath = null;
@@ -89,8 +82,6 @@ public class CoGroupITCase extends RecordAPITestBase {
target.setField(0, keyString);
target.setField(1, valueString);
- LOG.debug("Read in: [" + keyString.getValue() + "," + valueString.getValue() + "]");
-
return target;
}
@@ -112,9 +103,6 @@ public class CoGroupITCase extends RecordAPITestBase {
this.buffer.append('\n');
byte[] bytes = this.buffer.toString().getBytes();
-
- LOG.debug("Writing out: [" + keyString.toString() + "," + valueInteger.getValue() + "]");
-
this.stream.write(bytes);
}
}
@@ -124,33 +112,28 @@ public class CoGroupITCase extends RecordAPITestBase {
private StringValue keyString = new StringValue();
private StringValue valueString = new StringValue();
- private Record record = new Record();
@Override
public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) {
- // TODO Auto-generated method stub
+ Record record = null;
int sum = 0;
- LOG.debug("Start iterating over input1");
+
while (records1.hasNext()) {
record = records1.next();
keyString = record.getField(0, keyString);
valueString = record.getField(1, valueString);
sum += Integer.parseInt(valueString.getValue());
-
- LOG.debug("Processed: [" + keyString.getValue() + "," + valueString.getValue() + "]");
}
- LOG.debug("Start iterating over input2");
+
+
while (records2.hasNext()) {
record = records2.next();
keyString = record.getField(0, keyString);
valueString = record.getField(1, valueString);
sum -= Integer.parseInt(valueString.getValue());
-
- LOG.debug("Processed: [" + keyString.getValue() + "," + valueString.getValue() + "]");
}
record.setField(1, new IntValue(sum));
- LOG.debug("Finished");
out.collect(record);
}
@@ -197,9 +180,9 @@ public class CoGroupITCase extends RecordAPITestBase {
LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
- String[] localStrategies = { PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE};
+ String[] localStrategies = { PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE };
- String[] shipStrategies = { PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH, };
+ String[] shipStrategies = { PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH };
for (String localStrategy : localStrategies) {
for (String shipStrategy : shipStrategies) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
index b06cf47..2711417 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
@@ -16,11 +16,8 @@
* limitations under the License.
*/
-
package org.apache.flink.test.operators;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.io.DelimitedInputFormat;
@@ -53,8 +50,6 @@ import java.util.LinkedList;
@RunWith(Parameterized.class)
public class ReduceITCase extends RecordAPITestBase {
-
- private static final Log LOG = LogFactory.getLog(ReduceITCase.class);
String inPath = null;
String resultPath = null;
@@ -83,17 +78,14 @@ public class ReduceITCase extends RecordAPITestBase {
private StringValue combineValue = new StringValue();
@Override
- public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
-
+ public void combine(Iterator<Record> records, Collector<Record> out) {
+ Record record = null;
int sum = 0;
- Record record = new Record();
+
while (records.hasNext()) {
record = records.next();
combineValue = record.getField(1, combineValue);
sum += Integer.parseInt(combineValue.toString());
-
- LOG.debug("Processed: [" + record.getField(0, StringValue.class).toString() +
- "," + combineValue.toString() + "]");
}
combineValue.setValue(sum + "");
record.setField(1, combineValue);
@@ -101,17 +93,14 @@ public class ReduceITCase extends RecordAPITestBase {
}
@Override
- public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
-
+ public void reduce(Iterator<Record> records, Collector<Record> out) {
+ Record record = null;
int sum = 0;
- Record record = new Record();
+
while (records.hasNext()) {
record = records.next();
reduceValue = record.getField(1, reduceValue);
sum += Integer.parseInt(reduceValue.toString());
-
- LOG.debug("Processed: [" + record.getField(0, StringValue.class).toString() +
- "," + reduceValue.toString() + "]");
}
record.setField(1, new IntValue(sum));
out.collect(record);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java
index 60b7512..0b84309 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.test.recordJobTests;
import java.io.Serializable;