You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/09 14:40:00 UTC
[29/39] [FLINK-1023] Switch group-at-a-time functions to
java.lang.Iterable (from java.util.Iterator). Iterables over transient data
throw a TraversableOnceException when iterated over again.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
index 3860fbe..074a70d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
@@ -16,9 +16,11 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.record.operators;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@@ -29,6 +31,8 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.RecordOperator;
@@ -36,17 +40,20 @@ import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
import org.apache.flink.api.java.record.functions.FunctionAnnotation;
import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.types.Key;
import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
/**
* ReduceOperator evaluating a {@link ReduceFunction} over each group of records that share the same key.
*
* @see ReduceFunction
*/
-public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, ReduceFunction> implements RecordOperator {
+public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, GroupReduceFunction<Record, Record>> implements RecordOperator {
private static final String DEFAULT_NAME = "<Unnamed Reducer>"; // the default name for contracts
@@ -55,6 +62,8 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
*/
private final Class<? extends Key<?>>[] keyTypes;
+ private final UserCodeWrapper<ReduceFunction> originalFunction;
+
// --------------------------------------------------------------------------------------------
/**
@@ -63,7 +72,11 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
* @param udf The {@link ReduceFunction} implementation for this Reduce contract.
*/
public static Builder builder(ReduceFunction udf) {
- return new Builder(new UserCodeObjectWrapper<ReduceFunction>(udf));
+ UserCodeWrapper<ReduceFunction> original = new UserCodeObjectWrapper<ReduceFunction>(udf);
+ UserCodeWrapper<GroupReduceFunction<Record, Record>> wrapped =
+ new UserCodeObjectWrapper<GroupReduceFunction<Record, Record>>(new WrappingReduceFunction(udf));
+
+ return new Builder(original, wrapped);
}
/**
@@ -74,7 +87,11 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
* @param keyColumn The position of the key.
*/
public static Builder builder(ReduceFunction udf, Class<? extends Key<?>> keyClass, int keyColumn) {
- return new Builder(new UserCodeObjectWrapper<ReduceFunction>(udf), keyClass, keyColumn);
+ UserCodeWrapper<ReduceFunction> original = new UserCodeObjectWrapper<ReduceFunction>(udf);
+ UserCodeWrapper<GroupReduceFunction<Record, Record>> wrapped =
+ new UserCodeObjectWrapper<GroupReduceFunction<Record, Record>>(new WrappingReduceFunction(udf));
+
+ return new Builder(original, wrapped, keyClass, keyColumn);
}
/**
@@ -83,7 +100,11 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
* @param udf The {@link ReduceFunction} implementation for this Reduce contract.
*/
public static Builder builder(Class<? extends ReduceFunction> udf) {
- return new Builder(new UserCodeClassWrapper<ReduceFunction>(udf));
+ UserCodeWrapper<ReduceFunction> original = new UserCodeClassWrapper<ReduceFunction>(udf);
+ UserCodeWrapper<GroupReduceFunction<Record, Record>> wrapped =
+ new UserCodeObjectWrapper<GroupReduceFunction<Record, Record>>(new WrappingClassReduceFunction(udf));
+
+ return new Builder(original, wrapped);
}
/**
@@ -94,7 +115,11 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
* @param keyColumn The position of the key.
*/
public static Builder builder(Class<? extends ReduceFunction> udf, Class<? extends Key<?>> keyClass, int keyColumn) {
- return new Builder(new UserCodeClassWrapper<ReduceFunction>(udf), keyClass, keyColumn);
+ UserCodeWrapper<ReduceFunction> original = new UserCodeClassWrapper<ReduceFunction>(udf);
+ UserCodeWrapper<GroupReduceFunction<Record, Record>> wrapped =
+ new UserCodeObjectWrapper<GroupReduceFunction<Record, Record>>(new WrappingClassReduceFunction(udf));
+
+ return new Builder(original, wrapped, keyClass, keyColumn);
}
/**
@@ -102,8 +127,10 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
* @param builder
*/
protected ReduceOperator(Builder builder) {
- super(builder.udf, OperatorInfoHelper.unary(), builder.getKeyColumnsArray(), builder.name);
+ super(builder.udfWrapper, OperatorInfoHelper.unary(), builder.getKeyColumnsArray(), builder.name);
+
this.keyTypes = builder.getKeyClassesArray();
+ this.originalFunction = builder.originalUdf;
if (builder.inputs != null && !builder.inputs.isEmpty()) {
setInput(Operator.createUnionCascade(builder.inputs));
@@ -111,7 +138,8 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
setGroupOrder(builder.secondaryOrder);
setBroadcastVariables(builder.broadcastInputs);
- setSemanticProperties(FunctionAnnotation.readSingleConstantAnnotations(builder.udf));
+
+ setSemanticProperties(FunctionAnnotation.readSingleConstantAnnotations(originalFunction));
}
// --------------------------------------------------------------------------------------------
@@ -126,7 +154,7 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
@Override
public boolean isCombinable() {
- return super.isCombinable() || getUserCodeWrapper().getUserCodeAnnotation(Combinable.class) != null;
+ return super.isCombinable() || originalFunction.getUserCodeAnnotation(Combinable.class) != null;
}
/**
@@ -178,7 +206,8 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
public static class Builder {
/* The required parameters */
- private final UserCodeWrapper<ReduceFunction> udf;
+ private final UserCodeWrapper<ReduceFunction> originalUdf;
+ private final UserCodeWrapper<GroupReduceFunction<Record, Record>> udfWrapper;
private final List<Class<? extends Key<?>>> keyClasses;
private final List<Integer> keyColumns;
@@ -191,10 +220,11 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
/**
* Creates a Builder with the provided {@link ReduceFunction} implementation.
*
- * @param udf The {@link ReduceFunction} implementation for this Reduce contract.
+ * @param wrappedUdf The {@link GroupReduceFunction} wrapper around the {@link ReduceFunction} implementation for this Reduce contract.
*/
- private Builder(UserCodeWrapper<ReduceFunction> udf) {
- this.udf = udf;
+ private Builder(UserCodeWrapper<ReduceFunction> originalUdf, UserCodeWrapper<GroupReduceFunction<Record, Record>> wrappedUdf) {
+ this.originalUdf = originalUdf;
+ this.udfWrapper = wrappedUdf;
this.keyClasses = new ArrayList<Class<? extends Key<?>>>();
this.keyColumns = new ArrayList<Integer>();
this.inputs = new ArrayList<Operator<Record>>();
@@ -204,12 +234,16 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
/**
* Creates a Builder with the provided {@link ReduceFunction} implementation.
*
- * @param udf The {@link ReduceFunction} implementation for this Reduce contract.
+ * @param wrappedUdf The {@link GroupReduceFunction} wrapper around the {@link ReduceFunction} implementation for this Reduce contract.
* @param keyClass The class of the key data type.
* @param keyColumn The position of the key.
*/
- private Builder(UserCodeWrapper<ReduceFunction> udf, Class<? extends Key<?>> keyClass, int keyColumn) {
- this.udf = udf;
+ private Builder(UserCodeWrapper<ReduceFunction> originalUdf,
+ UserCodeWrapper<GroupReduceFunction<Record, Record>> wrappedUdf,
+ Class<? extends Key<?>> keyClass, int keyColumn)
+ {
+ this.originalUdf = originalUdf;
+ this.udfWrapper = wrappedUdf;
this.keyClasses = new ArrayList<Class<? extends Key<?>>>();
this.keyClasses.add(keyClass);
this.keyColumns = new ArrayList<Integer>();
@@ -325,9 +359,48 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Redu
*/
public ReduceOperator build() {
if (name == null) {
- name = udf.getUserCodeClass().getName();
+ name = originalUdf.getUserCodeClass().getName(); // default to the user's ReduceFunction class; udfWrapper would always yield the internal wrapping class
}
return new ReduceOperator(this);
}
}
+
+ // ============================================================================================
+
+ public static class WrappingReduceFunction extends WrappingFunction<ReduceFunction> implements GroupReduceFunction<Record, Record>, FlatCombineFunction<Record> { // adapts the Iterator-based record ReduceFunction to the Iterable-based GroupReduceFunction / FlatCombineFunction
+
+ private static final long serialVersionUID = 1L;
+
+ public WrappingReduceFunction(ReduceFunction reducer) { // reducer: the user function that all calls are delegated to
+ super(reducer);
+ }
+
+ @Override
+ public final void reduce(Iterable<Record> records, Collector<Record> out) throws Exception {
+ this.wrappedFunction.reduce(records.iterator(), out); // obtain one iterator from the Iterable and delegate to the wrapped reduce
+ }
+
+ @Override
+ public final void combine(Iterable<Record> records, Collector<Record> out) throws Exception {
+ this.wrappedFunction.combine(records.iterator(), out); // same delegation for the combine step
+ }
+ }
+
+ public static final class WrappingClassReduceFunction extends WrappingReduceFunction { // variant for UDFs supplied as a Class rather than as an instance
+
+ private static final long serialVersionUID = 1L;
+
+ public WrappingClassReduceFunction(Class<? extends ReduceFunction> reducer) {
+ super(InstantiationUtil.instantiate(reducer)); // create the instance to wrap from the given class
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException { // custom serialization hook for this class's part of the stream
+ out.writeObject(wrappedFunction.getClass()); // NOTE(review): superclass fields are serialized separately by Java serialization - confirm whether wrappedFunction is transient in WrappingFunction
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ Class<?> clazz = (Class<?>) in.readObject(); // the class written by writeObject
+ this.wrappedFunction = (ReduceFunction) InstantiationUtil.instantiate(clazz); // replace the wrapped function with a fresh instance of that class
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
index be872e5..9fcf963 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
@@ -246,7 +246,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
public static class SolutionWorksetCoGroup1 extends RichCoGroupFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
@Override
- public void coGroup(Iterator<Tuple2<Double, String>> first, Iterator<Tuple3<Double, Long, String>> second,
+ public void coGroup(Iterable<Tuple2<Double, String>> first, Iterable<Tuple3<Double, Long, String>> second,
Collector<Tuple3<Double, Long, String>> out) {
}
}
@@ -254,7 +254,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
public static class SolutionWorksetCoGroup2 extends RichCoGroupFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>, Tuple3<Double, Long, String>> {
@Override
- public void coGroup(Iterator<Tuple3<Double, Long, String>> second, Iterator<Tuple2<Double, String>> first,
+ public void coGroup(Iterable<Tuple3<Double, Long, String>> second, Iterable<Tuple2<Double, String>> first,
Collector<Tuple3<Double, Long, String>> out) {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
new file mode 100644
index 0000000..78a7ad8
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.record;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.java.record.functions.CoGroupFunction;
+import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
+import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsSecond;
+import org.apache.flink.api.java.record.operators.CoGroupOperator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class CoGroupWrappingFunctionTest { // tests the user function object exposed by the record-API CoGroupOperator
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testWrappedCoGroupObject() {
+ try {
+ AtomicInteger methodCounter = new AtomicInteger();
+
+ CoGroupOperator coGroupOp = CoGroupOperator.builder(new TestCoGroupFunction(methodCounter), LongValue.class, 1, 2).build();
+
+ RichFunction cogrouper = (RichFunction) coGroupOp.getUserCodeWrapper().getUserCodeObject();
+
+ // test that close() and open() reach the TestCoGroupFunction, which counts both calls
+ cogrouper.close();
+ cogrouper.open(new Configuration());
+ assertEquals(2, methodCounter.get());
+
+ // prepare the coGroup: a collector that records every emitted record
+ final List<Record> target = new ArrayList<Record>();
+ Collector<Record> collector = new Collector<Record>() {
+ @Override
+ public void collect(Record record) {
+ target.add(record);
+ }
+ @Override
+ public void close() {}
+ };
+
+ List<Record> source1 = new ArrayList<Record>();
+ source1.add(new Record(new IntValue(42)));
+ source1.add(new Record(new IntValue(13)));
+
+ List<Record> source2 = new ArrayList<Record>();
+ source2.add(new Record(new LongValue(11)));
+ source2.add(new Record(new LongValue(17)));
+
+ // test coGroup: both inputs are passed as Iterables (lists); first-input records are expected before second-input records
+ ((org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>) cogrouper).coGroup(source1, source2, collector);
+ assertEquals(4, target.size());
+ assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
+ assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
+ assertEquals(new LongValue(11), target.get(2).getField(0, LongValue.class));
+ assertEquals(new LongValue(17), target.get(3).getField(0, LongValue.class));
+ target.clear();
+
+ // test the serialization: cloning the user code object via serialization must not throw
+ SerializationUtils.clone((java.io.Serializable) cogrouper);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testWrappedCoGroupClass() {
+ try {
+ CoGroupOperator coGroupOp = CoGroupOperator.builder(TestCoGroupFunction.class, LongValue.class, 1, 2).build();
+
+ UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> udf = coGroupOp.getUserCodeWrapper();
+ UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> copy = SerializationUtils.clone(udf);
+ org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> cogrouper = copy.getUserCodeObject();
+
+ // prepare the coGroup
+ final List<Record> target = new ArrayList<Record>();
+ Collector<Record> collector = new Collector<Record>() {
+ @Override
+ public void collect(Record record) {
+ target.add(record);
+ }
+ @Override
+ public void close() {}
+ };
+
+ List<Record> source1 = new ArrayList<Record>();
+ source1.add(new Record(new IntValue(42)));
+ source1.add(new Record(new IntValue(13)));
+
+ List<Record> source2 = new ArrayList<Record>();
+ source2.add(new Record(new LongValue(11)));
+ source2.add(new Record(new LongValue(17)));
+
+ // test coGroup on the function obtained from the serialized copy of the wrapper
+ cogrouper.coGroup(source1, source2, collector);
+ assertEquals(4, target.size());
+ assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
+ assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
+ assertEquals(new LongValue(11), target.get(2).getField(0, LongValue.class));
+ assertEquals(new LongValue(17), target.get(3).getField(0, LongValue.class));
+ target.clear();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testExtractSemantics() {
+ try {
+ { // case 1: operator built from a function instance
+ CoGroupOperator coGroupOp = CoGroupOperator.builder(new TestCoGroupFunction(), LongValue.class, 1, 2).build();
+
+ DualInputSemanticProperties props = coGroupOp.getSemanticProperties();
+ FieldSet fw2 = props.getForwardedField1(2);
+ FieldSet fw4 = props.getForwardedField2(4);
+
+ assertNotNull(fw2);
+ assertNotNull(fw4);
+ assertEquals(1, fw2.size());
+ assertEquals(1, fw4.size());
+ assertTrue(fw2.contains(2));
+ assertTrue(fw4.contains(4));
+ }
+ { // case 2: operator built from the function class
+ CoGroupOperator coGroupOp = CoGroupOperator.builder(TestCoGroupFunction.class, LongValue.class, 1, 2).build();
+
+ DualInputSemanticProperties props = coGroupOp.getSemanticProperties();
+ FieldSet fw2 = props.getForwardedField1(2);
+ FieldSet fw4 = props.getForwardedField2(4);
+
+ assertNotNull(fw2);
+ assertNotNull(fw4);
+ assertEquals(1, fw2.size());
+ assertEquals(1, fw4.size());
+ assertTrue(fw2.contains(2));
+ assertTrue(fw4.contains(4));
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @ConstantFieldsFirst(2)
+ @ConstantFieldsSecond(4)
+ public static class TestCoGroupFunction extends CoGroupFunction { // annotations above are what testExtractSemantics expects to find on the operator
+
+ private final AtomicInteger methodCounter; // counts open() and close() calls
+
+ private TestCoGroupFunction(AtomicInteger methodCounter) {
+ this.methodCounter= methodCounter;
+ }
+
+ public TestCoGroupFunction() {
+ methodCounter = new AtomicInteger();
+ }
+
+ @Override
+ public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) throws Exception { // emits all records of the first input, then all of the second
+ while (records1.hasNext()) {
+ out.collect(records1.next());
+ }
+ while (records2.hasNext()) {
+ out.collect(records2.next());
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ methodCounter.incrementAndGet();
+ super.close();
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ methodCounter.incrementAndGet();
+ super.open(parameters);
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
new file mode 100644
index 0000000..653bb16
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.record;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
+import org.apache.flink.api.java.record.functions.ReduceFunction;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class ReduceWrappingFunctionTest { // tests the user function object exposed by the record-API ReduceOperator
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testWrappedReduceObject() {
+ try {
+ AtomicInteger methodCounter = new AtomicInteger();
+
+ ReduceOperator reduceOp = ReduceOperator.builder(new TestReduceFunction(methodCounter)).build();
+
+ RichFunction reducer = (RichFunction) reduceOp.getUserCodeWrapper().getUserCodeObject();
+
+ // test that close() and open() reach the TestReduceFunction, which counts both calls
+ reducer.close();
+ reducer.open(new Configuration());
+ assertEquals(2, methodCounter.get());
+
+ // prepare the reduce / combine tests: a collector that records every emitted record
+ final List<Record> target = new ArrayList<Record>();
+ Collector<Record> collector = new Collector<Record>() {
+ @Override
+ public void collect(Record record) {
+ target.add(record);
+ }
+ @Override
+ public void close() {}
+ };
+
+ List<Record> source = new ArrayList<Record>();
+ source.add(new Record(new IntValue(42), new LongValue(11)));
+ source.add(new Record(new IntValue(13), new LongValue(17)));
+
+ // test reduce: the input is passed as an Iterable (list) and every record is expected back in order
+ ((GroupReduceFunction<Record, Record>) reducer).reduce(source, collector);
+ assertEquals(2, target.size());
+ assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
+ assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
+ assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
+ assertEquals(new LongValue(17), target.get(1).getField(1, LongValue.class));
+ target.clear();
+
+ // test combine: the same records are expected as for reduce
+ ((FlatCombineFunction<Record>) reducer).combine(source, collector);
+ assertEquals(2, target.size());
+ assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
+ assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
+ assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
+ assertEquals(new LongValue(17), target.get(1).getField(1, LongValue.class));
+ target.clear();
+
+ // test the serialization: cloning the user code object via serialization must not throw
+ SerializationUtils.clone((java.io.Serializable) reducer);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testWrappedReduceClass() {
+ try {
+ ReduceOperator reduceOp = ReduceOperator.builder(TestReduceFunction.class).build();
+
+ UserCodeWrapper<GroupReduceFunction<Record, Record>> udf = reduceOp.getUserCodeWrapper();
+ UserCodeWrapper<GroupReduceFunction<Record, Record>> copy = SerializationUtils.clone(udf);
+ GroupReduceFunction<Record, Record> reducer = copy.getUserCodeObject();
+
+ // prepare the reduce / combine tests: a collector that records every emitted record
+ final List<Record> target = new ArrayList<Record>();
+ Collector<Record> collector = new Collector<Record>() {
+ @Override
+ public void collect(Record record) {
+ target.add(record);
+ }
+ @Override
+ public void close() {}
+ };
+
+ List<Record> source = new ArrayList<Record>();
+ source.add(new Record(new IntValue(42), new LongValue(11)));
+ source.add(new Record(new IntValue(13), new LongValue(17)));
+
+ // test reduce on the function obtained from the serialized copy of the wrapper
+ reducer.reduce(source, collector);
+ assertEquals(2, target.size());
+ assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
+ assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
+ assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
+ assertEquals(new LongValue(17), target.get(1).getField(1, LongValue.class));
+ target.clear();
+
+ // test combine on the same deserialized function
+ ((FlatCombineFunction<Record>) reducer).combine(source, collector);
+ assertEquals(2, target.size());
+ assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
+ assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
+ assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
+ assertEquals(new LongValue(17), target.get(1).getField(1, LongValue.class));
+ target.clear();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testExtractSemantics() {
+ try {
+ { // case 1: operator built from a function instance
+ ReduceOperator reduceOp = ReduceOperator.builder(new TestReduceFunction()).build();
+
+ SingleInputSemanticProperties props = reduceOp.getSemanticProperties();
+ FieldSet fw2 = props.getForwardedField(2);
+ FieldSet fw4 = props.getForwardedField(4);
+
+ assertNotNull(fw2);
+ assertNotNull(fw4);
+ assertEquals(1, fw2.size());
+ assertEquals(1, fw4.size());
+ assertTrue(fw2.contains(2));
+ assertTrue(fw4.contains(4));
+ }
+ { // case 2: operator built from the function class
+ ReduceOperator reduceOp = ReduceOperator.builder(TestReduceFunction.class).build();
+
+ SingleInputSemanticProperties props = reduceOp.getSemanticProperties();
+ FieldSet fw2 = props.getForwardedField(2);
+ FieldSet fw4 = props.getForwardedField(4);
+
+ assertNotNull(fw2);
+ assertNotNull(fw4);
+ assertEquals(1, fw2.size());
+ assertEquals(1, fw4.size());
+ assertTrue(fw2.contains(2));
+ assertTrue(fw4.contains(4));
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testCombinable() {
+ try {
+ { // case 1: operator built from a function instance
+ ReduceOperator reduceOp = ReduceOperator.builder(new TestReduceFunction()).build();
+ assertTrue(reduceOp.isCombinable());
+ }
+ { // case 2: operator built from the function class
+ ReduceOperator reduceOp = ReduceOperator.builder(TestReduceFunction.class).build();
+ assertTrue(reduceOp.isCombinable());
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Combinable
+ @ConstantFields({2, 4})
+ public static class TestReduceFunction extends ReduceFunction { // annotations above are what testCombinable and testExtractSemantics expect to find on the operator
+
+ private final AtomicInteger methodCounter; // counts open() and close() calls
+
+ private TestReduceFunction(AtomicInteger methodCounter) {
+ this.methodCounter= methodCounter;
+ }
+
+ public TestReduceFunction() {
+ methodCounter = new AtomicInteger();
+ }
+
+ @Override
+ public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception { // emits every input record unchanged
+ while (records.hasNext()) {
+ out.collect(records.next());
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ methodCounter.incrementAndGet();
+ super.close();
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ methodCounter.incrementAndGet();
+ super.open(parameters);
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index d5044a8..c8a8ee9 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.api.java.type.extractor;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Iterator;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
@@ -72,7 +71,7 @@ public class TypeExtractorTest {
private static final long serialVersionUID = 1L;
@Override
- public void reduce(Iterator<Boolean> values, Collector<Boolean> out) throws Exception {
+ public void reduce(Iterable<Boolean> values, Collector<Boolean> out) throws Exception {
// nothing to do
}
};
@@ -1084,7 +1083,7 @@ public class TypeExtractorTest {
private static final long serialVersionUID = 1L;
@Override
- public void coGroup(Iterator<String[]> first, Iterator<String[]> second, Collector<String[]> out) throws Exception {
+ public void coGroup(Iterable<String[]> first, Iterable<String[]> second, Collector<String[]> out) throws Exception {
// nothing to do
}
};
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
index 7dd4dea..ab18bf8 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
@@ -48,14 +48,11 @@ public class CoGroupITCase implements Serializable {
DataSet<Tuple2<Integer,String>> joined = left.coGroup(right).where(0).equalTo(0)
.with((values1, values2, out) -> {
int sum = 0;
- String conc = "";
- while (values1.hasNext()) {
- sum += values1.next().f0;
- conc += values1.next().f1;
+ for (Tuple2<Integer, String> next : values1) {
+ sum += next.f0;
}
- while (values2.hasNext()) {
- sum += values2.next().f0;
- conc += values2.next().f1;
+ for (Tuple2<Integer, String> next : values2) {
+ sum += next.f0;
}
});
env.execute();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
index a86de1f..7b5d659 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
@@ -38,8 +38,7 @@ public class GroupReduceITCase implements Serializable {
DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
DataSet<String> concatDs = stringDs.reduceGroup((values, out) -> {
String conc = "";
- while (values.hasNext()) {
- String s = values.next();
+ for (String s : values) {
conc = conc.concat(s);
}
out.collect(conc);
@@ -68,9 +67,8 @@ public class GroupReduceITCase implements Serializable {
.groupBy(0)
.reduceGroup((values, out) -> {
String conc = "";
- while (values.hasNext()) {
- String s = values.next().f1;
- conc = conc.concat(s);
+ for (Tuple2<Integer,String> next : values) {
+ conc = conc.concat(next.f1);
}
out.collect(conc);
});
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
index 8761a2e..8590b78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
@@ -16,10 +16,9 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators;
-import java.util.Iterator;
+import java.util.Collections;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -29,7 +28,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
import org.apache.flink.runtime.iterative.task.AbstractIterativePactTask;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
-import org.apache.flink.runtime.util.EmptyIterator;
import org.apache.flink.runtime.util.KeyGroupedIterator;
import org.apache.flink.runtime.util.SingleElementIterator;
import org.apache.flink.util.Collector;
@@ -132,7 +130,7 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
final KeyGroupedIterator<IT2> probeSideInput = new KeyGroupedIterator<IT2>(taskContext.<IT2>getInput(0), probeSideSerializer, probeSideComparator);
final SingleElementIterator<IT1> siIter = new SingleElementIterator<IT1>();
- final Iterator<IT1> emptySolutionSide = EmptyIterator.<IT1>get();
+ final Iterable<IT1> emptySolutionSide = Collections.emptySet();
final CompactingHashTable<IT1>.HashTableProber<IT2> prober = join.getProber(this.probeSideComparator, this.pairComparator);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
index f2020c7..b3c0ece 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
@@ -16,11 +16,8 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators;
-import java.util.Iterator;
-
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
@@ -132,7 +129,7 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
final KeyGroupedIterator<IT1> probeSideInput = new KeyGroupedIterator<IT1>(taskContext.<IT1>getInput(0), probeSideSerializer, probeSideComparator);
final SingleElementIterator<IT2> siIter = new SingleElementIterator<IT2>();
- final Iterator<IT2> emptySolutionSide = EmptyIterator.<IT2>get();
+ final Iterable<IT2> emptySolutionSide = EmptyIterator.<IT2>get();
final CompactingHashTable<IT2>.HashTableProber<IT1> prober = join.getProber(this.probeSideComparator, this.pairComparator);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index c8f217c..9bc1893 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -697,7 +697,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
S stub = config.<S>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(stubSuperClass, this.userCodeClassLoader);
// check if the class is a subclass, if the check is required
if (stubSuperClass != null && !stubSuperClass.isAssignableFrom(stub.getClass())) {
- Thread.dumpStack();
throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" +
stubSuperClass.getName() + "' as is required.");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index da8a11b..6a6f6ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
import org.apache.flink.runtime.util.KeyGroupedIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TraversableOnceException;
/**
@@ -503,7 +504,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
* This class implements an iterator over values from a sort buffer. The iterator returns the values of a given
* interval.
*/
- private static final class CombineValueIterator<E> implements Iterator<E> {
+ private static final class CombineValueIterator<E> implements Iterator<E>, Iterable<E> {
private final InMemorySorter<E> buffer; // the buffer from which values are returned
@@ -512,6 +513,8 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
private int last; // the position of the last value to be returned
private int position; // the position of the next value to be returned
+
+ private boolean iteratorAvailable;
/**
* Creates an iterator over the values in a <tt>BufferSortable</tt>.
@@ -535,6 +538,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
public void set(int first, int last) {
this.last = last;
this.position = first;
+ this.iteratorAvailable = true;
}
@Override
@@ -564,6 +568,16 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
public void remove() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public Iterator<E> iterator() {
+ if (iteratorAvailable) {
+ iteratorAvailable = false;
+ return this;
+ } else {
+ throw new TraversableOnceException();
+ }
+ }
};
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
index f575076..ee2a2c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
@@ -410,7 +410,7 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> {
*
* @param output The output view to write the records to.
* @param start The logical start position of the subset.
- * @param len The number of elements to write.
+ * @param num The number of elements to write.
* @throws IOException Thrown, if an I/O exception occurred writing to the output view.
*/
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
index a018def..6f75490 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
@@ -103,7 +103,7 @@ public interface InMemorySorter<T> extends IndexedSortable {
*
* @param output The output view to write the records to.
* @param start The logical start position of the subset.
- * @param len The number of elements to write.
+ * @param num The number of elements to write.
* @throws IOException Thrown, if an I/O exception occurred writing to the output view.
*/
public void writeToOutput(final ChannelWriterOutputView output, final int start, int num) throws IOException;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
index 9ef2ad7..6881cdf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
@@ -34,24 +34,19 @@ import org.apache.flink.util.MutableObjectIterator;
* to the next smallest element logarithmic in complexity, with respect to the
* number of streams to be merged.
* The order among the elements is established using the methods from the
- * {@link TypeSerializer} class, specifically {@link TypeSerializer#setReference(Object)}
- * and {@link TypeSerializer#compareToReference(TypeSerializer)}.
- *
- * @see TypeSerializer
- * @see TypeSerializer#setReference(Object)
- * @see TypeSerializer#compareToReference(TypeSerializer)
- *
+ * {@link TypeComparator} class, specifically {@link TypeComparator#setReference(Object)}
+ * and {@link TypeComparator#compareToReference(TypeComparator)}.
*/
-public class MergeIterator<E> implements MutableObjectIterator<E>
-{
+public class MergeIterator<E> implements MutableObjectIterator<E> {
+
private final PartialOrderPriorityQueue<HeadStream<E>> heap; // heap over the head elements of the stream
private final TypeSerializer<E> serializer;
/**
* @param iterators
- * @param accessors The accessors used to establish an order among the elements.
- * The accessors will not be used directly, but a duplicate will be used.
+ * @param serializer
+ * @param comparator
* @throws IOException
*/
public MergeIterator(List<MutableObjectIterator<E>> iterators,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
index 2ed75ae..48693a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators.sort;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIterator.java
index dcd3361..f6803fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIterator.java
@@ -16,17 +16,15 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators.sort;
import java.io.IOException;
-import java.util.Iterator;
+import java.util.Collections;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.util.CoGroupTaskIterator;
-import org.apache.flink.runtime.util.EmptyIterator;
import org.apache.flink.runtime.util.KeyGroupedIterator;
import org.apache.flink.util.MutableObjectIterator;
@@ -41,9 +39,9 @@ public class SortMergeCoGroupIterator<T1, T2> implements CoGroupTaskIterator<T1,
private MatchStatus matchStatus;
- private Iterator<T1> firstReturn;
+ private Iterable<T1> firstReturn;
- private Iterator<T2> secondReturn;
+ private Iterable<T2> secondReturn;
private TypePairComparator<T1, T2> comp;
@@ -73,13 +71,13 @@ public class SortMergeCoGroupIterator<T1, T2> implements CoGroupTaskIterator<T1,
@Override
- public Iterator<T1> getValues1() {
+ public Iterable<T1> getValues1() {
return this.firstReturn;
}
@Override
- public Iterator<T2> getValues2() {
+ public Iterable<T2> getValues2() {
return this.secondReturn;
}
@@ -117,7 +115,7 @@ public class SortMergeCoGroupIterator<T1, T2> implements CoGroupTaskIterator<T1,
}
else if (firstEmpty && !secondEmpty) {
// input1 is empty, input2 not
- this.firstReturn = EmptyIterator.get();
+ this.firstReturn = Collections.emptySet();
this.secondReturn = this.iterator2.getValues();
this.matchStatus = MatchStatus.FIRST_EMPTY;
return true;
@@ -125,7 +123,7 @@ public class SortMergeCoGroupIterator<T1, T2> implements CoGroupTaskIterator<T1,
else if (!firstEmpty && secondEmpty) {
// input1 is not empty, input 2 is empty
this.firstReturn = this.iterator1.getValues();
- this.secondReturn = EmptyIterator.get();
+ this.secondReturn = Collections.emptySet();
this.matchStatus = MatchStatus.SECOND_EMPTY;
return true;
}
@@ -142,12 +140,12 @@ public class SortMergeCoGroupIterator<T1, T2> implements CoGroupTaskIterator<T1,
else if (0 < comp) {
// key1 goes first
this.firstReturn = this.iterator1.getValues();
- this.secondReturn = EmptyIterator.get();
+ this.secondReturn = Collections.emptySet();
this.matchStatus = MatchStatus.SECOND_REMAINED;
}
else {
// key 2 goes first
- this.firstReturn = EmptyIterator.get();
+ this.firstReturn = Collections.emptySet();
this.secondReturn = this.iterator2.getValues();
this.matchStatus = MatchStatus.FIRST_REMAINED;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/CoGroupTaskIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/CoGroupTaskIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/CoGroupTaskIterator.java
index a20490f..e137e27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/CoGroupTaskIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/CoGroupTaskIterator.java
@@ -20,7 +20,6 @@
package org.apache.flink.runtime.operators.util;
import java.io.IOException;
-import java.util.Iterator;
import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
@@ -30,8 +29,8 @@ import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
* @param <T1> The generic type of the first input's data type.
* @param <T2> The generic type of the second input's data type.
*/
-public interface CoGroupTaskIterator<T1, T2>
-{
+public interface CoGroupTaskIterator<T1, T2> {
+
/**
* General-purpose open method.
*
@@ -63,12 +62,12 @@ public interface CoGroupTaskIterator<T1, T2>
*
* @return an iterable over the left input values for the current key.
*/
- Iterator<T1> getValues1();
+ Iterable<T1> getValues1();
/**
* Returns an iterable over the left input values for the current key.
*
* @return an iterable over the left input values for the current key.
*/
- Iterator<T2> getValues2();
+ Iterable<T2> getValues2();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyIterator.java
index b1628d7..caafaea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyIterator.java
@@ -25,7 +25,7 @@ import java.util.NoSuchElementException;
/**
* An empty iterator that never returns anything.
*/
-public final class EmptyIterator<E> implements Iterator<E> {
+public final class EmptyIterator<E> implements Iterator<E>, Iterable<E> {
/**
* The singleton instance.
@@ -38,9 +38,9 @@ public final class EmptyIterator<E> implements Iterator<E> {
* @param <E> The type of the objects (not) returned by the iterator.
* @return An instance of the iterator.
*/
- public static <E> Iterator<E> get() {
+ public static <E> EmptyIterator<E> get() {
@SuppressWarnings("unchecked")
- Iterator<E> iter = (Iterator<E>) INSTANCE;
+ EmptyIterator<E> iter = (EmptyIterator<E>) INSTANCE;
return iter;
}
@@ -73,4 +73,9 @@ public final class EmptyIterator<E> implements Iterator<E> {
public void remove() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public Iterator<E> iterator() {
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
index 0873baf..be43cc8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
@@ -26,6 +26,7 @@ import java.util.NoSuchElementException;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TraversableOnceException;
/**
* The KeyValueIterator returns a key and all values that belong to the key (share the same key).
@@ -98,6 +99,8 @@ public final class KeyGroupedIterator<E> {
return false;
}
}
+
+ this.valuesIterator.iteratorAvailable = true;
// Whole value-iterator was read and a new key is available.
if (this.lookAheadHasNext) {
@@ -151,19 +154,20 @@ public final class KeyGroupedIterator<E> {
// --------------------------------------------------------------------------------------------
- public final class ValuesIterator implements Iterator<E>
- {
+ public final class ValuesIterator implements Iterator<E>, Iterable<E> {
+
private final TypeSerializer<E> serializer = KeyGroupedIterator.this.serializer;
private final TypeComparator<E> comparator = KeyGroupedIterator.this.comparator;
private E staging = this.serializer.createInstance();
private boolean currentIsUnconsumed = false;
+ private boolean iteratorAvailable = true;
+
private ValuesIterator() {}
@Override
- public boolean hasNext()
- {
+ public boolean hasNext() {
if (KeyGroupedIterator.this.current == null || KeyGroupedIterator.this.lookAheadHasNext) {
return false;
}
@@ -221,5 +225,16 @@ public final class KeyGroupedIterator<E> {
public void remove() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public Iterator<E> iterator() {
+ if (iteratorAvailable) {
+ iteratorAvailable = false;
+ return this;
+ }
+ else {
+ throw new TraversableOnceException();
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/util/MutableToRegularIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/MutableToRegularIteratorWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/MutableToRegularIteratorWrapper.java
index e707a50..14e4ae8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/MutableToRegularIteratorWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/MutableToRegularIteratorWrapper.java
@@ -25,6 +25,7 @@ import java.util.NoSuchElementException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TraversableOnceException;
/**
* This class wraps a {@link MutableObjectIterator} into a regular {@link Iterator}.
@@ -32,13 +33,15 @@ import org.apache.flink.util.MutableObjectIterator;
* whenever hasNext() returns (possibly with false), the previous obtained record is
* still valid and cannot have been overwritten internally.
*/
-public class MutableToRegularIteratorWrapper<T> implements Iterator<T> {
+public class MutableToRegularIteratorWrapper<T> implements Iterator<T>, Iterable<T> {
private final MutableObjectIterator<T> source;
private T current, next;
private boolean currentIsAvailable;
+
+ private boolean iteratorAvailable = true;
public MutableToRegularIteratorWrapper(MutableObjectIterator<T> source, TypeSerializer<T> serializer) {
this.source = source;
@@ -85,4 +88,15 @@ public class MutableToRegularIteratorWrapper<T> implements Iterator<T> {
public void remove() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public Iterator<T> iterator() {
+ if (iteratorAvailable) {
+ iteratorAvailable = false;
+ return this;
+ }
+ else {
+ throw new TraversableOnceException();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/main/java/org/apache/flink/runtime/util/SingleElementIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SingleElementIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SingleElementIterator.java
index 222c5b8..f7eb41a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SingleElementIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SingleElementIterator.java
@@ -27,7 +27,7 @@ import java.util.NoSuchElementException;
*
* @param<E> The generic type of the iterator.
*/
-public final class SingleElementIterator<E> implements Iterator<E> {
+public final class SingleElementIterator<E> implements Iterator<E>, Iterable<E> {
private E current;
private boolean available = false;
@@ -61,4 +61,9 @@ public final class SingleElementIterator<E> implements Iterator<E> {
public void remove() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public Iterator<E> iterator() {
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
index 3894233..3c8bbd0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators;
import java.util.ArrayList;
@@ -24,9 +23,12 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.java.record.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichFlatJoinFunction;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
+import org.apache.flink.runtime.operators.BuildFirstCachedMatchDriver;
+import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver;
+import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
@@ -40,8 +42,8 @@ import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
-public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>>
-{
+public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
+
private static final long HASH_MEM = 6*1024*1024;
private static final long SORT_MEM = 3*1024*1024;
@@ -459,7 +461,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
// =================================================================================================
- public static final class MockMatchStub extends JoinFunction {
+ public static final class MockMatchStub extends RichFlatJoinFunction<Record, Record, Record> {
private static final long serialVersionUID = 1L;
@Override
@@ -468,13 +470,13 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
}
}
- public static final class MockFailingMatchStub extends JoinFunction {
+ public static final class MockFailingMatchStub extends RichFlatJoinFunction<Record, Record, Record> {
private static final long serialVersionUID = 1L;
private int cnt = 0;
@Override
- public void join(Record record1, Record record2, Collector<Record> out) {
+ public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
if (++this.cnt >= 10) {
throw new ExpectedTestException();
}
@@ -483,7 +485,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
}
}
- public static final class MockDelayingMatchStub extends JoinFunction {
+ public static final class MockDelayingMatchStub extends RichFlatJoinFunction<Record, Record, Record> {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
index 5551485..b65f161 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
@@ -16,16 +16,15 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators;
-import java.util.Iterator;
-
import org.junit.Assert;
-
import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
+import org.apache.flink.runtime.operators.CoGroupDriver;
+import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.types.IntValue;
@@ -84,25 +83,23 @@ public class CoGroupTaskExternalITCase extends DriverTestBase<CoGroupFunction<Re
Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
}
- public static final class MockCoGroupStub extends org.apache.flink.api.java.record.functions.CoGroupFunction {
+ public static final class MockCoGroupStub extends RichCoGroupFunction<Record, Record, Record> {
private static final long serialVersionUID = 1L;
private final Record res = new Record();
+ @SuppressWarnings("unused")
@Override
- public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out)
- {
+ public void coGroup(Iterable<Record> records1, Iterable<Record> records2, Collector<Record> out) {
int val1Cnt = 0;
int val2Cnt = 0;
- while (records1.hasNext()) {
+ for (Record r : records1) {
val1Cnt++;
- records1.next();
}
- while (records2.hasNext()) {
+ for (Record r : records2) {
val2Cnt++;
- records2.next();
}
if (val1Cnt == 0) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
index e57d20c..968d947 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
@@ -16,17 +16,17 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators;
-import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
-
import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
+import org.apache.flink.runtime.operators.CoGroupDriver;
+import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.CoGroupTaskExternalITCase.MockCoGroupStub;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
@@ -400,24 +400,20 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
}
- public static class MockFailingCoGroupStub extends org.apache.flink.api.java.record.functions.CoGroupFunction {
+ public static class MockFailingCoGroupStub extends RichCoGroupFunction<Record, Record, Record> {
private static final long serialVersionUID = 1L;
private int cnt = 0;
@Override
- public void coGroup(Iterator<Record> records1,
- Iterator<Record> records2, Collector<Record> out) throws RuntimeException
- {
+ public void coGroup(Iterable<Record> records1, Iterable<Record> records2, Collector<Record> out) {
int val1Cnt = 0;
- while (records1.hasNext()) {
+ for (@SuppressWarnings("unused") Record r : records1) {
val1Cnt++;
- records1.next();
}
- while (records2.hasNext()) {
- Record record2 = records2.next();
+ for (Record record2 : records2) {
if (val1Cnt == 0) {
if(++this.cnt>=10) {
@@ -440,25 +436,23 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
}
- public static final class MockDelayingCoGroupStub extends org.apache.flink.api.java.record.functions.CoGroupFunction {
+ public static final class MockDelayingCoGroupStub extends RichCoGroupFunction<Record, Record, Record> {
private static final long serialVersionUID = 1L;
+ @SuppressWarnings("unused")
@Override
- public void coGroup(Iterator<Record> records1,
- Iterator<Record> records2, Collector<Record> out) {
+ public void coGroup(Iterable<Record> records1, Iterable<Record> records2, Collector<Record> out) {
- while (records1.hasNext()) {
+ for (Record r : records1) {
try {
Thread.sleep(100);
} catch (InterruptedException e) { }
- records1.next();
}
- while (records2.hasNext()) {
+ for (Record r : records2) {
try {
Thread.sleep(100);
} catch (InterruptedException e) { }
- records2.next();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
index 99440aa..7915d1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
@@ -16,19 +16,18 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
import org.apache.flink.api.java.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.GroupReduceCombineDriver;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
@@ -163,18 +162,18 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
}
@Combinable
- public static class MockCombiningReduceStub extends ReduceFunction {
+ public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
private static final long serialVersionUID = 1L;
private final IntValue theInteger = new IntValue();
@Override
- public void reduce(Iterator<Record> records, Collector<Record> out)
- throws Exception {
+ public void reduce(Iterable<Record> records, Collector<Record> out) {
Record element = null;
int sum = 0;
- while (records.hasNext()) {
- element = records.next();
+
+ for (Record next : records) {
+ element = next;
element.getField(1, this.theInteger);
sum += this.theInteger.getValue();
@@ -185,13 +184,13 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
}
@Override
- public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
+ public void combine(Iterable<Record> records, Collector<Record> out) throws Exception {
reduce(records, out);
}
}
@Combinable
- public static final class MockFailingCombiningReduceStub extends ReduceFunction {
+ public static final class MockFailingCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
private static final long serialVersionUID = 1L;
private int cnt = 0;
@@ -201,12 +200,12 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
private final IntValue combineValue = new IntValue();
@Override
- public void reduce(Iterator<Record> records, Collector<Record> out)
- throws Exception {
+ public void reduce(Iterable<Record> records, Collector<Record> out) {
Record element = null;
int sum = 0;
- while (records.hasNext()) {
- element = records.next();
+
+ for (Record next : records) {
+ element = next;
element.getField(1, this.value);
sum += this.value.getValue();
@@ -218,12 +217,12 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
}
@Override
- public void combine(Iterator<Record> records, Collector<Record> out)
- throws Exception {
+ public void combine(Iterable<Record> records, Collector<Record> out) {
Record element = null;
int sum = 0;
- while (records.hasNext()) {
- element = records.next();
+
+ for (Record next : records) {
+ element = next;
element.getField(1, this.combineValue);
sum += this.combineValue.getValue();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
index b7e48c6..786fb4e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
+@SuppressWarnings("deprecation")
public class MapTaskTest extends DriverTestBase<GenericCollectorMap<Record, Record>> {
private static final Log LOG = LogFactory.getLog(MapTaskTest.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
index ebee0b8..8b8e991 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
@@ -16,11 +16,9 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import org.junit.Assert;
@@ -28,10 +26,11 @@ import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.java.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.GroupReduceDriver;
import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
@@ -213,19 +212,19 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
}
- public static class MockReduceStub extends ReduceFunction {
+ public static class MockReduceStub extends RichGroupReduceFunction<Record, Record> {
private static final long serialVersionUID = 1L;
private final IntValue key = new IntValue();
private final IntValue value = new IntValue();
@Override
- public void reduce(Iterator<Record> records, Collector<Record> out)
- throws Exception {
+ public void reduce(Iterable<Record> records, Collector<Record> out) {
Record element = null;
int cnt = 0;
- while (records.hasNext()) {
- element = records.next();
+
+ for (Record next : records) {
+ element = next;
cnt++;
}
element.getField(0, this.key);
@@ -236,7 +235,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
}
@Combinable
- public static class MockCombiningReduceStub extends ReduceFunction {
+ public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
private static final long serialVersionUID = 1L;
private final IntValue key = new IntValue();
@@ -244,12 +243,12 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
private final IntValue combineValue = new IntValue();
@Override
- public void reduce(Iterator<Record> records, Collector<Record> out)
- throws Exception {
+ public void reduce(Iterable<Record> records, Collector<Record> out) {
Record element = null;
int sum = 0;
- while (records.hasNext()) {
- element = records.next();
+
+ for (Record next : records) {
+ element = next;
element.getField(1, this.value);
sum += this.value.getValue();
@@ -261,12 +260,12 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
}
@Override
- public void combine(Iterator<Record> records, Collector<Record> out)
- throws Exception {
+ public void combine(Iterable<Record> records, Collector<Record> out) {
Record element = null;
int sum = 0;
- while (records.hasNext()) {
- element = records.next();
+
+ for (Record next : records) {
+ element = next;
element.getField(1, this.combineValue);
sum += this.combineValue.getValue();
@@ -276,7 +275,5 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
element.setField(1, this.combineValue);
out.collect(element);
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
index b367a7e..fada5a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
@@ -16,11 +16,9 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -29,10 +27,11 @@ import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.java.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.GroupReduceDriver;
import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
@@ -270,19 +269,19 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
}
- public static class MockReduceStub extends ReduceFunction {
+ public static class MockReduceStub extends RichGroupReduceFunction<Record, Record> {
private static final long serialVersionUID = 1L;
private final IntValue key = new IntValue();
private final IntValue value = new IntValue();
@Override
- public void reduce(Iterator<Record> records, Collector<Record> out)
- throws Exception {
+ public void reduce(Iterable<Record> records, Collector<Record> out) {
Record element = null;
int cnt = 0;
- while (records.hasNext()) {
- element = records.next();
+
+ for (Record next : records) {
+ element = next;
cnt++;
}
element.getField(0, this.key);
@@ -293,7 +292,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
}
@Combinable
- public static class MockCombiningReduceStub extends ReduceFunction {
+ public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
private static final long serialVersionUID = 1L;
private final IntValue key = new IntValue();
@@ -301,12 +300,12 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
private final IntValue combineValue = new IntValue();
@Override
- public void reduce(Iterator<Record> records, Collector<Record> out)
- throws Exception {
+ public void reduce(Iterable<Record> records, Collector<Record> out) {
Record element = null;
int sum = 0;
- while (records.hasNext()) {
- element = records.next();
+
+ for (Record next : records) {
+ element = next;
element.getField(1, this.value);
sum += this.value.getValue();
@@ -318,12 +317,12 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
}
@Override
- public void combine(Iterator<Record> records, Collector<Record> out)
- throws Exception {
+ public void combine(Iterable<Record> records, Collector<Record> out) {
Record element = null;
int sum = 0;
- while (records.hasNext()) {
- element = records.next();
+
+ for (Record next : records) {
+ element = next;
element.getField(1, this.combineValue);
sum += this.combineValue.getValue();
@@ -336,7 +335,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
}
- public static class MockFailingReduceStub extends ReduceFunction {
+ public static class MockFailingReduceStub extends RichGroupReduceFunction<Record, Record> {
private static final long serialVersionUID = 1L;
private int cnt = 0;
@@ -345,12 +344,12 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
private final IntValue value = new IntValue();
@Override
- public void reduce(Iterator<Record> records, Collector<Record> out)
- throws Exception {
+ public void reduce(Iterable<Record> records, Collector<Record> out) {
Record element = null;
int valCnt = 0;
- while (records.hasNext()) {
- element = records.next();
+
+ for (Record next : records) {
+ element = next;
valCnt++;
}
@@ -365,16 +364,15 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
}
}
- public static class MockDelayingReduceStub extends ReduceFunction {
+ public static class MockDelayingReduceStub extends RichGroupReduceFunction<Record, Record> {
private static final long serialVersionUID = 1L;
@Override
- public void reduce(Iterator<Record> records, Collector<Record> out) {
- while(records.hasNext()) {
+ public void reduce(Iterable<Record> records, Collector<Record> out) {
+ for (@SuppressWarnings("unused") Record r : records) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {}
- records.next();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index aabaa03..5609128 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -16,16 +16,14 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators.chaining;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.GenericCollectorMap;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparatorFactory;
import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
import org.apache.flink.configuration.Configuration;
@@ -48,6 +46,7 @@ import org.junit.Assert;
import org.junit.Test;
+@SuppressWarnings("deprecation")
public class ChainTaskTest extends TaskTestBase {
private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;
@@ -68,7 +67,6 @@ public class ChainTaskTest extends TaskTestBase {
}
-
@Test
public void testMapTask() {
final int keyCnt = 100;
@@ -190,18 +188,19 @@ public class ChainTaskTest extends TaskTestBase {
}
}
- public static final class MockFailingCombineStub extends ReduceFunction {
+ public static final class MockFailingCombineStub extends RichGroupReduceFunction<Record, Record> {
private static final long serialVersionUID = 1L;
private int cnt = 0;
@Override
- public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
+ public void reduce(Iterable<Record> records, Collector<Record> out) throws Exception {
if (++this.cnt >= 5) {
throw new RuntimeException("Expected Test Exception");
}
- while(records.hasNext()) {
- out.collect(records.next());
+
+ for (Record r : records) {
+ out.collect(r);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java
index d603dec..427dc74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java
@@ -16,11 +16,9 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators.drivers;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -155,11 +153,10 @@ public class AllGroupReduceDriverTest {
public static final class ConcatSumReducer extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
@Override
- public void reduce(Iterator<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
- Tuple2<String, Integer> current = values.next();
+ public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
+ Tuple2<String, Integer> current = new Tuple2<String, Integer>("", 0);
- while (values.hasNext()) {
- Tuple2<String, Integer> next = values.next();
+ for (Tuple2<String, Integer> next : values) {
next.f0 = current.f0 + next.f0;
next.f1 = current.f1 + next.f1;
current = next;
@@ -172,11 +169,10 @@ public class AllGroupReduceDriverTest {
public static final class ConcatSumMutableReducer extends RichGroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> {
@Override
- public void reduce(Iterator<Tuple2<StringValue, IntValue>> values, Collector<Tuple2<StringValue, IntValue>> out) {
- Tuple2<StringValue, IntValue> current = values.next();
+ public void reduce(Iterable<Tuple2<StringValue, IntValue>> values, Collector<Tuple2<StringValue, IntValue>> out) {
+ Tuple2<StringValue, IntValue> current = new Tuple2<StringValue, IntValue>(new StringValue(""), new IntValue(0));
- while (values.hasNext()) {
- Tuple2<StringValue, IntValue> next = values.next();
+ for (Tuple2<StringValue, IntValue> next : values) {
next.f0.append(current.f0);
next.f1.setValue(current.f1.getValue() + next.f1.getValue());
current = next;