You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/01 09:29:37 UTC
[21/22] git commit: [FLINK-1023] Switch group-at-a-time function to
java.lang.Iterable (from java.util.Iterator) Iterables over transient data
throw a TraversableOnceException when iterated over again.
[FLINK-1023] Switch group-at-a-time function to java.lang.Iterable (from java.util.Iterator)
Iterables over transient data throw a TraversableOnceException when iterated over again.
This closes #84
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/72d7b862
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/72d7b862
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/72d7b862
Branch: refs/heads/release-0.6
Commit: 72d7b86274c33d1570ffb22b1fca2081c15d753c
Parents: 934e4e0
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 29 15:58:44 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 1 02:33:49 2014 +0200
----------------------------------------------------------------------
.../api/avro/AvroWithEmptyArrayITCase.java | 7 +-
.../mapred/record/example/WordCount.java | 2 +-
.../example/WordCountWithOutputFormat.java | 2 +-
.../spargel/java/VertexCentricIteration.java | 42 ++--
.../spargel/java/record/SpargelIteration.java | 2 +-
.../compiler/CoGroupSolutionSetFirstTest.java | 7 +-
.../compiler/GroupReduceCompilationTest.java | 15 +-
.../flink/compiler/IterationsCompilerTest.java | 5 +-
.../WorksetIterationsJavaApiCompilerTest.java | 10 +-
.../testfunctions/IdentityGroupReducer.java | 8 +-
.../testfunctions/Top1GroupReducer.java | 7 +-
.../flink/compiler/util/DummyCoGroupStub.java | 1 -
.../flink/compiler/util/IdentityReduce.java | 1 -
.../api/common/functions/CoGroupFunction.java | 9 +-
.../api/common/functions/CombineFunction.java | 3 +-
.../common/functions/FlatCombineFunction.java | 3 +-
.../common/functions/GroupReduceFunction.java | 3 +-
.../flink/util/TraversableOnceException.java | 28 +++
.../example/java/graph/EnumTrianglesBasic.java | 10 +-
.../example/java/graph/EnumTrianglesOpt.java | 12 +-
.../flink/example/java/graph/PageRankBasic.java | 6 +-
.../java/graph/TransitiveClosureNaive.java | 6 +-
.../example/java/relational/WebLogAnalysis.java | 10 +-
.../flink/api/java/ExecutionEnvironment.java | 21 ++
.../api/java/functions/GroupReduceIterator.java | 4 +-
.../api/java/functions/RichCoGroupFunction.java | 4 +-
.../java/functions/RichFlatCombineFunction.java | 4 +-
.../java/functions/RichGroupReduceFunction.java | 9 +-
.../api/java/operators/AggregateOperator.java | 3 +-
.../api/java/operators/CoGroupOperator.java | 10 +-
.../api/java/operators/DistinctOperator.java | 6 +-
.../api/java/operators/GroupReduceOperator.java | 5 -
.../PlanUnwrappingCoGroupOperator.java | 44 +---
.../translation/PlanUnwrappingJoinOperator.java | 8 -
.../PlanUnwrappingReduceGroupOperator.java | 14 +-
.../translation/TupleUnwrappingIterator.java | 15 +-
.../operators/translation/WrappingFunction.java | 3 +-
.../java/record/functions/CoGroupFunction.java | 4 +-
.../java/record/functions/ReduceFunction.java | 7 +-
.../java/record/operators/CoGroupOperator.java | 61 ++++-
.../java/record/operators/ReduceOperator.java | 107 ++++++--
.../DeltaIterationTranslationTest.java | 4 +-
.../record/CoGroupWrappingFunctionTest.java | 221 +++++++++++++++++
.../java/record/ReduceWrappingFunctionTest.java | 246 +++++++++++++++++++
.../java/type/extractor/TypeExtractorTest.java | 5 +-
.../javaApiOperators/lambdas/CoGroupITCase.java | 11 +-
.../lambdas/GroupReduceITCase.java | 8 +-
.../CoGroupWithSolutionSetFirstDriver.java | 6 +-
.../CoGroupWithSolutionSetSecondDriver.java | 5 +-
.../runtime/operators/RegularPactTask.java | 1 -
.../sort/CombiningUnilateralSortMerger.java | 16 +-
.../operators/sort/FixedLengthRecordSorter.java | 2 +-
.../runtime/operators/sort/InMemorySorter.java | 2 +-
.../runtime/operators/sort/MergeIterator.java | 17 +-
.../operators/sort/MergeMatchIterator.java | 1 -
.../sort/SortMergeCoGroupIterator.java | 20 +-
.../operators/util/CoGroupTaskIterator.java | 9 +-
.../flink/runtime/util/EmptyIterator.java | 11 +-
.../flink/runtime/util/KeyGroupedIterator.java | 23 +-
.../util/MutableToRegularIteratorWrapper.java | 16 +-
.../runtime/util/SingleElementIterator.java | 7 +-
.../runtime/operators/CachedMatchTaskTest.java | 18 +-
.../operators/CoGroupTaskExternalITCase.java | 19 +-
.../runtime/operators/CoGroupTaskTest.java | 30 +--
.../runtime/operators/CombineTaskTest.java | 35 ++-
.../flink/runtime/operators/MapTaskTest.java | 1 +
.../operators/ReduceTaskExternalITCase.java | 35 ++-
.../flink/runtime/operators/ReduceTaskTest.java | 50 ++--
.../operators/chaining/ChainTaskTest.java | 15 +-
.../drivers/AllGroupReduceDriverTest.java | 16 +-
.../drivers/GroupReduceDriverTest.java | 16 +-
.../CombiningUnilateralSortMergerITCase.java | 24 +-
.../sort/SortMergeCoGroupIteratorITCase.java | 4 +-
.../runtime/util/KeyGroupedIteratorTest.java | 43 +++-
.../api/scala/functions/ReduceFunction.scala | 4 +-
.../api/scala/operators/CoGroupOperator.scala | 7 +-
.../api/scala/operators/ReduceOperator.scala | 5 +-
.../test/accumulators/AccumulatorITCase.java | 1 +
.../AccumulatorIterativeITCase.java | 8 +-
.../KMeansIterativeNepheleITCase.java | 3 +-
.../BulkIterationWithAllReducerITCase.java | 8 +-
.../CoGroupConnectedComponentsITCase.java | 4 +-
.../CoGroupConnectedComponentsSecondITCase.java | 12 +-
.../DependencyConnectedComponentsITCase.java | 30 +--
...IterationTerminationWithTerminationTail.java | 1 -
.../IterationTerminationWithTwoTails.java | 2 +-
.../IterationWithAllReducerITCase.java | 1 -
.../iterative/IterationWithChainingITCase.java | 118 +++++----
.../iterative/IterationWithUnionITCase.java | 81 ++----
...nentsWithParametrizableAggregatorITCase.java | 35 ++-
...entsWithParametrizableConvergenceITCase.java | 20 +-
.../ConnectedComponentsNepheleITCase.java | 5 +-
.../IterationWithChainingNepheleITCase.java | 11 +-
.../CustomCompensatableDotProductCoGroup.java | 11 +-
.../CustomCompensatingMap.java | 1 +
.../CustomRankCombiner.java | 6 +-
.../CompensatableDotProductCoGroup.java | 4 +-
.../test/javaApiOperators/CoGroupITCase.java | 94 +++----
.../javaApiOperators/GroupReduceITCase.java | 123 ++++------
.../flink/test/operators/CoGroupITCase.java | 29 +--
.../flink/test/operators/ReduceITCase.java | 23 +-
.../recordJobTests/GroupOrderReduceITCase.java | 1 -
.../graph/WorksetConnectedComponents.java | 2 +-
103 files changed, 1295 insertions(+), 820 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
index ea9edff..89db1fa 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
@@ -194,15 +194,14 @@ public class AvroWithEmptyArrayITCase extends RecordAPITestBase {
private static final long serialVersionUID = 1L;
@Override
- public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out)
- throws Exception {
+ public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) {
Record r1 = null;
- if (records1.hasNext()) {
+ while (records1.hasNext()) {
r1 = records1.next();
}
Record r2 = null;
- if (records2.hasNext()) {
+ while (records2.hasNext()) {
r2 = records2.next();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
index 25caf0c..88b1892 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.hadoopcompatibility.mapred.record.example;
import java.io.Serializable;
@@ -103,6 +102,7 @@ public class WordCount implements Program, ProgramDescription {
public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
Record element = null;
int sum = 0;
+
while (records.hasNext()) {
element = records.next();
int cnt = element.getField(1, IntValue.class).getValue();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
index a3cd3d5..8aaf8e5 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.hadoopcompatibility.mapred.record.example;
import java.io.Serializable;
@@ -101,6 +100,7 @@ public class WordCountWithOutputFormat implements Program, ProgramDescription {
public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
Record element = null;
int sum = 0;
+
while (records.hasNext()) {
element = records.next();
int cnt = element.getField(1, IntValue.class).getValue();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
index 65be2f8..8f9149c 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
@@ -413,24 +413,28 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver
}
@Override
- public void coGroup(Iterator<Tuple2<VertexKey, Message>> messages, Iterator<Tuple2<VertexKey, VertexValue>> vertex,
+ public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages, Iterable<Tuple2<VertexKey, VertexValue>> vertex,
Collector<Tuple2<VertexKey, VertexValue>> out)
throws Exception
{
- if (vertex.hasNext()) {
- Tuple2<VertexKey, VertexValue> vertexState = vertex.next();
+ final Iterator<Tuple2<VertexKey, VertexValue>> vertexIter = vertex.iterator();
+
+ if (vertexIter.hasNext()) {
+ Tuple2<VertexKey, VertexValue> vertexState = vertexIter.next();
@SuppressWarnings("unchecked")
- Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages;
+ Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
messageIter.setSource(downcastIter);
vertexUpdateFunction.setOutput(vertexState, out);
vertexUpdateFunction.updateVertex(vertexState.f0, vertexState.f1, messageIter);
- } else {
- if (messages.hasNext()) {
+ }
+ else {
+ final Iterator<Tuple2<VertexKey, Message>> messageIter = messages.iterator();
+ if (messageIter.hasNext()) {
String message = "Target vertex does not exist!.";
try {
- Tuple2<VertexKey, Message> next = messages.next();
+ Tuple2<VertexKey, Message> next = messageIter.next();
message = "Target vertex '" + next.f0 + "' does not exist!.";
} catch (Throwable t) {}
throw new Exception(message);
@@ -481,13 +485,15 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver
}
@Override
- public void coGroup(Iterator<Tuple2<VertexKey, VertexKey>> edges,
- Iterator<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
+ public void coGroup(Iterable<Tuple2<VertexKey, VertexKey>> edges,
+ Iterable<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
throws Exception
{
- if (state.hasNext()) {
- Tuple2<VertexKey, VertexValue> newVertexState = state.next();
- messagingFunction.set((Iterator<?>) edges, out);
+ final Iterator<Tuple2<VertexKey, VertexValue>> stateIter = state.iterator();
+
+ if (stateIter.hasNext()) {
+ Tuple2<VertexKey, VertexValue> newVertexState = stateIter.next();
+ messagingFunction.set((Iterator<?>) edges.iterator(), out);
messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
}
}
@@ -534,13 +540,15 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver
}
@Override
- public void coGroup(Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> edges,
- Iterator<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
+ public void coGroup(Iterable<Tuple3<VertexKey, VertexKey, EdgeValue>> edges,
+ Iterable<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
throws Exception
{
- if (state.hasNext()) {
- Tuple2<VertexKey, VertexValue> newVertexState = state.next();
- messagingFunction.set((Iterator<?>) edges, out);
+ final Iterator<Tuple2<VertexKey, VertexValue>> stateIter = state.iterator();
+
+ if (stateIter.hasNext()) {
+ Tuple2<VertexKey, VertexValue> newVertexState = stateIter.next();
+ messagingFunction.set((Iterator<?>) edges.iterator(), out);
messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
index 3a58afc..96bc799 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
@@ -161,7 +161,7 @@ public class SpargelIteration {
@Override
public void coGroup(Iterator<Record> messages, Iterator<Record> vertex, Collector<Record> out) throws Exception {
-
+
if (vertex.hasNext()) {
Record first = vertex.next();
first.getFieldInto(0, vertexKey);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
index 3624d86..2a4d6d8 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
@@ -16,11 +16,8 @@
* limitations under the License.
*/
-
package org.apache.flink.compiler;
-import java.util.Iterator;
-
import org.apache.flink.api.java.functions.RichCoGroupFunction;
import org.apache.flink.api.java.functions.RichMapFunction;
import org.junit.Assert;
@@ -30,6 +27,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.DeltaIteration;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.plan.Channel;
import org.apache.flink.compiler.plan.DualInputPlanNode;
import org.apache.flink.compiler.plan.OptimizedPlan;
@@ -44,8 +42,7 @@ public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
public static class SimpleCGroup extends RichCoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> {
@Override
- public void coGroup(Iterator<Tuple1<Integer>> first, Iterator<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) throws Exception {
- }
+ public void coGroup(Iterable<Tuple1<Integer>> first, Iterable<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) {}
}
public static class SimpleMap extends RichMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
index b9cc769..9f63683 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
@@ -16,11 +16,8 @@
* limitations under the License.
*/
-
package org.apache.flink.compiler;
-import java.util.Iterator;
-
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.functions.RichGroupReduceFunction;
@@ -51,7 +48,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source");
data.reduceGroup(new RichGroupReduceFunction<Double, Double>() {
- public void reduce(Iterator<Double> values, Collector<Double> out) {}
+ public void reduce(Iterable<Double> values, Collector<Double> out) {}
}).name("reducer")
.print().name("sink");
@@ -95,7 +92,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new RichGroupReduceFunction<Long, Long>() {
- public void reduce(Iterator<Long> values, Collector<Long> out) {}
+ public void reduce(Iterable<Long> values, Collector<Long> out) {}
}).name("reducer");
reduced.setCombinable(true);
@@ -148,7 +145,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
data
.groupBy(1)
.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
- public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+ public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
}).name("reducer")
.print().name("sink");
@@ -197,7 +194,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
.groupBy(1)
.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
- public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+ public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
}).name("reducer");
reduced.setCombinable(true);
@@ -256,7 +253,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
public String getKey(Tuple2<String, Double> value) { return value.f0; }
})
.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
- public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+ public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
}).name("reducer")
.print().name("sink");
@@ -314,7 +311,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
public String getKey(Tuple2<String, Double> value) { return value.f0; }
})
.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
- public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+ public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
}).name("reducer");
reduced.setCombinable(true);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
index 8fc4324..828a635 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.compiler;
import static org.junit.Assert.*;
@@ -24,8 +23,6 @@ import static org.junit.Assert.*;
import org.apache.flink.api.java.functions.RichFlatMapFunction;
import org.junit.Test;
-import java.util.Iterator;
-
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.DeltaIteration;
@@ -287,7 +284,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
public static final class Reduce101 extends RichGroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
@Override
- public void reduce(Iterator<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {}
+ public void reduce(Iterable<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {}
}
@ConstantFields("0")
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
index 64a4791..62d5d9b 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
@@ -16,17 +16,15 @@
* limitations under the License.
*/
-
package org.apache.flink.compiler;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import java.util.Iterator;
-
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.DeltaIteration;
@@ -214,7 +212,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
iter.getWorkset().join(invariantInput)
.where(1, 2)
.equalTo(1, 2)
- .with(new RichJoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
+ .with(new JoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
return first;
}
@@ -224,7 +222,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
result.join(iter.getSolutionSet())
.where(1, 0)
.equalTo(0, 2)
- .with(new RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
+ .with(new JoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
return second;
}
@@ -283,7 +281,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
DataSet<Tuple3<Long, Long, Long>> nextWorkset = joinedWithSolutionSet.groupBy(1, 2)
.reduceGroup(new RichGroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() {
- public void reduce(Iterator<Tuple3<Long, Long, Long>> values, Collector<Tuple3<Long, Long, Long>> out) {}
+ public void reduce(Iterable<Tuple3<Long, Long, Long>> values, Collector<Tuple3<Long, Long, Long>> out) {}
})
.name(NEXT_WORKSET_REDUCER_NAME)
.withConstantSet("1->1","2->2","0->0");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
index 42275af..e06e3ba 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
@@ -16,10 +16,8 @@
* limitations under the License.
*/
-
package org.apache.flink.compiler.testfunctions;
-import java.util.Iterator;
import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
@@ -32,9 +30,9 @@ public class IdentityGroupReducer<T> extends RichGroupReduceFunction<T, T> {
private static final long serialVersionUID = 1L;
@Override
- public void reduce(Iterator<T> values, Collector<T> out) {
- while (values.hasNext()) {
- out.collect(values.next());
+ public void reduce(Iterable<T> values, Collector<T> out) {
+ for (T next : values) {
+ out.collect(next);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
index 3f24e65..b1a0e2d 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
@@ -16,11 +16,8 @@
* limitations under the License.
*/
-
package org.apache.flink.compiler.testfunctions;
-import java.util.Iterator;
-
import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
import org.apache.flink.util.Collector;
@@ -32,7 +29,7 @@ public class Top1GroupReducer<T> extends RichGroupReduceFunction<T, T> {
private static final long serialVersionUID = 1L;
@Override
- public void reduce(Iterator<T> values, Collector<T> out) {
- out.collect(values.next());
+ public void reduce(Iterable<T> values, Collector<T> out) {
+ out.collect(values.iterator().next());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCoGroupStub.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCoGroupStub.java b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCoGroupStub.java
index 6ef1651..13cd37b 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCoGroupStub.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCoGroupStub.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.compiler.util;
import java.io.Serializable;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/util/IdentityReduce.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/util/IdentityReduce.java b/flink-compiler/src/test/java/org/apache/flink/compiler/util/IdentityReduce.java
index 3f32423..b78a850 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/util/IdentityReduce.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/util/IdentityReduce.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.compiler.util;
import java.io.Serializable;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
index cc8fe78..1a0abeb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
@@ -19,7 +19,6 @@
package org.apache.flink.api.common.functions;
import java.io.Serializable;
-import java.util.Iterator;
import org.apache.flink.util.Collector;
@@ -41,11 +40,11 @@ import org.apache.flink.util.Collector;
* Some keys may only be contained in one of the two original data sets. In that case, the CoGroup function is invoked
* with an empty input for the side of the data set that did not contain elements with that specific key.
*
- * @param <V1> The data type of the first input data set.
- * @param <V2> The data type of the second input data set.
+ * @param <IN1> The data type of the first input data set.
+ * @param <IN2> The data type of the second input data set.
* @param <O> The data type of the returned elements.
*/
-public interface CoGroupFunction<V1, V2, O> extends Function, Serializable {
+public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {
/**
* This method must be implemented to provide a user implementation of a
@@ -59,5 +58,5 @@ public interface CoGroupFunction<V1, V2, O> extends Function, Serializable {
* @throws Exception The function may throw Exceptions, which will cause the program to cancel,
* and may trigger the recovery logic.
*/
- void coGroup(Iterator<V1> first, Iterator<V2> second, Collector<O> out) throws Exception;
+ public void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
index aed7fae..0588526 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
@@ -19,7 +19,6 @@
package org.apache.flink.api.common.functions;
import java.io.Serializable;
-import java.util.Iterator;
/**
* Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction}
@@ -45,5 +44,5 @@ public interface CombineFunction<T> extends Function, Serializable {
* @throws Exception The function may throw Exceptions, which will cause the program to cancel,
* and may trigger the recovery logic.
*/
- T combine(Iterator<T> values) throws Exception;
+ T combine(Iterable<T> values) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
index edf1614..bbcbd0a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
@@ -19,7 +19,6 @@
package org.apache.flink.api.common.functions;
import java.io.Serializable;
-import java.util.Iterator;
import org.apache.flink.util.Collector;
@@ -47,5 +46,5 @@ public interface FlatCombineFunction<T> extends Function, Serializable {
* @throws Exception The function may throw Exceptions, which will cause the program to cancel,
* and may trigger the recovery logic.
*/
- void combine(Iterator<T> values, Collector<T> out) throws Exception;
+ void combine(Iterable<T> values, Collector<T> out) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
index 5fa959c..54cc5c7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
@@ -19,7 +19,6 @@
package org.apache.flink.api.common.functions;
import java.io.Serializable;
-import java.util.Iterator;
import org.apache.flink.util.Collector;
@@ -53,5 +52,5 @@ public interface GroupReduceFunction<T, O> extends Function, Serializable {
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
- void reduce(Iterator<T> values, Collector<O> out) throws Exception;
+ void reduce(Iterable<T> values, Collector<O> out) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java b/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java
new file mode 100644
index 0000000..a15c31c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+public class TraversableOnceException extends RuntimeException {
+
+ private static final long serialVersionUID = 7636881584773577290L;
+
+ public TraversableOnceException() {
+ super("The Iterable can be iterated over only once. Only the first call to 'iterator()' will succeed.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
index 2d794bd..3d68e99 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
@@ -35,7 +35,7 @@ import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Edge;
import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Triad;
/**
- * Triangle enumeration is a preprocessing step to find closely connected parts in graphs.
+ * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
* A triangle consists of three edges that connect three vertices with each other.
*
* <p>
@@ -154,7 +154,9 @@ public class EnumTrianglesBasic {
private final Triad outTriad = new Triad();
@Override
- public void reduce(Iterator<Edge> edges, Collector<Triad> out) throws Exception {
+ public void reduce(Iterable<Edge> edgesIter, Collector<Triad> out) throws Exception {
+
+ final Iterator<Edge> edges = edgesIter.iterator();
// clear vertex list
vertices.clear();
@@ -165,11 +167,11 @@ public class EnumTrianglesBasic {
vertices.add(firstEdge.getSecondVertex());
// build and emit triads
- while(edges.hasNext()) {
+ while (edges.hasNext()) {
Integer higherVertexId = edges.next().getSecondVertex();
// combine vertex with all previously read vertices
- for(Integer lowerVertexId : vertices) {
+ for (Integer lowerVertexId : vertices) {
outTriad.setSecondVertex(lowerVertexId);
outTriad.setThirdVertex(higherVertexId);
out.collect(outTriad);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java
index c0ea26a..efccb59 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java
@@ -38,7 +38,7 @@ import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.EdgeWithD
import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Triad;
/**
- * Triangle enumeration is a preprocessing step to find closely connected parts in graphs.
+ * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
* A triangle consists of three edges that connect three vertices with each other.
*
* <p>
@@ -166,8 +166,9 @@ public class EnumTrianglesOpt {
final EdgeWithDegrees outputEdge = new EdgeWithDegrees();
@Override
- public void reduce(Iterator<Edge> edges, Collector<EdgeWithDegrees> out) throws Exception {
+ public void reduce(Iterable<Edge> edgesIter, Collector<EdgeWithDegrees> out) {
+ Iterator<Edge> edges = edgesIter.iterator();
otherVertices.clear();
// get first edge
@@ -176,7 +177,7 @@ public class EnumTrianglesOpt {
this.otherVertices.add(edge.getSecondVertex());
// get all other edges (assumes edges are sorted by second vertex)
- while(edges.hasNext()) {
+ while (edges.hasNext()) {
edge = edges.next();
Integer otherVertex = edge.getSecondVertex();
// collect unique vertices
@@ -274,7 +275,8 @@ public class EnumTrianglesOpt {
private final Triad outTriad = new Triad();
@Override
- public void reduce(Iterator<Edge> edges, Collector<Triad> out) throws Exception {
+ public void reduce(Iterable<Edge> edgesIter, Collector<Triad> out) throws Exception {
+ final Iterator<Edge> edges = edgesIter.iterator();
// clear vertex list
vertices.clear();
@@ -285,7 +287,7 @@ public class EnumTrianglesOpt {
vertices.add(firstEdge.getSecondVertex());
// build and emit triads
- while(edges.hasNext()) {
+ while (edges.hasNext()) {
Integer higherVertexId = edges.next().getSecondVertex();
// combine vertex with all previously read vertices
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
index ba9754f..e6a9272 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
@@ -21,7 +21,6 @@ package org.apache.flink.example.java.graph;
import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
import java.util.ArrayList;
-import java.util.Iterator;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -163,12 +162,11 @@ public class PageRankBasic {
private final ArrayList<Long> neighbors = new ArrayList<Long>();
@Override
- public void reduce(Iterator<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) {
+ public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) {
neighbors.clear();
Long id = 0L;
- while (values.hasNext()) {
- Tuple2<Long, Long> n = values.next();
+ for (Tuple2<Long, Long> n : values) {
id = n.f0;
neighbors.add(n.f1);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
index 22054da..e9ba406 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
@@ -25,8 +25,6 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.IterativeDataSet;
-import java.util.Iterator;
-
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.example.java.graph.util.ConnectedComponentsData;
@@ -70,8 +68,8 @@ public class TransitiveClosureNaive implements ProgramDescription {
.groupBy(0, 1)
.reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
@Override
- public void reduce(Iterator<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
- out.collect(values.next());
+ public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(values.iterator().next());
}
});
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
index 3033c0d..9ca6ea9 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
@@ -16,10 +16,8 @@
* limitations under the License.
*/
-
package org.apache.flink.example.java.relational;
-import java.util.Iterator;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FilterFunction;
@@ -249,12 +247,12 @@ public class WebLogAnalysis {
* 2: AVG_DURATION
*/
@Override
- public void coGroup(Iterator<Tuple3<Integer, String, Integer>> ranks, Iterator<Tuple1<String>> visits, Collector<Tuple3<Integer, String, Integer>> out) {
+ public void coGroup(Iterable<Tuple3<Integer, String, Integer>> ranks, Iterable<Tuple1<String>> visits, Collector<Tuple3<Integer, String, Integer>> out) {
// Check if there is an entry in the visits relation
- if (!visits.hasNext()) {
- while (ranks.hasNext()) {
+ if (!visits.iterator().hasNext()) {
+ for (Tuple3<Integer, String, Integer> next : ranks) {
// Emit all rank pairs
- out.collect(ranks.next());
+ out.collect(next);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index af0ea52..bcc523d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
+import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.api.java.io.CsvReader;
@@ -253,6 +254,26 @@ public abstract class ExecutionEnvironment {
public CsvReader readCsvFile(String filePath) {
return new CsvReader(filePath, this);
}
+
+ // ------------------------------------ File Input Format -----------------------------------------
+
+ public <X> DataSource<X> readFile(FileInputFormat<X> inputFormat, String filePath) {
+ if (inputFormat == null) {
+ throw new IllegalArgumentException("InputFormat must not be null.");
+ }
+ if (filePath == null) {
+ throw new IllegalArgumentException("The file path must not be null.");
+ }
+
+ inputFormat.setFilePath(new Path(filePath));
+ try {
+ return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat));
+ }
+ catch (Exception e) {
+ throw new InvalidProgramException("The type returned by the input format could not be automatically determined. " +
+ "Please specify the TypeInformation of the produced type explicitly.");
+ }
+ }
// ----------------------------------- Generic Input Format ---------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
index b363606..087808d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
@@ -28,13 +28,13 @@ public abstract class GroupReduceIterator<IN, OUT> extends RichGroupReduceFuncti
private static final long serialVersionUID = 1L;
- public abstract Iterator<OUT> reduceGroup(Iterator<IN> values) throws Exception;
+ public abstract Iterator<OUT> reduceGroup(Iterable<IN> values) throws Exception;
// -------------------------------------------------------------------------------------------
@Override
- public final void reduce(Iterator<IN> values, Collector<OUT> out) throws Exception {
+ public final void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception {
for (Iterator<OUT> iter = reduceGroup(values); iter.hasNext(); ) {
out.collect(iter.next());
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
index e78c31e..3169622 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
@@ -18,8 +18,6 @@
package org.apache.flink.api.java.functions;
-import java.util.Iterator;
-
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.RichFunction;
@@ -40,6 +38,6 @@ public abstract class RichCoGroupFunction<IN1, IN2, OUT> extends AbstractRichFun
private static final long serialVersionUID = 1L;
@Override
- public abstract void coGroup(Iterator<IN1> first, Iterator<IN2> second, Collector<OUT> out) throws Exception;
+ public abstract void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<OUT> out) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
index 97c15ff..a5d45aa 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
@@ -24,8 +24,6 @@ import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.util.Collector;
-import java.util.Iterator;
-
/**
* Rich variant of the {@link FlatCombineFunction}. As a {@link RichFunction}, it gives access to the
* {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
@@ -39,5 +37,5 @@ public abstract class RichFlatCombineFunction<T> extends AbstractRichFunction im
private static final long serialVersionUID = 1L;
@Override
- public abstract void combine(Iterator<T> values, Collector<T> out) throws Exception;
+ public abstract void combine(Iterable<T> values, Collector<T> out) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
index 801f592..9198aeb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
@@ -22,7 +22,6 @@ import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
-import java.util.Iterator;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.FlatCombineFunction;
@@ -44,7 +43,7 @@ public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunct
private static final long serialVersionUID = 1L;
@Override
- public abstract void reduce(Iterator<IN> values, Collector<OUT> out) throws Exception;
+ public abstract void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception;
/**
* The combine method pre-reduces elements. It may be called on subsets of the data
@@ -59,7 +58,7 @@ public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunct
* <p>
* Since the reduce function will be called on the result of this method, it is important that this
* method returns the same data type as it consumes. By default, this method only calls the
- * {@link #reduce(Iterator, Collector)} method. If the behavior in the pre-reducing is different
+ * {@link #reduce(Iterable, Collector)} method. If the behavior in the pre-reducing is different
* from the final reduce function (for example because the reduce function changes the data type),
* this method must be overridden, or the execution will fail.
*
@@ -70,7 +69,7 @@ public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunct
* to fail and may trigger recovery.
*/
@Override
- public void combine(Iterator<IN> values, Collector<IN> out) throws Exception {
+ public void combine(Iterable<IN> values, Collector<IN> out) throws Exception {
@SuppressWarnings("unchecked")
Collector<OUT> c = (Collector<OUT>) out;
reduce(values, c);
@@ -80,7 +79,7 @@ public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunct
/**
* This annotation can be added to classes that extend {@link RichGroupReduceFunction}, in order to mark
- * them as "combinable". The system may call the {@link RichGroupReduceFunction#combine(Iterator, Collector)}
+ * them as "combinable". The system may call the {@link RichGroupReduceFunction#combine(Iterable, Collector)}
* method on such functions, to pre-reduce the data before transferring it over the network to
* the actual group reduce operation.
* <p>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
index 80a5fa0..1ceb8c8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
@@ -271,13 +271,14 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate
}
@Override
- public void reduce(Iterator<T> values, Collector<T> out) {
+ public void reduce(Iterable<T> records, Collector<T> out) {
final AggregationFunction<Object>[] aggFunctions = this.aggFunctions;
final int[] fieldPositions = this.fieldPositions;
// aggregators are initialized from before
T current = null;
+ final Iterator<T> values = records.iterator();
while (values.hasNext()) {
current = values.next();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 744893b..a890b32 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -301,7 +301,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
/**
* Intermediate step of a CoGroup transformation. <br/>
* To continue the CoGroup transformation, select the grouping key of the first input {@link DataSet} by calling
- * {@link CoGroupOperatorSets#where()} or {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets#where(KeySelector)}.
+ * {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets#where(int, int...)} or {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets#where(KeySelector)}.
*
* @param <I1> The type of the first input DataSet of the CoGroup transformation.
* @param <I2> The type of the second input DataSet of the CoGroup transformation.
@@ -328,7 +328,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
* @param field0 The first index of the Tuple fields of the first co-grouped DataSets that should be used as key
* @param fields The indexes of the Tuple fields of the first co-grouped DataSets that should be used as keys.
* @return An incomplete CoGroup transformation.
- * Call {@link CoGroupOperatorSetsPredicate#equalTo()} to continue the CoGroup.
+ * Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int, int...)} to continue the CoGroup.
*
* @see Tuple
* @see DataSet
@@ -348,7 +348,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
* @param field0 The first field of the Tuple fields of the first co-grouped DataSets that should be used as key
* @param fields The fields of the first co-grouped DataSets that should be used as keys.
* @return An incomplete CoGroup transformation.
- * Call {@link CoGroupOperatorSetsPredicate#equalTo()} to continue the CoGroup.
+ * Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int, int...)} to continue the CoGroup.
*
* @see Tuple
* @see DataSet
@@ -367,7 +367,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
*
* @param keyExtractor The KeySelector function which extracts the key values from the DataSet on which it is grouped.
* @return An incomplete CoGroup transformation.
- * Call {@link CoGroupOperatorSetsPredicate#equalTo()} to continue the CoGroup.
+ * Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int, int...)} to continue the CoGroup.
*
* @see KeySelector
* @see DataSet
@@ -381,7 +381,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
/**
* Intermediate step of a CoGroup transformation. <br/>
* To continue the CoGroup transformation, select the grouping key of the second input {@link DataSet} by calling
- * {@link CoGroupOperatorSetsPredicate#equalTo(int...)} or {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(KeySelector)}.
+ * {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int, int...)} or {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(KeySelector)}.
*
*/
public final class CoGroupOperatorSetsPredicate {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index 7646fa0..fd35773 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -18,8 +18,6 @@
package org.apache.flink.api.java.operators;
-import java.util.Iterator;
-
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
@@ -148,8 +146,8 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
private static final long serialVersionUID = 1L;
@Override
- public void reduce(Iterator<T> values, Collector<T> out) {
- out.collect(values.next());
+ public void reduce(Iterable<T> values, Collector<T> out) {
+ out.collect(values.iterator().next());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index e1424ad..7b0ad4d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -37,7 +37,6 @@ import org.apache.flink.types.TypeInformation;
import org.apache.flink.api.java.DataSet;
-
/**
* This operator represents the application of a "reduceGroup" function on a data set, and the
* result data set produced by the function.
@@ -185,7 +184,6 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
else {
throw new UnsupportedOperationException("Unrecognized key type.");
}
-
}
@@ -215,7 +213,4 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
return reducer;
}
-
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
index 89290f0..36bae6b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
@@ -18,8 +18,6 @@
package org.apache.flink.api.java.operators.translation;
-import java.util.Iterator;
-
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
@@ -78,47 +76,11 @@ public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>
@Override
- public void coGroup(Iterator<Tuple2<K, I1>> records1, Iterator<Tuple2<K, I2>> records2, Collector<OUT> out) throws Exception {
- iter1.set(records1);
- iter2.set(records2);
+ public void coGroup(Iterable<Tuple2<K, I1>> records1, Iterable<Tuple2<K, I2>> records2, Collector<OUT> out) throws Exception {
+ iter1.set(records1.iterator());
+ iter2.set(records2.iterator());
this.wrappedFunction.coGroup(iter1, iter2, out);
}
}
-
- public static class UnwrappingKeyIterator<K, I1> implements Iterator<I1> {
-
- private Iterator<Tuple2<K, I1>> outerIterator;
- I1 firstValue;
-
- public UnwrappingKeyIterator(Iterator<Tuple2<K, I1>> records1) {
- this.outerIterator = records1;
- this.firstValue = null;
- }
-
- public UnwrappingKeyIterator(Iterator<Tuple2<K, I1>> records1, I1 firstValue ) {
- this.outerIterator = records1;
- this.firstValue = firstValue;
- }
-
- @Override
- public boolean hasNext() {
- return firstValue != null || outerIterator.hasNext();
- }
-
- @Override
- public I1 next() {
- if(firstValue != null) {
- firstValue = null;
- return firstValue;
- }
- return outerIterator.next().getField(1);
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
index 73ea004..efd52d5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
@@ -68,18 +68,10 @@ public class PlanUnwrappingJoinOperator<I1, I2, OUT, K>
super(wrapped);
}
- //@SuppressWarnings("unchecked")
- //@Override
- //public OUT join(Tuple2<K, I1> value1, Tuple2<K, I2> value2) throws Exception {
- // return wrappedFunction.join((I1)(value1.getField(1)), (I2)(value2.getField(1)));
- //}
-
@SuppressWarnings("unchecked")
@Override
public void join (Tuple2<K, I1> value1, Tuple2<K, I2> value2, Collector<OUT> collector) throws Exception {
wrappedFunction.join ((I1)(value1.getField(1)), (I2)(value2.getField(1)), collector);
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index 29eb5ed..7e3f0e5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -18,8 +18,6 @@
package org.apache.flink.api.java.operators.translation;
-import java.util.Iterator;
-
import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
@@ -65,14 +63,14 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
@Override
- public void reduce(Iterator<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
- iter.set(values);
+ public void reduce(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
+ iter.set(values.iterator());
this.wrappedFunction.reduce(iter, out);
}
@Override
- public void combine(Iterator<Tuple2<K, IN>> values, Collector<Tuple2<K, IN>> out) throws Exception {
- iter.set(values);
+ public void combine(Iterable<Tuple2<K, IN>> values, Collector<Tuple2<K, IN>> out) throws Exception {
+ iter.set(values.iterator());
coll.set(out);
this.wrappedFunction.combine(iter, coll);
}
@@ -98,8 +96,8 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
@Override
- public void reduce(Iterator<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
- iter.set(values);
+ public void reduce(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
+ iter.set(values.iterator());
this.wrappedFunction.reduce(iter, out);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
index 1f2c208..c09f3a8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
@@ -21,20 +21,23 @@ package org.apache.flink.api.java.operators.translation;
import java.util.Iterator;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.TraversableOnceException;
/**
* An iterator that reads 2-tuples (key value pairs) and returns only the values (second field).
* The iterator also tracks the keys, as the pairs flow through it.
*/
-public class TupleUnwrappingIterator<T, K> implements Iterator<T>, java.io.Serializable {
+public class TupleUnwrappingIterator<T, K> implements Iterator<T>, Iterable<T>, java.io.Serializable {
private static final long serialVersionUID = 1L;
private K lastKey;
private Iterator<Tuple2<K, T>> iterator;
+ private boolean iteratorAvailable;
public void set(Iterator<Tuple2<K, T>> iterator) {
this.iterator = iterator;
+ this.iteratorAvailable = true;
}
public K getLastKey() {
@@ -57,4 +60,14 @@ public class TupleUnwrappingIterator<T, K> implements Iterator<T>, java.io.Seria
public void remove() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public Iterator<T> iterator() {
+ if (iteratorAvailable) {
+ iteratorAvailable = false;
+ return this;
+ } else {
+ throw new TraversableOnceException();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
index 267d879..748e3f3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
@@ -42,7 +42,7 @@ public abstract class WrappingFunction<T extends Function> extends AbstractRichF
private static final long serialVersionUID = 1L;
- protected final T wrappedFunction;
+ protected T wrappedFunction;
protected WrappingFunction(T wrappedFunction) {
this.wrappedFunction = wrappedFunction;
@@ -174,6 +174,5 @@ public abstract class WrappingFunction<T extends Function> extends AbstractRichF
public <T extends Value> T getPreviousIterationAggregate(String name) {
return ((IterationRuntimeContext) context).<T>getPreviousIterationAggregate(name);
}
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
index e9b5c25..fc7ef49 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
@@ -28,7 +28,7 @@ import org.apache.flink.util.Collector;
/**
* The CoGroupFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.record.operators.CoGroupOperator}.
*/
-public abstract class CoGroupFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> {
+public abstract class CoGroupFunction extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
@@ -45,7 +45,5 @@ public abstract class CoGroupFunction extends AbstractRichFunction implements or
* runtime catches an exception, it aborts the task and lets the fail-over logic
* decide whether to retry the task execution.
*/
- @Override
public abstract void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) throws Exception;
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
index a1e6369..9848499 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
@@ -16,14 +16,11 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.record.functions;
import java.util.Iterator;
import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
@@ -31,7 +28,7 @@ import org.apache.flink.util.Collector;
* The ReduceFunction must be extended to provide a reducer implementation, as invoked by a
* {@link org.apache.flink.api.java.record.operators.ReduceOperator}.
*/
-public abstract class ReduceFunction extends AbstractRichFunction implements GroupReduceFunction<Record, Record>, FlatCombineFunction<Record> {
+public abstract class ReduceFunction extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
@@ -47,7 +44,6 @@ public abstract class ReduceFunction extends AbstractRichFunction implements Gro
* runtime catches an exception, it aborts the reduce task and lets the fail-over logic
* decide whether to retry the reduce execution.
*/
- @Override
public abstract void reduce(Iterator<Record> records, Collector<Record> out) throws Exception;
/**
@@ -71,7 +67,6 @@ public abstract class ReduceFunction extends AbstractRichFunction implements Gro
* runtime catches an exception, it aborts the combine task and lets the fail-over logic
* decide whether to retry the combiner execution.
*/
- @Override
public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
// to be implemented, if the reducer should use a combiner. Note that the combining method
// is only used, if the stub class is further annotated with the annotation
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
index a8cedeb..c958b84 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
@@ -16,9 +16,11 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.record.operators;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -29,13 +31,15 @@ import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.RecordOperator;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
import org.apache.flink.api.java.record.functions.CoGroupFunction;
import org.apache.flink.api.java.record.functions.FunctionAnnotation;
import org.apache.flink.types.Key;
import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
/**
* CoGroupOperator that applies a {@link CoGroupFunction} to groups of records sharing
@@ -43,7 +47,7 @@ import org.apache.flink.types.Record;
*
* @see CoGroupFunction
*/
-public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record, CoGroupFunction> implements RecordOperator {
+public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record, org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> implements RecordOperator {
/**
* The types of the keys that the operator groups on.
@@ -61,7 +65,8 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record,
* @param keyColumn2 The position of the key in the second input's records.
*/
public static Builder builder(CoGroupFunction udf, Class<? extends Key<?>> keyClass, int keyColumn1, int keyColumn2) {
- return new Builder(new UserCodeObjectWrapper<CoGroupFunction>(udf), keyClass, keyColumn1, keyColumn2);
+ WrappingCoGroupFunction wrapper = new WrappingCoGroupFunction(udf);
+ return new Builder(new UserCodeObjectWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>>(wrapper), keyClass, keyColumn1, keyColumn2);
}
/**
@@ -75,7 +80,8 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record,
public static Builder builder(Class<? extends CoGroupFunction> udf, Class<? extends Key<?>> keyClass,
int keyColumn1, int keyColumn2)
{
- return new Builder(new UserCodeClassWrapper<CoGroupFunction>(udf), keyClass, keyColumn1, keyColumn2);
+ WrappingCoGroupFunction wrapper = new WrappingClassCoGroupFunction(udf);
+ return new Builder(new UserCodeObjectWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>>(wrapper), keyClass, keyColumn1, keyColumn2);
}
/**
@@ -96,7 +102,9 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record,
setBroadcastVariables(builder.broadcastInputs);
setGroupOrderForInputOne(builder.secondaryOrder1);
setGroupOrderForInputTwo(builder.secondaryOrder2);
- setSemanticProperties(FunctionAnnotation.readDualConstantAnnotations(builder.udf));
+
+ CoGroupFunction function = ((WrappingCoGroupFunction) builder.udf.getUserCodeObject()).getWrappedFunction();
+ setSemanticProperties(FunctionAnnotation.readDualConstantAnnotations(new UserCodeObjectWrapper<CoGroupFunction>(function)));
}
// --------------------------------------------------------------------------------------------
@@ -115,7 +123,7 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record,
public static class Builder {
/* The required parameters */
- private final UserCodeWrapper<CoGroupFunction> udf;
+ private final UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> udf;
private final List<Class<? extends Key<?>>> keyClasses;
private final List<Integer> keyColumns1;
private final List<Integer> keyColumns2;
@@ -136,7 +144,7 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record,
* @param keyColumn1 The position of the key in the first input's records.
* @param keyColumn2 The position of the key in the second input's records.
*/
- protected Builder(UserCodeWrapper<CoGroupFunction> udf, Class<? extends Key<?>> keyClass,
+ protected Builder(UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> udf, Class<? extends Key<?>> keyClass,
int keyColumn1, int keyColumn2)
{
this.udf = udf;
@@ -157,7 +165,7 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record,
*
* @param udf The {@link CoGroupFunction} implementation for this CoGroup operator.
*/
- protected Builder(UserCodeWrapper<CoGroupFunction> udf) {
+ protected Builder(UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> udf) {
this.udf = udf;
this.keyClasses = new ArrayList<Class<? extends Key<?>>>();
this.keyColumns1 = new ArrayList<Integer>();
@@ -338,4 +346,39 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record,
return new CoGroupOperator(this);
}
}
+
+ // ============================================================================================
+
+ public static class WrappingCoGroupFunction extends WrappingFunction<CoGroupFunction>
+ implements org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> {
+
+ private static final long serialVersionUID = 1L;
+
+ public WrappingCoGroupFunction(CoGroupFunction coGrouper) {
+ super(coGrouper);
+ }
+
+ @Override
+ public void coGroup(Iterable<Record> records1, Iterable<Record> records2, Collector<Record> out) throws Exception {
+ this.wrappedFunction.coGroup(records1.iterator(), records2.iterator(), out);
+ }
+ }
+
+ public static final class WrappingClassCoGroupFunction extends WrappingCoGroupFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ public WrappingClassCoGroupFunction(Class<? extends CoGroupFunction> reducer) {
+ super(InstantiationUtil.instantiate(reducer));
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.writeObject(wrappedFunction.getClass());
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ Class<?> clazz = (Class<?>) in.readObject();
+ this.wrappedFunction = (CoGroupFunction) InstantiationUtil.instantiate(clazz);
+ }
+ }
}