Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/15 16:37:06 UTC

[GitHub] asfgit closed pull request #6003: [FLINK-9289][Dataset] Parallelism of generated operators should have max parallelism of input

asfgit closed pull request #6003: [FLINK-9289][Dataset] Parallelism of generated operators should have max parallelism of input
URL: https://github.com/apache/flink/pull/6003
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
index f6336cde2b7..3e7a552a68f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.Union;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
@@ -43,6 +44,19 @@
 			org.apache.flink.api.common.operators.Operator<T> input,
 			SelectorFunctionKeys<T, K> key) {
 
+		if (input instanceof Union) {
+			// if input is a union, we apply the key extractors recursively to all inputs
+			org.apache.flink.api.common.operators.Operator<T> firstInput = ((Union) input).getFirstInput();
+			org.apache.flink.api.common.operators.Operator<T> secondInput = ((Union) input).getSecondInput();
+
+			org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> firstInputWithKey =
+					appendKeyExtractor(firstInput, key);
+			org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> secondInputWithKey =
+					appendKeyExtractor(secondInput, key);
+
+			return new Union(firstInputWithKey, secondInputWithKey, input.getName());
+		}
+
 		TypeInformation<T> inputType = key.getInputType();
 		TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key);
 		KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper(key.getKeyExtractor());
@@ -66,6 +80,19 @@
 			SelectorFunctionKeys<T, K1> key1,
 			SelectorFunctionKeys<T, K2> key2) {
 
+		if (input instanceof Union) {
+			// if input is a union, we apply the key extractors recursively to all inputs
+			org.apache.flink.api.common.operators.Operator<T> firstInput = ((Union) input).getFirstInput();
+			org.apache.flink.api.common.operators.Operator<T> secondInput = ((Union) input).getSecondInput();
+
+			org.apache.flink.api.common.operators.Operator<Tuple3<K1, K2, T>> firstInputWithKey =
+					appendKeyExtractor(firstInput, key1, key2);
+			org.apache.flink.api.common.operators.Operator<Tuple3<K1, K2, T>> secondInputWithKey =
+					appendKeyExtractor(secondInput, key1, key2);
+
+			return new Union(firstInputWithKey, secondInputWithKey, input.getName());
+		}
+
 		TypeInformation<T> inputType = key1.getInputType();
 		TypeInformation<Tuple3<K1, K2, T>> typeInfoWithKey = createTypeWithKey(key1, key2);
 		TwoKeyExtractingMapper<T, K1, K2> extractor =
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
index 0da5e01a0b3..7d3c0d6fc3f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
@@ -62,4 +62,11 @@ public UnionOperator(DataSet<T> input1, DataSet<T> input2, String unionLocationN
 	protected Union<T> translateToDataFlow(Operator<T> input1, Operator<T> input2) {
 		return new Union<T>(input1, input2, unionLocationName);
 	}
+
+	@Override
+	public UnionOperator<T> setParallelism(int parallelism) {
+		// Union is not translated to an independent operator but executed by multiplexing
+		// its input on the following operator. Hence, the parallelism of a Union cannot be set.
+		throw new UnsupportedOperationException("Cannot set the parallelism for Union.");
+	}
 }
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java
new file mode 100644
index 00000000000..216e37f2acf
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.Union;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of union operation.
+ */
+@SuppressWarnings("serial")
+public class UnionTranslationTest {
+
+	@Test
+	public void translateUnion2Group() {
+		try {
+			final int parallelism = 4;
+			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
+
+			DataSet<Tuple3<Double, StringValue, LongValue>> dataset1 = getSourceDataSet(env, 3);
+
+			DataSet<Tuple3<Double, StringValue, LongValue>> dataset2 = getSourceDataSet(env, 2);
+
+			dataset1.union(dataset2)
+					.groupBy((KeySelector<Tuple3<Double, StringValue, LongValue>, String>) value -> "")
+					.reduceGroup((GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, String>) (values, out) -> {})
+					.returns(String.class)
+					.output(new DiscardingOutputFormat<>());
+
+			Plan p = env.createProgramPlan();
+
+			// The plan should look like the following one.
+			//
+			// DataSet1(3) - MapOperator(3)-+
+			//	                            |- Union(-1) - SingleInputOperator - Sink
+			// DataSet2(2) - MapOperator(2)-+
+
+			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+			Union unionOperator = (Union) ((SingleInputOperator) sink.getInput()).getInput();
+
+			// The key mappers should be added to both of the two input streams for union.
+			assertTrue(unionOperator.getFirstInput() instanceof MapOperatorBase<?, ?, ?>);
+			assertTrue(unionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);
+
+			// The parallelisms of the key mappers should be equal to those of their inputs.
+			assertEquals(unionOperator.getFirstInput().getParallelism(), 3);
+			assertEquals(unionOperator.getSecondInput().getParallelism(), 2);
+
+			// The union should always have the default parallelism.
+			assertEquals(unionOperator.getParallelism(), ExecutionConfig.PARALLELISM_DEFAULT);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test caused an error: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void translateUnion3SortedGroup() {
+		try {
+			final int parallelism = 4;
+			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
+
+			DataSet<Tuple3<Double, StringValue, LongValue>> dataset1 = getSourceDataSet(env, 2);
+
+			DataSet<Tuple3<Double, StringValue, LongValue>> dataset2 = getSourceDataSet(env, 3);
+
+			DataSet<Tuple3<Double, StringValue, LongValue>> dataset3 = getSourceDataSet(env, -1);
+
+			dataset1.union(dataset2).union(dataset3)
+					.groupBy((KeySelector<Tuple3<Double, StringValue, LongValue>, String>) value -> "")
+					.sortGroup((KeySelector<Tuple3<Double, StringValue, LongValue>, String>) value -> "", Order.ASCENDING)
+					.reduceGroup((GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, String>) (values, out) -> {})
+					.returns(String.class)
+					.output(new DiscardingOutputFormat<>());
+
+			Plan p = env.createProgramPlan();
+
+			// The plan should look like the following one.
+			//
+			// DataSet1(2) - MapOperator(2)-+
+			//	                            |- Union(-1) -+
+			// DataSet2(3) - MapOperator(3)-+             |- Union(-1) - SingleInputOperator - Sink
+			//                                            |
+			//             DataSet3(-1) - MapOperator(-1)-+
+
+			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+			Union secondUnionOperator = (Union) ((SingleInputOperator) sink.getInput()).getInput();
+
+			// The first input of the second union should be the first union.
+			Union firstUnionOperator = (Union) secondUnionOperator.getFirstInput();
+
+			// The key mapper should be added to the second input stream of the second union.
+			assertTrue(secondUnionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);
+
+			// The key mappers should be added to both of the two input streams for the first union.
+			assertTrue(firstUnionOperator.getFirstInput() instanceof MapOperatorBase<?, ?, ?>);
+			assertTrue(firstUnionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>);
+
+			// The parallelisms of the key mappers should be equal to those of their inputs.
+			assertEquals(firstUnionOperator.getFirstInput().getParallelism(), 2);
+			assertEquals(firstUnionOperator.getSecondInput().getParallelism(), 3);
+			assertEquals(secondUnionOperator.getSecondInput().getParallelism(), -1);
+
+			// The union should always have the default parallelism.
+			assertEquals(secondUnionOperator.getParallelism(), ExecutionConfig.PARALLELISM_DEFAULT);
+			assertEquals(firstUnionOperator.getParallelism(), ExecutionConfig.PARALLELISM_DEFAULT);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test caused an error: " + e.getMessage());
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private static DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env, int parallelism) {
+		return env
+				.fromElements(new Tuple3<>(0.0, new StringValue(""), new LongValue(1L)))
+				.setParallelism(parallelism);
+	}
+}
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
index d259fac6341..932ad78a1d8 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
@@ -235,7 +235,6 @@ protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		// s, adjusted pagerank(s)
 		DataSet<Tuple2<K, DoubleValue>> adjustedScores = vertexScores
 			.union(sourceVertices)
-				.setParallelism(parallelism)
 				.name("Union with source vertices")
 			.map(new AdjustScores<>(dampingFactor))
 				.withBroadcastSet(sumOfScores, SUM_OF_SCORES)
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 1182708bb9f..4709fa52a5a 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -515,7 +515,7 @@ private void createSortOperation(PythonOperationInfo info) {
 	private <IN> void createUnionOperation(PythonOperationInfo info) {
 		DataSet<IN> op1 = sets.getDataSet(info.parentID);
 		DataSet<IN> op2 = sets.getDataSet(info.otherID);
-		sets.add(info.setID, op1.union(op2).setParallelism(info.parallelism).name("Union"));
+		sets.add(info.setID, op1.union(op2).name("Union"));
 	}
 
 	private <IN1, IN2, OUT> void createCoGroupOperation(PythonOperationInfo info, TypeInformation<OUT> type) {
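
For context, the following stand-alone sketch (not part of the PR) illustrates the user-facing behavior that the new UnionTranslationTest verifies: when a union of inputs with different parallelism is grouped by a KeySelector, the generated key-extracting mappers are appended to each union input and inherit that input's parallelism, and setting the parallelism of the union itself is rejected. The class name and element type below are illustrative choices, not code from the PR.

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

public class UnionParallelismExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(4);

		// Two sources with explicitly different parallelism.
		DataSet<Tuple2<String, Long>> left = env
				.fromElements(new Tuple2<>("a", 1L))
				.setParallelism(3);
		DataSet<Tuple2<String, Long>> right = env
				.fromElements(new Tuple2<>("b", 2L))
				.setParallelism(2);

		// Grouping the union by a KeySelector makes the translation insert
		// key-extracting mappers. With this change they are appended to each
		// union input and run with that input's parallelism (3 and 2 here),
		// while the union itself keeps the default parallelism.
		left.union(right)
				.groupBy((KeySelector<Tuple2<String, Long>, String>) value -> value.f0)
				.reduceGroup((GroupReduceFunction<Tuple2<String, Long>, String>) (values, out) -> {})
				.returns(String.class)
				.output(new DiscardingOutputFormat<>());

		// Setting the parallelism of the union itself now fails fast:
		// left.union(right).setParallelism(2);  // throws UnsupportedOperationException

		env.execute("union parallelism example");
	}
}

This mirrors the assertions in translateUnion2Group above; in a real program the grouping would of course be followed by an actual aggregation.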


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services