You are viewing a plain-text version of this content; the canonical HTML version is available at the original mailing-list archive link.
Posted to commits@flink.apache.org by xc...@apache.org on 2018/08/15 18:07:31 UTC
[flink] branch release-1.5 updated: [FLINK-9289] [Dataset]
Parallelism of generated operators should have the max parallelism of input
This is an automated email from the ASF dual-hosted git repository.
xccui pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.5 by this push:
new 9f1c12c [FLINK-9289] [Dataset] Parallelism of generated operators should have the max parallelism of input
9f1c12c is described below
commit 9f1c12c10c3eb7b302f0688ed1b60fd08942dc03
Author: Xingcan Cui <xi...@gmail.com>
AuthorDate: Sun May 13 20:20:36 2018 +0800
[FLINK-9289] [Dataset] Parallelism of generated operators should have the max parallelism of input
This closes #6003.
---
.../flink/api/java/operators/KeyFunctions.java | 27 +++
.../flink/api/java/operators/UnionOperator.java | 7 +
.../translation/UnionTranslationTest.java | 182 +++++++++++++++++++++
.../flink/graph/library/linkanalysis/PageRank.java | 1 -
.../apache/flink/python/api/PythonPlanBinder.java | 2 +-
5 files changed, 217 insertions(+), 2 deletions(-)
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
index f6336cd..3e7a552 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.Union;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
@@ -43,6 +44,19 @@ public class KeyFunctions {
org.apache.flink.api.common.operators.Operator<T> input,
SelectorFunctionKeys<T, K> key) {
+ if (input instanceof Union) {
+ // if input is a union, we apply the key extractors recursively to all inputs
+ org.apache.flink.api.common.operators.Operator<T> firstInput = ((Union) input).getFirstInput();
+ org.apache.flink.api.common.operators.Operator<T> secondInput = ((Union) input).getSecondInput();
+
+ org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> firstInputWithKey =
+ appendKeyExtractor(firstInput, key);
+ org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> secondInputWithKey =
+ appendKeyExtractor(secondInput, key);
+
+ return new Union(firstInputWithKey, secondInputWithKey, input.getName());
+ }
+
TypeInformation<T> inputType = key.getInputType();
TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key);
KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper(key.getKeyExtractor());
@@ -66,6 +80,19 @@ public class KeyFunctions {
SelectorFunctionKeys<T, K1> key1,
SelectorFunctionKeys<T, K2> key2) {
+ if (input instanceof Union) {
+ // if input is a union, we apply the key extractors recursively to all inputs
+ org.apache.flink.api.common.operators.Operator<T> firstInput = ((Union) input).getFirstInput();
+ org.apache.flink.api.common.operators.Operator<T> secondInput = ((Union) input).getSecondInput();
+
+ org.apache.flink.api.common.operators.Operator<Tuple3<K1, K2, T>> firstInputWithKey =
+ appendKeyExtractor(firstInput, key1, key2);
+ org.apache.flink.api.common.operators.Operator<Tuple3<K1, K2, T>> secondInputWithKey =
+ appendKeyExtractor(secondInput, key1, key2);
+
+ return new Union(firstInputWithKey, secondInputWithKey, input.getName());
+ }
+
TypeInformation<T> inputType = key1.getInputType();
TypeInformation<Tuple3<K1, K2, T>> typeInfoWithKey = createTypeWithKey(key1, key2);
TwoKeyExtractingMapper<T, K1, K2> extractor =
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
index 0da5e01..7d3c0d6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
@@ -62,4 +62,11 @@ public class UnionOperator<T> extends TwoInputOperator<T, T, T, UnionOperator<T>
protected Union<T> translateToDataFlow(Operator<T> input1, Operator<T> input2) {
return new Union<T>(input1, input2, unionLocationName);
}
+
+ @Override
+ public UnionOperator<T> setParallelism(int parallelism) {
+ // Union is not translated to an independent operator but executed by multiplexing
+ // its input on the following operator. Hence, the parallelism of a Union cannot be set.
+ throw new UnsupportedOperationException("Cannot set the parallelism for Union.");
+ }
}
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java
new file mode 100644
index 0000000..d393109
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.Union;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of union operation.
+ */
+@SuppressWarnings("serial")
+public class UnionTranslationTest {
+
+ @Test
+ public void translateUnion2Group() {
+ try {
+ final int parallelism = 4;
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
+
+ DataSet<Tuple3<Double, StringValue, LongValue>> dataset1 = getSourceDataSet(env, 3);
+
+ DataSet<Tuple3<Double, StringValue, LongValue>> dataset2 = getSourceDataSet(env, 2);
+
+ dataset1.union(dataset2)
+ .groupBy(new KeySelector<Tuple3<Double, StringValue, LongValue>, String>() {
+ @Override
+ public String getKey(Tuple3<Double, StringValue, LongValue> value) throws Exception {
+ return "";
+ }
+ })
+ .reduceGroup(new GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, String>() {
+ @Override
+ public void reduce(Iterable<Tuple3<Double, StringValue, LongValue>> values, Collector<String> out) throws Exception {
+ }
+ })
+ .returns(String.class)
+ .output(new DiscardingOutputFormat<>());
+
+ Plan p = env.createProgramPlan();
+
+ // The plan should look like the following one.
+ //
+ // DataSet1(3) - MapOperator(3)-+
+ // |- Union(-1) - SingleInputOperator - Sink
+ // DataSet2(2) - MapOperator(2)-+
+
+ GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+ Union unionOperator = (Union) ((SingleInputOperator) sink.getInput()).getInput();
+
+ // The key mappers should be added to both of the two input streams for union.
+ assertTrue(unionOperator.getFirstInput() instanceof MapOperatorBase<?, ?, ?>);
+ assertTrue(unionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);
+
+ // The parallelisms of the key mappers should be equal to those of their inputs.
+ assertEquals(unionOperator.getFirstInput().getParallelism(), 3);
+ assertEquals(unionOperator.getSecondInput().getParallelism(), 2);
+
+ // The union should always have the default parallelism.
+ assertEquals(unionOperator.getParallelism(), ExecutionConfig.PARALLELISM_DEFAULT);
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Test caused an error: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void translateUnion3SortedGroup() {
+ try {
+ final int parallelism = 4;
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
+
+ DataSet<Tuple3<Double, StringValue, LongValue>> dataset1 = getSourceDataSet(env, 2);
+
+ DataSet<Tuple3<Double, StringValue, LongValue>> dataset2 = getSourceDataSet(env, 3);
+
+ DataSet<Tuple3<Double, StringValue, LongValue>> dataset3 = getSourceDataSet(env, -1);
+
+ dataset1.union(dataset2).union(dataset3)
+ .groupBy(new KeySelector<Tuple3<Double, StringValue, LongValue>, String>() {
+ @Override
+ public String getKey(Tuple3<Double, StringValue, LongValue> value) throws Exception {
+ return "";
+ }
+ })
+ .sortGroup(new KeySelector<Tuple3<Double, StringValue, LongValue>, String>() {
+ @Override
+ public String getKey(Tuple3<Double, StringValue, LongValue> value) throws Exception {
+ return "";
+ }
+ }, Order.ASCENDING)
+ .reduceGroup(new GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, String>() {
+ @Override
+ public void reduce(Iterable<Tuple3<Double, StringValue, LongValue>> values, Collector<String> out) throws Exception {
+ }
+ })
+ .returns(String.class)
+ .output(new DiscardingOutputFormat<>());
+
+ Plan p = env.createProgramPlan();
+
+ // The plan should look like the following one.
+ //
+ // DataSet1(2) - MapOperator(2)-+
+ // |- Union(-1) -+
+ // DataSet2(3) - MapOperator(3)-+ |- Union(-1) - SingleInputOperator - Sink
+ // |
+ // DataSet3(-1) - MapOperator(-1)-+
+
+ GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+ Union secondUnionOperator = (Union) ((SingleInputOperator) sink.getInput()).getInput();
+
+ // The first input of the second union should be the first union.
+ Union firstUnionOperator = (Union) secondUnionOperator.getFirstInput();
+
+ // The key mapper should be added to the second input stream of the second union.
+ assertTrue(secondUnionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);
+
+ // The key mappers should be added to both of the two input streams for the first union.
+ assertTrue(firstUnionOperator.getFirstInput() instanceof MapOperatorBase<?, ?, ?>);
+ assertTrue(firstUnionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);
+
+ // The parallelisms of the key mappers should be equal to those of their inputs.
+ assertEquals(firstUnionOperator.getFirstInput().getParallelism(), 2);
+ assertEquals(firstUnionOperator.getSecondInput().getParallelism(), 3);
+ assertEquals(secondUnionOperator.getSecondInput().getParallelism(), -1);
+
+ // The union should always have the default parallelism.
+ assertEquals(secondUnionOperator.getParallelism(), ExecutionConfig.PARALLELISM_DEFAULT);
+ assertEquals(firstUnionOperator.getParallelism(), ExecutionConfig.PARALLELISM_DEFAULT);
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Test caused an error: " + e.getMessage());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env, int parallelism) {
+ return env
+ .fromElements(new Tuple3<>(0.0, new StringValue(""), new LongValue(1L)))
+ .setParallelism(parallelism);
+ }
+}
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
index d259fac..932ad78 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
@@ -235,7 +235,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
// s, adjusted pagerank(s)
DataSet<Tuple2<K, DoubleValue>> adjustedScores = vertexScores
.union(sourceVertices)
- .setParallelism(parallelism)
.name("Union with source vertices")
.map(new AdjustScores<>(dampingFactor))
.withBroadcastSet(sumOfScores, SUM_OF_SCORES)
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 106ac7a..44c79de 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -530,7 +530,7 @@ public class PythonPlanBinder {
private <IN> void createUnionOperation(PythonOperationInfo info) {
DataSet<IN> op1 = sets.getDataSet(info.parentID);
DataSet<IN> op2 = sets.getDataSet(info.otherID);
- sets.add(info.setID, op1.union(op2).setParallelism(info.parallelism).name("Union"));
+ sets.add(info.setID, op1.union(op2).name("Union"));
}
private <IN1, IN2, OUT> void createCoGroupOperation(PythonOperationInfo info, TypeInformation<OUT> type) {