Posted to commits@flink.apache.org by xc...@apache.org on 2018/08/15 18:07:31 UTC

[flink] branch release-1.5 updated: [FLINK-9289] [Dataset] Parallelism of generated operators should have the max parallelism of input

This is an automated email from the ASF dual-hosted git repository.

xccui pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.5 by this push:
     new 9f1c12c  [FLINK-9289] [Dataset] Parallelism of generated operators should have the max parallelism of input
9f1c12c is described below

commit 9f1c12c10c3eb7b302f0688ed1b60fd08942dc03
Author: Xingcan Cui <xi...@gmail.com>
AuthorDate: Sun May 13 20:20:36 2018 +0800

    [FLINK-9289] [Dataset] Parallelism of generated operators should have the max parallelism of input
    
    This closes #6003.
---
 .../flink/api/java/operators/KeyFunctions.java     |  27 +++
 .../flink/api/java/operators/UnionOperator.java    |   7 +
 .../translation/UnionTranslationTest.java          | 182 +++++++++++++++++++++
 .../flink/graph/library/linkanalysis/PageRank.java |   1 -
 .../apache/flink/python/api/PythonPlanBinder.java  |   2 +-
 5 files changed, 217 insertions(+), 2 deletions(-)
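
For readers following along, here is a minimal sketch of the scenario this change targets. It is illustrative only: the class name, element values, and parallelism numbers are assumptions that mirror the new UnionTranslationTest further down, they are not part of the patch.

    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class UnionParallelismSketch {

        public static void main(String[] args) {
            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(4);

            DataSet<Tuple2<String, Long>> first =
                    env.fromElements(Tuple2.of("a", 1L)).setParallelism(3);
            DataSet<Tuple2<String, Long>> second =
                    env.fromElements(Tuple2.of("b", 2L)).setParallelism(2);

            first.union(second)
                    .groupBy(new KeySelector<Tuple2<String, Long>, String>() {
                        @Override
                        public String getKey(Tuple2<String, Long> value) {
                            return value.f0;
                        }
                    })
                    .reduceGroup(new GroupReduceFunction<Tuple2<String, Long>, String>() {
                        @Override
                        public void reduce(Iterable<Tuple2<String, Long>> values, Collector<String> out) {
                        }
                    })
                    .returns(String.class)
                    .output(new DiscardingOutputFormat<>());

            // Translating this program inserts a key-extracting MapOperator for the KeySelector.
            // Before the fix, that mapper was appended after the Union, so it did not pick up the
            // inputs' parallelism; with the fix, it is pushed into each union input, leaving the
            // two mappers at parallelism 3 and 2 and the Union at the default parallelism (-1).
            env.createProgramPlan();
        }
    }

The two tests added below in UnionTranslationTest assert exactly this plan shape, including the nested-union case.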

diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
index f6336cd..3e7a552 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.Union;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
@@ -43,6 +44,19 @@ public class KeyFunctions {
 			org.apache.flink.api.common.operators.Operator<T> input,
 			SelectorFunctionKeys<T, K> key) {
 
+		if (input instanceof Union) {
+			// if input is a union, we apply the key extractors recursively to all inputs
+			org.apache.flink.api.common.operators.Operator<T> firstInput = ((Union) input).getFirstInput();
+			org.apache.flink.api.common.operators.Operator<T> secondInput = ((Union) input).getSecondInput();
+
+			org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> firstInputWithKey =
+					appendKeyExtractor(firstInput, key);
+			org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> secondInputWithKey =
+					appendKeyExtractor(secondInput, key);
+
+			return new Union(firstInputWithKey, secondInputWithKey, input.getName());
+		}
+
 		TypeInformation<T> inputType = key.getInputType();
 		TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key);
 		KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper(key.getKeyExtractor());
@@ -66,6 +80,19 @@ public class KeyFunctions {
 			SelectorFunctionKeys<T, K1> key1,
 			SelectorFunctionKeys<T, K2> key2) {
 
+		if (input instanceof Union) {
+			// if input is a union, we apply the key extractors recursively to all inputs
+			org.apache.flink.api.common.operators.Operator<T> firstInput = ((Union) input).getFirstInput();
+			org.apache.flink.api.common.operators.Operator<T> secondInput = ((Union) input).getSecondInput();
+
+			org.apache.flink.api.common.operators.Operator<Tuple3<K1, K2, T>> firstInputWithKey =
+					appendKeyExtractor(firstInput, key1, key2);
+			org.apache.flink.api.common.operators.Operator<Tuple3<K1, K2, T>> secondInputWithKey =
+					appendKeyExtractor(secondInput, key1, key2);
+
+			return new Union(firstInputWithKey, secondInputWithKey, input.getName());
+		}
+
 		TypeInformation<T> inputType = key1.getInputType();
 		TypeInformation<Tuple3<K1, K2, T>> typeInfoWithKey = createTypeWithKey(key1, key2);
 		TwoKeyExtractingMapper<T, K1, K2> extractor =
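
Stripped of the Flink operator types, the pattern applied above is a plain recursive pushdown over the binary union tree. The sketch below is illustrative only: Op, Leaf, Union2, and pushIntoUnion are hypothetical names, not part of the Flink API.

    import java.util.function.Function;

    final class PushdownSketch {

        // A toy operator tree: either a concrete input with a fixed parallelism, or a binary union.
        interface Op { }

        static final class Leaf implements Op {
            final int parallelism;
            Leaf(int parallelism) { this.parallelism = parallelism; }
        }

        static final class Union2 implements Op {
            final Op first;
            final Op second;
            Union2(Op first, Op second) { this.first = first; this.second = second; }
        }

        // Push a wrapping step (the key extractor in the real code) below every union,
        // so the wrapper is attached to the concrete inputs and can inherit their parallelism.
        static Op pushIntoUnion(Op input, Function<Op, Op> wrap) {
            if (input instanceof Union2) {
                Union2 union = (Union2) input;
                return new Union2(
                        pushIntoUnion(union.first, wrap),
                        pushIntoUnion(union.second, wrap));
            }
            return wrap.apply(input);
        }
    }

In the patch itself this recursion is written out twice, once per key-extractor arity, directly on the common-API Operator and Union types shown above.
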
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
index 0da5e01..7d3c0d6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
@@ -62,4 +62,11 @@ public class UnionOperator<T> extends TwoInputOperator<T, T, T, UnionOperator<T>
 	protected Union<T> translateToDataFlow(Operator<T> input1, Operator<T> input2) {
 		return new Union<T>(input1, input2, unionLocationName);
 	}
+
+	@Override
+	public UnionOperator<T> setParallelism(int parallelism) {
+		// Union is not translated to an independent operator but executed by multiplexing
+		// its input on the following operator. Hence, the parallelism of a Union cannot be set.
+		throw new UnsupportedOperationException("Cannot set the parallelism for Union.");
+	}
 }
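
A small sketch of the user-facing effect of the override above, assuming the DataSet API on this branch (the class name and sequence bounds are illustrative):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class UnionSetParallelismSketch {

        public static void main(String[] args) {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Long> first = env.generateSequence(0, 10);
            DataSet<Long> second = env.generateSequence(11, 20);

            try {
                // UnionOperator#setParallelism now rejects the call, because the union is only
                // multiplexed into the downstream operator and has no parallelism of its own.
                first.union(second).setParallelism(4);
            } catch (UnsupportedOperationException expected) {
                // "Cannot set the parallelism for Union."
            }
        }
    }

The PageRank and PythonPlanBinder changes further down in this patch remove exactly such setParallelism calls on unions.
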
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java
new file mode 100644
index 0000000..d393109
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.Union;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of union operation.
+ */
+@SuppressWarnings("serial")
+public class UnionTranslationTest {
+
+	@Test
+	public void translateUnion2Group() {
+		try {
+			final int parallelism = 4;
+			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
+
+			DataSet<Tuple3<Double, StringValue, LongValue>> dataset1 = getSourceDataSet(env, 3);
+
+			DataSet<Tuple3<Double, StringValue, LongValue>> dataset2 = getSourceDataSet(env, 2);
+
+			dataset1.union(dataset2)
+					.groupBy(new KeySelector<Tuple3<Double, StringValue, LongValue>, String>() {
+						@Override
+						public String getKey(Tuple3<Double, StringValue, LongValue> value) throws Exception {
+							return "";
+						}
+					})
+					.reduceGroup(new GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, String>() {
+						@Override
+						public void reduce(Iterable<Tuple3<Double, StringValue, LongValue>> values, Collector<String> out) throws Exception {
+						}
+					})
+					.returns(String.class)
+					.output(new DiscardingOutputFormat<>());
+
+			Plan p = env.createProgramPlan();
+
+			// The plan should look like the following one.
+			//
+			// DataSet1(3) - MapOperator(3)-+
+			//	                            |- Union(-1) - SingleInputOperator - Sink
+			// DataSet2(2) - MapOperator(2)-+
+
+			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+			Union unionOperator = (Union) ((SingleInputOperator) sink.getInput()).getInput();
+
+			// The key mappers should be added to both of the two input streams for union.
+			assertTrue(unionOperator.getFirstInput() instanceof MapOperatorBase<?, ?, ?>);
+			assertTrue(unionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);
+
+			// The parallelisms of the key mappers should be equal to those of their inputs.
+			assertEquals(unionOperator.getFirstInput().getParallelism(), 3);
+			assertEquals(unionOperator.getSecondInput().getParallelism(), 2);
+
+			// The union should always have the default parallelism.
+			assertEquals(unionOperator.getParallelism(), ExecutionConfig.PARALLELISM_DEFAULT);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test caused an error: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void translateUnion3SortedGroup() {
+		try {
+			final int parallelism = 4;
+			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
+
+			DataSet<Tuple3<Double, StringValue, LongValue>> dataset1 = getSourceDataSet(env, 2);
+
+			DataSet<Tuple3<Double, StringValue, LongValue>> dataset2 = getSourceDataSet(env, 3);
+
+			DataSet<Tuple3<Double, StringValue, LongValue>> dataset3 = getSourceDataSet(env, -1);
+
+			dataset1.union(dataset2).union(dataset3)
+					.groupBy(new KeySelector<Tuple3<Double, StringValue, LongValue>, String>() {
+						@Override
+						public String getKey(Tuple3<Double, StringValue, LongValue> value) throws Exception {
+							return "";
+						}
+					})
+					.sortGroup(new KeySelector<Tuple3<Double, StringValue, LongValue>, String>() {
+						@Override
+						public String getKey(Tuple3<Double, StringValue, LongValue> value) throws Exception {
+							return "";
+						}
+					}, Order.ASCENDING)
+					.reduceGroup(new GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, String>() {
+						@Override
+						public void reduce(Iterable<Tuple3<Double, StringValue, LongValue>> values, Collector<String> out) throws Exception {
+						}
+					})
+					.returns(String.class)
+					.output(new DiscardingOutputFormat<>());
+
+			Plan p = env.createProgramPlan();
+
+			// The plan should look like the following one.
+			//
+			// DataSet1(2) - MapOperator(2)-+
+			//	                            |- Union(-1) -+
+			// DataSet2(3) - MapOperator(3)-+             |- Union(-1) - SingleInputOperator - Sink
+			//                                            |
+			//             DataSet3(-1) - MapOperator(-1)-+
+
+			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+			Union secondUnionOperator = (Union) ((SingleInputOperator) sink.getInput()).getInput();
+
+			// The first input of the second union should be the first union.
+			Union firstUnionOperator = (Union) secondUnionOperator.getFirstInput();
+
+			// The key mapper should be added to the second input stream of the second union.
+			assertTrue(secondUnionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);
+
+			// The key mappers should be added to both of the two input streams for the first union.
+			assertTrue(firstUnionOperator.getFirstInput() instanceof MapOperatorBase<?, ?, ?>);
+			assertTrue(firstUnionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);
+
+			// The parallelisms of the key mappers should be equal to those of their inputs.
+			assertEquals(firstUnionOperator.getFirstInput().getParallelism(), 2);
+			assertEquals(firstUnionOperator.getSecondInput().getParallelism(), 3);
+			assertEquals(secondUnionOperator.getSecondInput().getParallelism(), -1);
+
+			// The union should always have the default parallelism.
+			assertEquals(secondUnionOperator.getParallelism(), ExecutionConfig.PARALLELISM_DEFAULT);
+			assertEquals(firstUnionOperator.getParallelism(), ExecutionConfig.PARALLELISM_DEFAULT);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test caused an error: " + e.getMessage());
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private static DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env, int parallelism) {
+		return env
+				.fromElements(new Tuple3<>(0.0, new StringValue(""), new LongValue(1L)))
+				.setParallelism(parallelism);
+	}
+}
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
index d259fac..932ad78 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
@@ -235,7 +235,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		// s, adjusted pagerank(s)
 		DataSet<Tuple2<K, DoubleValue>> adjustedScores = vertexScores
 			.union(sourceVertices)
-				.setParallelism(parallelism)
 				.name("Union with source vertices")
 			.map(new AdjustScores<>(dampingFactor))
 				.withBroadcastSet(sumOfScores, SUM_OF_SCORES)
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 106ac7a..44c79de 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -530,7 +530,7 @@ public class PythonPlanBinder {
 	private <IN> void createUnionOperation(PythonOperationInfo info) {
 		DataSet<IN> op1 = sets.getDataSet(info.parentID);
 		DataSet<IN> op2 = sets.getDataSet(info.otherID);
-		sets.add(info.setID, op1.union(op2).setParallelism(info.parallelism).name("Union"));
+		sets.add(info.setID, op1.union(op2).name("Union"));
 	}
 
 	private <IN1, IN2, OUT> void createCoGroupOperation(PythonOperationInfo info, TypeInformation<OUT> type) {