You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/03/25 17:19:30 UTC
[1/2] flink git commit: [Flink-1780] Rename FlatCombineFunction to
GroupCombineFunction
Repository: flink
Updated Branches:
refs/heads/master ae04025f1 -> 033c69f94
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
index 2a97c60..3e9fde7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
@@ -18,7 +18,7 @@
package org.apache.flink.test.javaApiOperators;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
@@ -284,7 +284,7 @@ public class GroupCombineITCase extends MultipleProgramsTestBase {
// partition and group data
UnsortedGrouping<Tuple3<Integer, Long, String>> partitionedDS = ds.partitionByHash(0).groupBy(1);
- partitionedDS.combineGroup(new FlatCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() {
+ partitionedDS.combineGroup(new GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() {
@Override
public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Long, Integer>> out) throws Exception {
int count = 0;
@@ -334,7 +334,7 @@ public class GroupCombineITCase extends MultipleProgramsTestBase {
// partition and group data
UnsortedGrouping<Tuple3<Integer, Long, String>> partitionedDS = ds.partitionByHash(0).groupBy(1);
- partitionedDS.combineGroup(new FlatCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() {
+ partitionedDS.combineGroup(new GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() {
@Override
public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Long, Integer>> out) throws Exception {
int count = 0;
@@ -372,21 +372,21 @@ public class GroupCombineITCase extends MultipleProgramsTestBase {
});
// all methods on DataSet
- ds.combineGroup(new FlatCombineFunctionExample())
+ ds.combineGroup(new GroupCombineFunctionExample())
.output(new DiscardingOutputFormat<Tuple1<String>>());
// all methods on UnsortedGrouping
- ds.groupBy(0).combineGroup(new FlatCombineFunctionExample())
+ ds.groupBy(0).combineGroup(new GroupCombineFunctionExample())
.output(new DiscardingOutputFormat<Tuple1<String>>());
// all methods on SortedGrouping
- ds.groupBy(0).sortGroup(0, Order.ASCENDING).combineGroup(new FlatCombineFunctionExample())
+ ds.groupBy(0).sortGroup(0, Order.ASCENDING).combineGroup(new GroupCombineFunctionExample())
.output(new DiscardingOutputFormat<Tuple1<String>>());
env.execute();
}
- public static class FlatCombineFunctionExample implements FlatCombineFunction<Tuple1<String>, Tuple1<String>> {
+ public static class GroupCombineFunctionExample implements GroupCombineFunction<Tuple1<String>, Tuple1<String>> {
@Override
public void combine(Iterable<Tuple1<String>> values, Collector<Tuple1<String>> out) throws Exception {
@@ -396,7 +396,7 @@ public class GroupCombineITCase extends MultipleProgramsTestBase {
}
}
- public static class ScalaFlatCombineFunctionExample implements FlatCombineFunction<scala.Tuple1<String>, scala.Tuple1<String>> {
+ public static class ScalaGroupCombineFunctionExample implements GroupCombineFunction<scala.Tuple1<String>, scala.Tuple1<String>> {
@Override
public void combine(Iterable<scala.Tuple1<String>> values, Collector<scala.Tuple1<String>> out) throws Exception {
@@ -406,7 +406,7 @@ public class GroupCombineITCase extends MultipleProgramsTestBase {
}
}
- public static class IdentityFunction implements FlatCombineFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>,
+ public static class IdentityFunction implements GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>,
GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
@Override
@@ -510,7 +510,7 @@ public class GroupCombineITCase extends MultipleProgramsTestBase {
}
- public interface CombineAndReduceGroup <IN, INT, OUT> extends FlatCombineFunction<IN, INT>, GroupReduceFunction<INT, OUT> {
+ public interface CombineAndReduceGroup <IN, INT, OUT> extends GroupCombineFunction<IN, INT>, GroupReduceFunction<INT, OUT> {
}
public interface KvGroupReduce<K, V, INT, OUT> extends CombineAndReduceGroup<Tuple2<K, V>, Tuple2<K, INT>, Tuple2<K, OUT>> {
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
index ef484df..380b3bc 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
@@ -43,7 +43,7 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
.map(str => Tuple1(str))
// all methods on DataSet
- ds.combineGroup(new GroupCombineITCase.ScalaFlatCombineFunctionExample())
+ ds.combineGroup(new GroupCombineITCase.ScalaGroupCombineFunctionExample())
.output(new DiscardingOutputFormat[Tuple1[String]])
ds.combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect))
@@ -51,7 +51,7 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
// all methods on UnsortedGrouping
ds.groupBy(0)
- .combineGroup(new GroupCombineITCase.ScalaFlatCombineFunctionExample())
+ .combineGroup(new GroupCombineITCase.ScalaGroupCombineFunctionExample())
.output(new DiscardingOutputFormat[Tuple1[String]])
ds.groupBy(0)
@@ -60,7 +60,7 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
// all methods on SortedGrouping
ds.groupBy(0).sortGroup(0, Order.ASCENDING)
- .combineGroup(new GroupCombineITCase.ScalaFlatCombineFunctionExample())
+ .combineGroup(new GroupCombineITCase.ScalaGroupCombineFunctionExample())
.output(new DiscardingOutputFormat[Tuple1[String]])
ds.groupBy(0).sortGroup(0, Order.ASCENDING)
[2/2] flink git commit: [Flink-1780] Rename FlatCombineFunction to
GroupCombineFunction
Posted by mx...@apache.org.
[Flink-1780] Rename FlatCombineFunction to GroupCombineFunction
This closes #530
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/033c69f9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/033c69f9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/033c69f9
Branch: refs/heads/master
Commit: 033c69f9477c6352865e7e0da01296dd778ffe59
Parents: ae04025
Author: Suneel Marthi <su...@gmail.com>
Authored: Tue Mar 24 16:19:32 2015 -0400
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Mar 25 17:13:51 2015 +0100
----------------------------------------------------------------------
docs/dataset_transformations.md | 2 +-
.../api/common/functions/CombineFunction.java | 2 +-
.../common/functions/FlatCombineFunction.java | 51 --------------------
.../common/functions/GroupCombineFunction.java | 51 ++++++++++++++++++++
.../functions/RichFlatCombineFunction.java | 42 ----------------
.../functions/RichGroupCombineFunction.java | 39 +++++++++++++++
.../functions/RichGroupReduceFunction.java | 4 +-
.../base/GroupCombineOperatorBase.java | 6 +--
.../operators/base/GroupReduceOperatorBase.java | 8 +--
.../java/org/apache/flink/api/java/DataSet.java | 4 +-
.../flink/api/java/functions/FirstReducer.java | 4 +-
.../java/operators/GroupCombineOperator.java | 22 ++++-----
.../api/java/operators/GroupReduceOperator.java | 6 +--
.../api/java/operators/SortedGrouping.java | 4 +-
.../api/java/operators/UnsortedGrouping.java | 4 +-
.../PlanUnwrappingGroupCombineOperator.java | 12 ++---
.../PlanUnwrappingReduceGroupOperator.java | 10 ++--
...lanUnwrappingSortedGroupCombineOperator.java | 12 ++---
...PlanUnwrappingSortedReduceGroupOperator.java | 10 ++--
.../java/record/operators/ReduceOperator.java | 4 +-
.../flink/api/java/typeutils/TypeExtractor.java | 24 ++++-----
.../java/record/ReduceWrappingFunctionTest.java | 16 +++---
.../operators/AllGroupCombineDriver.java | 16 +++---
.../runtime/operators/AllGroupReduceDriver.java | 10 ++--
.../operators/GroupReduceCombineDriver.java | 20 ++++----
.../runtime/operators/RegularPactTask.java | 14 ++++--
.../SynchronousChainedCombineDriver.java | 12 ++---
.../sort/CombiningUnilateralSortMerger.java | 16 +++---
.../runtime/blob/BlobCacheSuccessTest.java | 18 +++----
.../org/apache/flink/api/scala/DataSet.scala | 4 +-
.../apache/flink/api/scala/GroupedDataSet.scala | 6 +--
.../CustomRankCombiner.java | 4 +-
.../javaApiOperators/GroupCombineITCase.java | 20 ++++----
.../scala/operators/GroupCombineITCase.scala | 6 +--
34 files changed, 242 insertions(+), 241 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/docs/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/dataset_transformations.md b/docs/dataset_transformations.md
index 2bec61b..3bb3cec 100644
--- a/docs/dataset_transformations.md
+++ b/docs/dataset_transformations.md
@@ -554,7 +554,7 @@ an alternative WordCount implementation. In the implementation,
DataSet<String> input = [..] // The words received as input
DataSet<String> groupedInput = input.groupBy(0); // group identical words
-DataSet<Tuple2<String, Integer>> combinedWords = groupedInput.combineGroup(new FlatCombineFunction<String, Tuple2<String, Integer>() {
+DataSet<Tuple2<String, Integer>> combinedWords = groupedInput.combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>>() {
public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>>) { // combine
int count = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
index ef52b32..af115b0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
@@ -29,7 +29,7 @@ import java.io.Serializable;
* reduce the data volume earlier, before the entire groups have been collected.
* <p>
* This special variant of the combine function reduces the group of elements into a single element. A variant
- * that can return multiple values per group is defined in {@link FlatCombineFunction}.
+ * that can return multiple values per group is defined in {@link GroupCombineFunction}.
*
* @param <IN> The data type processed by the combine function.
* @param <OUT> The data type emitted by the combine function.
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
deleted file mode 100644
index b90b3ce..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.functions;
-
-import java.io.Serializable;
-
-import org.apache.flink.util.Collector;
-
-/**
- * Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction}
- * and "pre-reduce" the data. The combine functions typically do not see the entire group of elements, but
- * only a sub-group.
- * <p>
- * Combine functions are frequently helpful in increasing the program efficiency, because they allow the system to
- * reduce the data volume earlier, before the entire groups have been collected.
- * <p>
- * This special variant of the combine function supports to return more than one element per group.
- * It is frequently less efficient to use than the {@link CombineFunction}.
- *
- * @param <IN> The data type processed by the combine function.
- * @param <OUT> The data type emitted by the combine function.
- */
-public interface FlatCombineFunction<IN, OUT> extends Function, Serializable {
-
- /**
- * The combine method, called (potentially multiple timed) with subgroups of elements.
- *
- * @param values The elements to be combined.
- * @param out The collector to use to return values from the function.
- *
- * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
- * and may trigger the recovery logic.
- */
- void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java
new file mode 100644
index 0000000..c0b153b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+import org.apache.flink.util.Collector;
+
+/**
+ * Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction}
+ * and "pre-reduce" the data. The combine functions typically do not see the entire group of elements, but
+ * only a sub-group.
+ * <p>
+ * Combine functions are frequently helpful in increasing the program efficiency, because they allow the system to
+ * reduce the data volume earlier, before the entire groups have been collected.
+ * <p>
+ * This special variant of the combine function can return more than one element per group.
+ * It is frequently less efficient to use than the {@link CombineFunction}.
+ *
+ * @param <IN> The data type processed by the combine function.
+ * @param <OUT> The data type emitted by the combine function.
+ */
+public interface GroupCombineFunction<IN, OUT> extends Function, Serializable {
+
+ /**
+ * The combine method, called (potentially multiple times) with subgroups of elements.
+ *
+ * @param values The elements to be combined.
+ * @param out The collector to use to return values from the function.
+ *
+ * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
+ * and may trigger the recovery logic.
+ */
+ void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
deleted file mode 100644
index 17aca88..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.functions;
-
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.util.Collector;
-
-/**
- * Rich variant of the {@link FlatCombineFunction}. As a {@link RichFunction}, it gives access to the
- * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
- * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
- * {@link RichFunction#close()}.
- *
- * @param <IN> The data type of the elements to be combined.
- * @param <OUT> The resulting data type of the elements to be combined.
- */
-public abstract class RichFlatCombineFunction<IN, OUT> extends AbstractRichFunction implements FlatCombineFunction<IN, OUT> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public abstract void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java
new file mode 100644
index 0000000..55df232
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+
+import org.apache.flink.util.Collector;
+
+/**
+ * Rich variant of the {@link GroupCombineFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN> The data type of the elements to be combined.
+ * @param <OUT> The resulting data type of the elements to be combined.
+ */
+public abstract class RichGroupCombineFunction<IN, OUT> extends AbstractRichFunction implements GroupCombineFunction<IN, OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public abstract void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
index b6c92c2..48e27d3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
@@ -34,7 +34,7 @@ import org.apache.flink.util.Collector;
* @param <IN> Type of the elements that this function processes.
* @param <OUT> The type of the elements returned by the user-defined function.
*/
-public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT>, FlatCombineFunction<IN, IN> {
+public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT>, GroupCombineFunction<IN, IN> {
private static final long serialVersionUID = 1L;
@@ -83,5 +83,5 @@ public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunct
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
- public static @interface Combinable {};
+ public static @interface Combinable {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
index 2a47c45..27fbc1c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.CopyingListCollector;
import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -46,7 +46,7 @@ import java.util.List;
* This class is later processed by the compiler to generate the plan.
* @see org.apache.flink.api.common.functions.CombineFunction
*/
-public class GroupCombineOperatorBase<IN, OUT, FT extends FlatCombineFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
+public class GroupCombineOperatorBase<IN, OUT, FT extends GroupCombineFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
/** The ordering for the order inside a reduce group. */
@@ -81,7 +81,7 @@ public class GroupCombineOperatorBase<IN, OUT, FT extends FlatCombineFunction<IN
@Override
protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
- FlatCombineFunction<IN, OUT> function = this.userFunction.getUserCodeObject();
+ GroupCombineFunction<IN, OUT> function = this.userFunction.getUserCodeObject();
UnaryOperatorInformation<IN, OUT> operatorInfo = getOperatorInfo();
TypeInformation<IN> inputType = operatorInfo.getInputType();
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
index 6d7db89..57f07f3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
@@ -21,7 +21,7 @@ package org.apache.flink.api.common.operators.base;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RuntimeContext;
@@ -107,15 +107,15 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
/**
* Marks the group reduce operation as combinable. Combinable operations may pre-reduce the
* data before the actual group reduce operations. Combinable user-defined functions
- * must implement the interface {@link org.apache.flink.api.common.functions.FlatCombineFunction}.
+ * must implement the interface {@link GroupCombineFunction}.
*
* @param combinable Flag to mark the group reduce operation as combinable.
*/
public void setCombinable(boolean combinable) {
// sanity check
- if (combinable && !FlatCombineFunction.class.isAssignableFrom(this.userFunction.getUserCodeClass())) {
+ if (combinable && !GroupCombineFunction.class.isAssignableFrom(this.userFunction.getUserCodeClass())) {
throw new IllegalArgumentException("Cannot set a UDF as combinable if it does not implement the interface " +
- FlatCombineFunction.class.getName());
+ GroupCombineFunction.class.getName());
} else {
this.combinable = combinable;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index ed8d1ca..1e91eeb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -24,7 +24,7 @@ import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.InvalidTypesException;
@@ -473,7 +473,7 @@ public abstract class DataSet<T> {
* @param combiner The CombineFunction that is applied on the DataSet.
* @return A GroupCombineOperator which represents the combined DataSet.
*/
- public <R> GroupCombineOperator<T, R> combineGroup(FlatCombineFunction<T, R> combiner) {
+ public <R> GroupCombineOperator<T, R> combineGroup(GroupCombineFunction<T, R> combiner) {
if (combiner == null) {
throw new NullPointerException("GroupReduce function must not be null.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
index fbb7029..a604cc0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
@@ -17,13 +17,13 @@
*/
package org.apache.flink.api.java.functions;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
import org.apache.flink.util.Collector;
@Combinable
-public class FirstReducer<T> implements GroupReduceFunction<T, T>, FlatCombineFunction<T, T> {
+public class FirstReducer<T> implements GroupReduceFunction<T, T>, GroupCombineFunction<T, T> {
private static final long serialVersionUID = 1L;
private final int count;
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
index 3c1d47c..911c608 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -18,7 +18,7 @@
package org.apache.flink.api.java.operators;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Order;
@@ -46,7 +46,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
*/
public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, GroupCombineOperator<IN, OUT>> {
- private final FlatCombineFunction<IN, OUT> function;
+ private final GroupCombineFunction<IN, OUT> function;
private final Grouping<IN> grouper;
@@ -60,7 +60,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
* @param function The user-defined GroupReduce function.
* @param defaultName The operator's name.
*/
- public GroupCombineOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatCombineFunction<IN, OUT> function, String defaultName) {
+ public GroupCombineOperator(DataSet<IN> input, TypeInformation<OUT> resultType, GroupCombineFunction<IN, OUT> function, String defaultName) {
super(input, resultType);
this.function = function;
this.grouper = null;
@@ -73,7 +73,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
* @param input The grouped input to be processed group-wise by the groupReduce function.
* @param function The user-defined GroupReduce function.
*/
- public GroupCombineOperator(Grouping<IN> input, TypeInformation<OUT> resultType, FlatCombineFunction<IN, OUT> function, String defaultName) {
+ public GroupCombineOperator(Grouping<IN> input, TypeInformation<OUT> resultType, GroupCombineFunction<IN, OUT> function, String defaultName) {
super(input != null ? input.getDataSet() : null, resultType);
this.function = function;
@@ -82,7 +82,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
}
@Override
- protected FlatCombineFunction<IN, OUT> getFunction() {
+ protected GroupCombineFunction<IN, OUT> getFunction() {
return function;
}
@@ -99,8 +99,8 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
if (grouper == null) {
// non grouped reduce
UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
- GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>> po =
- new GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>>(function, operatorInfo, new int[0], name);
+ GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>> po =
+ new GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>>(function, operatorInfo, new int[0], name);
po.setInput(input);
// the parallelism for a non grouped reduce can only be 1
@@ -144,8 +144,8 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
- GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>> po =
- new GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
+ GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>> po =
+ new GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
po.setInput(input);
po.setParallelism(getParallelism());
@@ -175,7 +175,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
// --------------------------------------------------------------------------------------------
private static <IN, OUT, K> PlanUnwrappingGroupCombineOperator<IN, OUT, K> translateSelectorFunctionReducer(
- Keys.SelectorFunctionKeys<IN, ?> rawKeys, FlatCombineFunction<IN, OUT> function,
+ Keys.SelectorFunctionKeys<IN, ?> rawKeys, GroupCombineFunction<IN, OUT> function,
TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input)
{
@SuppressWarnings("unchecked")
@@ -199,7 +199,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
}
private static <IN, OUT, K1, K2> PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> translateSelectorFunctionSortedReducer(
- Keys.SelectorFunctionKeys<IN, ?> rawGroupingKey, Keys.SelectorFunctionKeys<IN, ?> rawSortingKey, FlatCombineFunction<IN, OUT> function,
+ Keys.SelectorFunctionKeys<IN, ?> rawGroupingKey, Keys.SelectorFunctionKeys<IN, ?> rawSortingKey, GroupCombineFunction<IN, OUT> function,
TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input)
{
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index e809623..30f2cc4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -18,7 +18,7 @@
package org.apache.flink.api.java.operators;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Operator;
@@ -88,7 +88,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
}
private void checkCombinability() {
- if (function instanceof FlatCombineFunction &&
+ if (function instanceof GroupCombineFunction &&
function.getClass().getAnnotation(RichGroupReduceFunction.Combinable.class) != null) {
this.combinable = true;
}
@@ -111,7 +111,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
public GroupReduceOperator<IN, OUT> setCombinable(boolean combinable) {
// sanity check that the function is a subclass of the combine interface
- if (combinable && !(function instanceof FlatCombineFunction)) {
+ if (combinable && !(function instanceof GroupCombineFunction)) {
throw new IllegalArgumentException("The function does not implement the combine interface.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index b2054bf..287bf82 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -18,7 +18,7 @@
package org.apache.flink.api.java.operators;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.Utils;
@@ -169,7 +169,7 @@ public class SortedGrouping<T> extends Grouping<T> {
* @param combiner The CombineFunction that is applied on the DataSet.
* @return A GroupCombineOperator which represents the combined DataSet.
*/
- public <R> GroupCombineOperator<T, R> combineGroup(FlatCombineFunction<T, R> combiner) {
+ public <R> GroupCombineOperator<T, R> combineGroup(GroupCombineFunction<T, R> combiner) {
if (combiner == null) {
throw new NullPointerException("GroupReduce function must not be null.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 0f3faa0..319a599 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -19,7 +19,7 @@
package org.apache.flink.api.java.operators;
import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.ReduceFunction;
@@ -173,7 +173,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
* @param combiner The CombineFunction that is applied on the DataSet.
* @return A GroupCombineOperator which represents the combined DataSet.
*/
- public <R> GroupCombineOperator<T, R> combineGroup(FlatCombineFunction<T, R> combiner) {
+ public <R> GroupCombineOperator<T, R> combineGroup(GroupCombineFunction<T, R> combiner) {
if (combiner == null) {
throw new NullPointerException("GroupReduce function must not be null.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
index ae4ba11..c8e40ce 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
@@ -18,7 +18,7 @@
package org.apache.flink.api.java.operators.translation;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -30,9 +30,9 @@ import org.apache.flink.util.Collector;
* A group combine operator that takes 2-tuples (key-value pairs), and applies the group combine operation only
* on the unwrapped values.
*/
-public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> extends GroupCombineOperatorBase<Tuple2<K, IN>, OUT, FlatCombineFunction<Tuple2<K, IN>, OUT>> {
+public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> extends GroupCombineOperatorBase<Tuple2<K, IN>, OUT, GroupCombineFunction<Tuple2<K, IN>, OUT>> {
- public PlanUnwrappingGroupCombineOperator(FlatCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
+ public PlanUnwrappingGroupCombineOperator(GroupCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey)
{
super(new TupleUnwrappingGroupCombiner<IN, OUT, K>(udf),
@@ -42,15 +42,15 @@ public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> extends GroupCombine
// --------------------------------------------------------------------------------------------
- public static final class TupleUnwrappingGroupCombiner<IN, OUT, K> extends WrappingFunction<FlatCombineFunction<IN, OUT>>
- implements FlatCombineFunction<Tuple2<K, IN>, OUT>
+ public static final class TupleUnwrappingGroupCombiner<IN, OUT, K> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
+ implements GroupCombineFunction<Tuple2<K, IN>, OUT>
{
private static final long serialVersionUID = 1L;
private final TupleUnwrappingIterator<IN, K> iter;
- private TupleUnwrappingGroupCombiner(FlatCombineFunction<IN, OUT> wrapped) {
+ private TupleUnwrappingGroupCombiner(GroupCombineFunction<IN, OUT> wrapped) {
super(wrapped);
this.iter = new TupleUnwrappingIterator<IN, K>();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index 1d59a21..e01af50 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -18,7 +18,7 @@
package org.apache.flink.api.java.operators.translation;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
@@ -37,7 +37,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
public PlanUnwrappingReduceGroupOperator(GroupReduceFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey, boolean combinable)
{
- super(combinable ? new TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K>((RichGroupReduceFunction<IN, OUT>) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
+ super(combinable ? new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K>((RichGroupReduceFunction<IN, OUT>) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
new UnaryOperatorInformation<Tuple2<K, IN>, OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);
super.setCombinable(combinable);
@@ -46,8 +46,8 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
// --------------------------------------------------------------------------------------------
@RichGroupReduceFunction.Combinable
- public static final class TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>>
- implements GroupReduceFunction<Tuple2<K, IN>, OUT>, FlatCombineFunction<Tuple2<K, IN>, Tuple2<K, IN>>
+ public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>>
+ implements GroupReduceFunction<Tuple2<K, IN>, OUT>, GroupCombineFunction<Tuple2<K, IN>, Tuple2<K, IN>>
{
private static final long serialVersionUID = 1L;
@@ -55,7 +55,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
private TupleUnwrappingIterator<IN, K> iter;
private TupleWrappingCollector<IN, K> coll;
- private TupleUnwrappingFlatCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> wrapped) {
+ private TupleUnwrappingGroupCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> wrapped) {
super(wrapped);
this.iter = new TupleUnwrappingIterator<IN, K>();
this.coll = new TupleWrappingCollector<IN, K>(this.iter);
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
index b3d8470..e52a5c4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
@@ -18,7 +18,7 @@
package org.apache.flink.api.java.operators.translation;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -30,9 +30,9 @@ import org.apache.flink.util.Collector;
* A reduce operator that takes 3-tuples (groupKey, sortKey, value), and applies the sorted partial group reduce
* operation only on the unwrapped values.
*/
-public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends GroupCombineOperatorBase<Tuple3<K1, K2, IN>, OUT, FlatCombineFunction<Tuple3<K1, K2, IN>,OUT>> {
+public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends GroupCombineOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupCombineFunction<Tuple3<K1, K2, IN>,OUT>> {
- public PlanUnwrappingSortedGroupCombineOperator(FlatCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name,
+ public PlanUnwrappingSortedGroupCombineOperator(GroupCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name,
TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey)
{
super(new TupleUnwrappingGroupReducer<IN, OUT, K1, K2>(udf),
@@ -42,15 +42,15 @@ public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends G
}
- public static final class TupleUnwrappingGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<FlatCombineFunction<IN, OUT>>
- implements FlatCombineFunction<Tuple3<K1, K2, IN>, OUT>
+ public static final class TupleUnwrappingGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
+ implements GroupCombineFunction<Tuple3<K1, K2, IN>, OUT>
{
private static final long serialVersionUID = 1L;
private final Tuple3UnwrappingIterator<IN, K1, K2> iter;
- private TupleUnwrappingGroupReducer(FlatCombineFunction<IN, OUT> wrapped) {
+ private TupleUnwrappingGroupReducer(GroupCombineFunction<IN, OUT> wrapped) {
super(wrapped);
this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
index 757ff56..63ebfa4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
@@ -18,7 +18,7 @@
package org.apache.flink.api.java.operators.translation;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
@@ -37,7 +37,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
public PlanUnwrappingSortedReduceGroupOperator(GroupReduceFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name,
TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey, boolean combinable)
{
- super(combinable ? new TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K1, K2>((RichGroupReduceFunction<IN, OUT>) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2>(udf),
+ super(combinable ? new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2>((RichGroupReduceFunction<IN, OUT>) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2>(udf),
new UnaryOperatorInformation<Tuple3<K1, K2, IN>, OUT>(typeInfoWithKey, outType), groupingKey.computeLogicalKeyPositions(), name);
super.setCombinable(combinable);
@@ -46,8 +46,8 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
// --------------------------------------------------------------------------------------------
@RichGroupReduceFunction.Combinable
- public static final class TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>>
- implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, FlatCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>>
+ public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>>
+ implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, GroupCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>>
{
private static final long serialVersionUID = 1L;
@@ -55,7 +55,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
private Tuple3UnwrappingIterator<IN, K1, K2> iter;
private Tuple3WrappingCollector<IN, K1, K2> coll;
- private TupleUnwrappingFlatCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> wrapped) {
+ private TupleUnwrappingGroupCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> wrapped) {
super(wrapped);
this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>();
this.coll = new Tuple3WrappingCollector<IN, K1, K2>(this.iter);
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
index 875e9c1..1866fea 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
@@ -32,7 +32,7 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.Validate;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Ordering;
@@ -367,7 +367,7 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Grou
// ============================================================================================
- public static class WrappingReduceFunction extends WrappingFunction<ReduceFunction> implements GroupReduceFunction<Record, Record>, FlatCombineFunction<Record, Record> {
+ public static class WrappingReduceFunction extends WrappingFunction<ReduceFunction> implements GroupReduceFunction<Record, Record>, GroupCombineFunction<Record, Record> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 4527aa0..ae6063a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -34,7 +34,7 @@ import org.apache.avro.specific.SpecificRecordBase;
import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
@@ -135,14 +135,14 @@ public class TypeExtractor {
return getUnaryOperatorReturnType((Function) groupReduceInterface, GroupReduceFunction.class, true, true, inType, functionName, allowMissing);
}
- public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(FlatCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType) {
+ public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(GroupCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType) {
return getGroupCombineReturnTypes(combineInterface, inType, null, false);
}
- public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(FlatCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType,
+ public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(GroupCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType,
String functionName, boolean allowMissing)
{
- return getUnaryOperatorReturnType((Function) combineInterface, FlatCombineFunction.class, true, true, inType, functionName, allowMissing);
+ return getUnaryOperatorReturnType((Function) combineInterface, GroupCombineFunction.class, true, true, inType, functionName, allowMissing);
}
@@ -600,7 +600,7 @@ public class TypeExtractor {
// the input is a tuple
else if (inTypeInfo instanceof TupleTypeInfo && isClassType(inType)
&& Tuple.class.isAssignableFrom(typeToClass(inType))) {
- ParameterizedType tupleBaseClass = null;
+ ParameterizedType tupleBaseClass;
// get tuple from possible tuple subclass
while (!(isClassType(inType) && typeToClass(inType).getSuperclass().equals(Tuple.class))) {
@@ -737,7 +737,7 @@ public class TypeExtractor {
// check for basic type
if (typeInfo.isBasicType()) {
- TypeInformation<?> actual = null;
+ TypeInformation<?> actual;
// check if basic type at all
if (!(type instanceof Class<?>) || (actual = BasicTypeInfo.getInfoFor((Class<?>) type)) == null) {
throw new InvalidTypesException("Basic type expected.");
@@ -792,7 +792,7 @@ public class TypeExtractor {
}
// check writable type contents
- Class<?> clazz = null;
+ Class<?> clazz;
if (((WritableTypeInfo<?>) typeInfo).getTypeClass() != (clazz = (Class<?>) type)) {
throw new InvalidTypesException("Writable type '"
+ ((WritableTypeInfo<?>) typeInfo).getTypeClass().getCanonicalName() + "' expected but was '"
@@ -801,7 +801,7 @@ public class TypeExtractor {
}
// check for primitive array
else if (typeInfo instanceof PrimitiveArrayTypeInfo) {
- Type component = null;
+ Type component;
// check if array at all
if (!(type instanceof Class<?> && ((Class<?>) type).isArray() && (component = ((Class<?>) type).getComponentType()) != null)
&& !(type instanceof GenericArrayType && (component = ((GenericArrayType) type).getGenericComponentType()) != null)) {
@@ -819,7 +819,7 @@ public class TypeExtractor {
}
// check for basic array
else if (typeInfo instanceof BasicArrayTypeInfo<?, ?>) {
- Type component = null;
+ Type component;
// check if array at all
if (!(type instanceof Class<?> && ((Class<?>) type).isArray() && (component = ((Class<?>) type).getComponentType()) != null)
&& !(type instanceof GenericArrayType && (component = ((GenericArrayType) type).getGenericComponentType()) != null)) {
@@ -844,7 +844,7 @@ public class TypeExtractor {
}
// check component
- Type component = null;
+ Type component;
if (type instanceof Class<?>) {
component = ((Class<?>) type).getComponentType();
} else {
@@ -1428,8 +1428,8 @@ public class TypeExtractor {
if (!(t1 instanceof TypeVariable) || !(t2 instanceof TypeVariable)) {
return false;
}
- return ((TypeVariable<?>) t1).getName().equals(((TypeVariable<?>)t2).getName())
- && ((TypeVariable<?>) t1).getGenericDeclaration().equals(((TypeVariable<?>)t2).getGenericDeclaration());
+ return ((TypeVariable<?>) t1).getName().equals(((TypeVariable<?>) t2).getName())
+ && ((TypeVariable<?>) t1).getGenericDeclaration().equals(((TypeVariable<?>) t2).getGenericDeclaration());
}
private static TypeInformation<?> getTypeOfPojoField(TypeInformation<?> pojoInfo, Field field) {
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
index 2216217..89baa98 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
@@ -18,15 +18,12 @@
package org.apache.flink.api.java.record;
-import static org.junit.Assert.*;
-
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
@@ -43,6 +40,11 @@ import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
@SuppressWarnings({ "serial", "deprecation" })
public class ReduceWrappingFunctionTest {
@@ -86,7 +88,7 @@ public class ReduceWrappingFunctionTest {
target.clear();
// test combine
- ((FlatCombineFunction<Record, Record>) reducer).combine(source, collector);
+ ((GroupCombineFunction<Record, Record>) reducer).combine(source, collector);
assertEquals(2, target.size());
assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
@@ -138,7 +140,7 @@ public class ReduceWrappingFunctionTest {
target.clear();
// test combine
- ((FlatCombineFunction<Record, Record>) reducer).combine(source, collector);
+ ((GroupCombineFunction<Record, Record>) reducer).combine(source, collector);
assertEquals(2, target.size());
assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
@@ -227,5 +229,5 @@ public class ReduceWrappingFunctionTest {
methodCounter.incrementAndGet();
super.open(parameters);
}
- };
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
index 7d87a6b..7b279ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
@@ -20,7 +20,7 @@
package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper;
@@ -35,20 +35,20 @@ import org.slf4j.LoggerFactory;
* Like @org.apache.flink.runtime.operators.GroupCombineDriver but without grouping and sorting. May emit partially
* reduced results.
*
-* @see org.apache.flink.api.common.functions.FlatCombineFunction
+* @see GroupCombineFunction
*/
-public class AllGroupCombineDriver<IN, OUT> implements PactDriver<FlatCombineFunction<IN, OUT>, OUT> {
+public class AllGroupCombineDriver<IN, OUT> implements PactDriver<GroupCombineFunction<IN, OUT>, OUT> {
private static final Logger LOG = LoggerFactory.getLogger(AllGroupCombineDriver.class);
- private PactTaskContext<FlatCombineFunction<IN, OUT>, OUT> taskContext;
+ private PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext;
private boolean objectReuseEnabled = false;
// ------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<FlatCombineFunction<IN, OUT>, OUT> context) {
+ public void setup(PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> context) {
this.taskContext = context;
}
@@ -58,9 +58,9 @@ public class AllGroupCombineDriver<IN, OUT> implements PactDriver<FlatCombineFun
}
@Override
- public Class<FlatCombineFunction<IN, OUT>> getStubType() {
+ public Class<GroupCombineFunction<IN, OUT>> getStubType() {
@SuppressWarnings("unchecked")
- final Class<FlatCombineFunction<IN, OUT>> clazz = (Class<FlatCombineFunction<IN, OUT>>) (Class<?>) FlatCombineFunction.class;
+ final Class<GroupCombineFunction<IN, OUT>> clazz = (Class<GroupCombineFunction<IN, OUT>>) (Class<?>) GroupCombineFunction.class;
return clazz;
}
@@ -95,7 +95,7 @@ public class AllGroupCombineDriver<IN, OUT> implements PactDriver<FlatCombineFun
TypeSerializer<IN> serializer = serializerFactory.getSerializer();
final MutableObjectIterator<IN> in = this.taskContext.getInput(0);
- final FlatCombineFunction<IN, OUT> reducer = this.taskContext.getStub();
+ final GroupCombineFunction<IN, OUT> reducer = this.taskContext.getStub();
final Collector<OUT> output = this.taskContext.getOutputCollector();
if (objectReuseEnabled) {
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
index ad1afdb..a20fddf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.util.TaskConfig;
@@ -92,8 +92,8 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct
switch (this.strategy) {
case ALL_GROUP_REDUCE_COMBINE:
- if (!(this.taskContext.getStub() instanceof FlatCombineFunction)) {
- throw new Exception("Using combiner on a UDF that does not implement the combiner interface " + FlatCombineFunction.class.getName());
+ if (!(this.taskContext.getStub() instanceof GroupCombineFunction)) {
+ throw new Exception("Using combiner on a UDF that does not implement the combiner interface " + GroupCombineFunction.class.getName());
}
case ALL_GROUP_REDUCE:
case ALL_GROUP_COMBINE:
@@ -129,7 +129,7 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct
final Collector<OT> output = this.taskContext.getOutputCollector();
reducer.reduce(inIter, output);
} else if (strategy == DriverStrategy.ALL_GROUP_REDUCE_COMBINE || strategy == DriverStrategy.ALL_GROUP_COMBINE) {
- @SuppressWarnings("unchecked") final FlatCombineFunction<IT, OT> combiner = (FlatCombineFunction<IT, OT>) this.taskContext.getStub();
+ @SuppressWarnings("unchecked") final GroupCombineFunction<IT, OT> combiner = (GroupCombineFunction<IT, OT>) this.taskContext.getStub();
final Collector<OT> output = this.taskContext.getOutputCollector();
combiner.combine(inIter, output);
} else {
@@ -147,7 +147,7 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct
final Collector<OT> output = this.taskContext.getOutputCollector();
reducer.reduce(inIter, output);
} else if (strategy == DriverStrategy.ALL_GROUP_REDUCE_COMBINE || strategy == DriverStrategy.ALL_GROUP_COMBINE) {
- @SuppressWarnings("unchecked") final FlatCombineFunction<IT, OT> combiner = (FlatCombineFunction<IT, OT>) this.taskContext.getStub();
+ @SuppressWarnings("unchecked") final GroupCombineFunction<IT, OT> combiner = (GroupCombineFunction<IT, OT>) this.taskContext.getStub();
final Collector<OT> output = this.taskContext.getOutputCollector();
combiner.combine(inIter, output);
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index dacd436..493eb4f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -45,7 +45,7 @@ import java.util.List;
* the user supplied a RichGroupReduceFunction with a combine method. The combining is performed in memory with a
* lazy approach which only combines elements which currently fit in the sorter. This may lead to a partial solution.
* In the case of the RichGroupReduceFunction this partial result is transformed into a proper deterministic result.
- * The CombineGroup uses the FlatCombineFunction interface which allows to combine values of type <IN> to any type
+ * The CombineGroup uses the GroupCombineFunction interface which allows to combine values of type <IN> to any type
* of type <OUT>. In contrast, the RichGroupReduceFunction requires the combine method to have the same input and
* output type to be able to reduce the elements after the combine from <IN> to <OUT>.
*
@@ -54,18 +54,18 @@ import java.util.List;
* @param <IN> The data type consumed by the combiner.
* @param <OUT> The data type produced by the combiner.
*/
-public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<FlatCombineFunction<IN, OUT>, OUT> {
+public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombineFunction<IN, OUT>, OUT> {
private static final Logger LOG = LoggerFactory.getLogger(GroupReduceCombineDriver.class);
/** Fix length records with a length below this threshold will be in-place sorted, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
- private PactTaskContext<FlatCombineFunction<IN, OUT>, OUT> taskContext;
+ private PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext;
private InMemorySorter<IN> sorter;
- private FlatCombineFunction<IN, OUT> combiner;
+ private GroupCombineFunction<IN, OUT> combiner;
private TypeSerializer<IN> serializer;
@@ -86,7 +86,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<FlatCombine
// ------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<FlatCombineFunction<IN, OUT>, OUT> context) {
+ public void setup(PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> context) {
this.taskContext = context;
this.running = true;
}
@@ -97,9 +97,9 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<FlatCombine
}
@Override
- public Class<FlatCombineFunction<IN, OUT>> getStubType() {
+ public Class<GroupCombineFunction<IN, OUT>> getStubType() {
@SuppressWarnings("unchecked")
- final Class<FlatCombineFunction<IN, OUT>> clazz = (Class<FlatCombineFunction<IN, OUT>>) (Class<?>) FlatCombineFunction.class;
+ final Class<GroupCombineFunction<IN, OUT>> clazz = (Class<GroupCombineFunction<IN, OUT>>) (Class<?>) GroupCombineFunction.class;
return clazz;
}
@@ -188,7 +188,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<FlatCombine
final ReusingKeyGroupedIterator<IN> keyIter =
new ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, this.groupingComparator);
- final FlatCombineFunction<IN, OUT> combiner = this.combiner;
+ final GroupCombineFunction<IN, OUT> combiner = this.combiner;
final Collector<OUT> output = this.output;
// iterate over key groups
@@ -203,7 +203,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<FlatCombine
final NonReusingKeyGroupedIterator<IN> keyIter =
new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
- final FlatCombineFunction<IN, OUT> combiner = this.combiner;
+ final GroupCombineFunction<IN, OUT> combiner = this.combiner;
final Collector<OUT> output = this.output;
// iterate over key groups
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index ca110c2..71b8afc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -525,7 +525,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
try {
FunctionUtils.closeFunction(this.stub);
}
- catch (Throwable t) {}
+ catch (Throwable t) {
+ // do nothing
+ }
}
// if resettable driver invoke teardown
@@ -1006,13 +1008,13 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
}
- if (!(localStub instanceof FlatCombineFunction)) {
+ if (!(localStub instanceof GroupCombineFunction)) {
throw new IllegalStateException("Performing combining sort outside a reduce task!");
}
@SuppressWarnings({ "rawtypes", "unchecked" })
CombiningUnilateralSortMerger<?> cSorter = new CombiningUnilateralSortMerger(
- (FlatCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
+ (GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
this.config.getSpillingThresholdInput(inputNum));
@@ -1467,7 +1469,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
for (int i = 0; i < tasks.size(); i++) {
try {
tasks.get(i).cancelTask();
- } catch (Throwable t) {}
+ } catch (Throwable t) {
+ // do nothing
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
index 7e36b49..4116145 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators.chaining;
import java.io.IOException;
import java.util.List;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -67,7 +67,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
private InMemorySorter<IN> sorter;
- private FlatCombineFunction<IN, OUT> combiner;
+ private GroupCombineFunction<IN, OUT> combiner;
private TypeSerializer<IN> serializer;
@@ -90,8 +90,8 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
this.parent = parent;
@SuppressWarnings("unchecked")
- final FlatCombineFunction<IN, OUT> combiner =
- RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, FlatCombineFunction.class);
+ final GroupCombineFunction<IN, OUT> combiner =
+ RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GroupCombineFunction.class);
this.combiner = combiner;
FunctionUtils.setFunctionRuntimeContext(combiner, getUdfRuntimeContext());
}
@@ -210,7 +210,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
// cache references on the stack
- final FlatCombineFunction<IN, OUT> stub = this.combiner;
+ final GroupCombineFunction<IN, OUT> stub = this.combiner;
final Collector<OUT> output = this.outputCollector;
// run stub implementation
@@ -226,7 +226,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
// cache references on the stack
- final FlatCombineFunction<IN, OUT> stub = this.combiner;
+ final GroupCombineFunction<IN, OUT> stub = this.combiner;
final Collector<OUT> output = this.outputCollector;
// run stub implementation
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index 9282fd4..8da9413 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.operators.sort;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -70,7 +70,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
*/
private static final Logger LOG = LoggerFactory.getLogger(CombiningUnilateralSortMerger.class);
- private final FlatCombineFunction<E, E> combineStub; // the user code stub that does the combining
+ private final GroupCombineFunction<E, E> combineStub; // the user code stub that does the combining
private Configuration udfConfig;
@@ -100,7 +100,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
* @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
* perform the sort.
*/
- public CombiningUnilateralSortMerger(FlatCombineFunction<E, E> combineStub, MemoryManager memoryManager, IOManager ioManager,
+ public CombiningUnilateralSortMerger(GroupCombineFunction<E, E> combineStub, MemoryManager memoryManager, IOManager ioManager,
MutableObjectIterator<E> input, AbstractInvokable parentTask,
TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
double memoryFraction, int maxNumFileHandles, float startSpillingFraction)
@@ -132,7 +132,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
* @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
* perform the sort.
*/
- public CombiningUnilateralSortMerger(FlatCombineFunction<E, E> combineStub, MemoryManager memoryManager, IOManager ioManager,
+ public CombiningUnilateralSortMerger(GroupCombineFunction<E, E> combineStub, MemoryManager memoryManager, IOManager ioManager,
MutableObjectIterator<E> input, AbstractInvokable parentTask,
TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
double memoryFraction, int numSortBuffers, int maxNumFileHandles,
@@ -189,7 +189,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
// ------------------- In-Memory Cache ------------------------
final Queue<CircularElement<E>> cache = new ArrayDeque<CircularElement<E>>();
- CircularElement<E> element = null;
+ CircularElement<E> element;
boolean cacheOnly = false;
// fill cache
@@ -253,7 +253,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
// ------------------- Spilling Phase ------------------------
- final FlatCombineFunction<E, E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
+ final GroupCombineFunction<E, E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
// now that we are actually spilling, take the combiner, and open it
try {
@@ -463,7 +463,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
this.memManager.getPageSize());
final WriterCollector<E> collector = new WriterCollector<E>(output, this.serializer);
- final FlatCombineFunction<E, E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
+ final GroupCombineFunction<E, E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
// combine and write to disk
try {
@@ -573,7 +573,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
throw new TraversableOnceException();
}
}
- };
+ }
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
index 4b92b71..5c3ecf3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -18,11 +18,6 @@
package org.apache.flink.runtime.blob;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
import java.io.File;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
@@ -33,6 +28,11 @@ import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
/**
* This class contains unit tests for the {@link BlobCache}.
*/
@@ -70,8 +70,8 @@ public class BlobCacheSuccessTest {
blobCache = new BlobCache(serverAddress, new Configuration());
- for(int i = 0; i < blobKeys.size(); i++){
- blobCache.getURL(blobKeys.get(i));
+ for (BlobKey blobKey : blobKeys) {
+ blobCache.getURL(blobKey);
}
// Now, shut down the BLOB server, the BLOBs must still be accessible through the cache.
@@ -87,9 +87,7 @@ public class BlobCacheSuccessTest {
// Verify the result
assertEquals(blobKeys.size(), urls.length);
- for (int i = 0; i < urls.length; ++i) {
-
- final URL url = urls[i];
+ for (final URL url : urls) {
assertNotNull(url);
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 1291181..2732112 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -641,7 +641,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
* arbitrary output type.
*/
def combineGroup[R: TypeInformation: ClassTag](
- combiner: FlatCombineFunction[T, R]): DataSet[R] = {
+ combiner: GroupCombineFunction[T, R]): DataSet[R] = {
if (combiner == null) {
throw new NullPointerException("Combine function must not be null.")
}
@@ -670,7 +670,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
if (fun == null) {
throw new NullPointerException("Combine function must not be null.")
}
- val combiner = new FlatCombineFunction[T, R] {
+ val combiner = new GroupCombineFunction[T, R] {
val cleanFun = clean(fun)
def combine(in: java.lang.Iterable[T], out: Collector[R]) {
cleanFun(in.iterator().asScala, out)
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
index eca4563..d547ea4 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.functions.{KeySelector, FirstReducer}
import org.apache.flink.api.scala.operators.ScalaAggregateOperator
import scala.collection.JavaConverters._
import org.apache.commons.lang3.Validate
-import org.apache.flink.api.common.functions.{FlatCombineFunction, GroupReduceFunction, ReduceFunction, Partitioner}
+import org.apache.flink.api.common.functions.{GroupCombineFunction, GroupReduceFunction, ReduceFunction, Partitioner}
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.api.java.operators._
@@ -370,7 +370,7 @@ class GroupedDataSet[T: ClassTag](
def combineGroup[R: TypeInformation: ClassTag](
fun: (Iterator[T], Collector[R]) => Unit): DataSet[R] = {
Validate.notNull(fun, "GroupCombine function must not be null.")
- val combiner = new FlatCombineFunction[T, R] {
+ val combiner = new GroupCombineFunction[T, R] {
val cleanFun = set.clean(fun)
def combine(in: java.lang.Iterable[T], out: Collector[R]) {
cleanFun(in.iterator().asScala, out)
@@ -396,7 +396,7 @@ class GroupedDataSet[T: ClassTag](
* arbitrary output type.
*/
def combineGroup[R: TypeInformation: ClassTag](
- combiner: FlatCombineFunction[T, R]): DataSet[R] = {
+ combiner: GroupCombineFunction[T, R]): DataSet[R] = {
Validate.notNull(combiner, "GroupCombine function must not be null.")
wrap(
new GroupCombineOperator[T, R](maybeCreateSortedGrouping(),
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
index 6631f07..e2a160d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
@@ -21,14 +21,14 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
import java.util.Iterator;
import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
import org.apache.flink.util.Collector;
public class CustomRankCombiner extends AbstractRichFunction implements GroupReduceFunction<VertexWithRank, VertexWithRank>,
- FlatCombineFunction<VertexWithRank, VertexWithRank>
+ GroupCombineFunction<VertexWithRank, VertexWithRank>
{
private static final long serialVersionUID = 1L;