Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/02 07:54:49 UTC
[flink] branch master updated: [FLINK-13494][table-planner-blink]
Remove source and sink parallelism configurations
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8266707 [FLINK-13494][table-planner-blink] Remove source and sink parallelism configurations
8266707 is described below
commit 8266707502889e279256d09645376c665ab0cdcb
Author: Xupingyong <xu...@163.com>
AuthorDate: Wed Jul 31 13:56:31 2019 +0800
[FLINK-13494][table-planner-blink] Remove source and sink parallelism configurations
This closes #9277
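With this change, table.exec.resource.default-parallelism is the only parallelism option left on the table layer; a positive value overrides the parallelism of the StreamExecutionEnvironment, and -1 (the default) falls back to it. A minimal usage sketch, assuming the Flink 1.9 blink planner APIs (illustrative only, not part of this commit):

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.EnvironmentSettings
    import org.apache.flink.table.api.config.ExecutionConfigOptions
    import org.apache.flink.table.api.scala.StreamTableEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner().inStreamingMode().build()
    val tEnv = StreamTableEnvironment.create(env, settings)
    // A positive value overrides the environment parallelism; -1 (the
    // default) falls back to the StreamExecutionEnvironment's parallelism.
    tEnv.getConfig.getConfiguration.setInteger(
      ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4)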
---
.../table/api/config/ExecutionConfigOptions.java | 19 +-
.../planner/plan/nodes/resource/NodeResource.java | 57 ----
.../plan/nodes/resource/NodeResourceUtil.java | 38 +--
.../parallelism/FinalParallelismSetter.java | 135 ---------
.../resource/parallelism/ParallelismProcessor.java | 73 -----
.../nodes/resource/parallelism/ShuffleStage.java | 88 ------
.../parallelism/ShuffleStageGenerator.java | 157 ----------
.../ShuffleStageParallelismCalculator.java | 92 ------
.../table/planner/delegation/BatchPlanner.scala | 6 +-
.../table/planner/delegation/PlannerBase.scala | 10 +
.../table/planner/delegation/StreamPlanner.scala | 11 +-
.../table/planner/plan/nodes/exec/ExecNode.scala | 26 +-
.../batch/BatchExecBoundedStreamScan.scala | 8 +-
.../plan/nodes/physical/batch/BatchExecCalc.scala | 2 +-
.../nodes/physical/batch/BatchExecCorrelate.scala | 2 +-
.../nodes/physical/batch/BatchExecExchange.scala | 6 +
.../nodes/physical/batch/BatchExecExpand.scala | 2 +-
.../batch/BatchExecHashAggregateBase.scala | 2 +-
.../nodes/physical/batch/BatchExecHashJoin.scala | 2 +-
.../batch/BatchExecHashWindowAggregateBase.scala | 2 +-
.../plan/nodes/physical/batch/BatchExecLimit.scala | 2 +-
.../nodes/physical/batch/BatchExecLookupJoin.scala | 4 +-
.../physical/batch/BatchExecNestedLoopJoin.scala | 3 +-
.../physical/batch/BatchExecOverAggregate.scala | 2 +-
.../plan/nodes/physical/batch/BatchExecRank.scala | 2 +-
.../plan/nodes/physical/batch/BatchExecSink.scala | 19 +-
.../plan/nodes/physical/batch/BatchExecSort.scala | 2 +-
.../batch/BatchExecSortAggregateBase.scala | 2 +-
.../nodes/physical/batch/BatchExecSortLimit.scala | 2 +-
.../physical/batch/BatchExecSortMergeJoin.scala | 2 +-
.../batch/BatchExecSortWindowAggregateBase.scala | 2 +-
.../physical/batch/BatchExecTableSourceScan.scala | 5 +-
.../nodes/physical/batch/BatchExecValues.scala | 2 +-
.../nodes/physical/stream/StreamExecCalc.scala | 7 +-
.../physical/stream/StreamExecCorrelate.scala | 7 +-
.../physical/stream/StreamExecDataStreamScan.scala | 8 +-
.../physical/stream/StreamExecDeduplicate.scala | 7 +-
.../nodes/physical/stream/StreamExecExchange.scala | 3 +
.../nodes/physical/stream/StreamExecExpand.scala | 7 +-
.../stream/StreamExecGlobalGroupAggregate.scala | 7 +-
.../physical/stream/StreamExecGroupAggregate.scala | 7 +-
.../stream/StreamExecGroupWindowAggregate.scala | 7 +-
.../StreamExecIncrementalGroupAggregate.scala | 7 +-
.../nodes/physical/stream/StreamExecJoin.scala | 7 +-
.../nodes/physical/stream/StreamExecLimit.scala | 7 +-
.../stream/StreamExecLocalGroupAggregate.scala | 7 +-
.../physical/stream/StreamExecLookupJoin.scala | 6 +-
.../nodes/physical/stream/StreamExecMatch.scala | 14 +-
.../physical/stream/StreamExecOverAggregate.scala | 7 +-
.../nodes/physical/stream/StreamExecRank.scala | 7 +-
.../nodes/physical/stream/StreamExecSink.scala | 20 --
.../nodes/physical/stream/StreamExecSort.scala | 7 +-
.../physical/stream/StreamExecSortLimit.scala | 7 +-
.../stream/StreamExecTableSourceScan.scala | 3 -
.../physical/stream/StreamExecTemporalJoin.scala | 7 +-
.../physical/stream/StreamExecTemporalSort.scala | 9 +-
.../nodes/physical/stream/StreamExecValues.scala | 6 +-
.../physical/stream/StreamExecWindowJoin.scala | 7 +-
.../planner/plan/utils/ExecNodePlanDumper.scala | 4 -
.../plan/nodes/resource/MockNodeTestBase.java | 185 ------------
.../parallelism/FinalParallelismSetterTest.java | 118 --------
.../parallelism/ShuffleStageGeneratorTest.java | 323 ---------------------
.../ShuffleStageParallelismCalculatorTest.java | 123 --------
.../resources/explain/testGetStatsFromCatalog.out | 14 +-
.../plan/nodes/resource/ExecNodeResourceTest.xml | 153 ----------
.../apache/flink/table/api/batch/ExplainTest.scala | 8 +-
.../flink/table/api/stream/ExplainTest.scala | 8 +-
.../plan/nodes/resource/ExecNodeResourceTest.scala | 251 ----------------
68 files changed, 174 insertions(+), 1988 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 4abbb38..1c6c8ab 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -83,18 +83,13 @@ public class ExecutionConfigOptions {
public static final ConfigOption<Integer> TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM =
key("table.exec.resource.default-parallelism")
.defaultValue(-1)
- .withDescription("Default parallelism of job operators. If it is <= 0, use parallelism of StreamExecutionEnvironment(" +
- "its default value is the num of cpu cores in the client host).");
-
- public static final ConfigOption<Integer> TABLE_EXEC_RESOURCE_SOURCE_PARALLELISM =
- key("table.exec.resource.source.parallelism")
- .defaultValue(-1)
- .withDescription("Sets source parallelism, if it is <= 0, use " + TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key() + " to set source parallelism.");
-
- public static final ConfigOption<Integer> TABLE_EXEC_RESOURCE_SINK_PARALLELISM =
- key("table.exec.resource.sink.parallelism")
- .defaultValue(-1)
- .withDescription("Sets sink parallelism, if it is <= 0, use " + TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key() + " to set sink parallelism.");
+ .withDescription("Sets the default parallelism for all operators " +
+ "(such as aggregate, join, filter) to run with parallel instances. " +
+ "This config has a higher priority than the parallelism of " +
+ "StreamExecutionEnvironment (in fact, this config overrides the parallelism " +
+ "of StreamExecutionEnvironment). A value of -1 indicates that no " +
+ "default parallelism is set, in which case it will fall back to the " +
+ "parallelism of StreamExecutionEnvironment.");
public static final ConfigOption<String> TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY =
key("table.exec.resource.external-buffer-memory")
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/NodeResource.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/NodeResource.java
deleted file mode 100644
index 9b1826f..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/NodeResource.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource;
-
-/**
- * Resource for a node: currently only parallelism. Other resource dimensions can be added later.
- */
-public class NodeResource {
-
- // node parallelism
- private int parallelism = -1;
-
- private int maxParallelism = -1;
-
- public int getParallelism() {
- return parallelism;
- }
-
- public void setParallelism(int parallelism) {
- this.parallelism = parallelism;
- }
-
- public int getMaxParallelism() {
- return maxParallelism;
- }
-
- public void setMaxParallelism(int maxParallelism) {
- this.maxParallelism = maxParallelism;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder("{");
- sb.append("parallelism=").append(parallelism);
- if (maxParallelism > 0) {
- sb.append(", maxParallelism=").append(maxParallelism);
- }
- sb.append("}");
- return sb.toString();
- }
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/NodeResourceUtil.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/NodeResourceUtil.java
index 1d936b6..b5e1925 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/NodeResourceUtil.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/NodeResourceUtil.java
@@ -19,8 +19,6 @@
package org.apache.flink.table.planner.plan.nodes.resource;
import org.apache.flink.api.common.operators.ResourceSpec;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
/**
* Deal with resource config for {@link org.apache.flink.table.planner.plan.nodes.exec.ExecNode}.
@@ -33,42 +31,8 @@ public class NodeResourceUtil {
public static final long SIZE_IN_MB = 1024L * 1024;
/**
- * Gets the config parallelism for source.
- * @param tableConf Configuration.
- * @return the config parallelism for source.
+ * Build resourceSpec from managedMem.
*/
- public static int getSourceParallelism(Configuration tableConf, int envParallelism) {
- int parallelism = tableConf.getInteger(
- ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SOURCE_PARALLELISM);
- if (parallelism <= 0) {
- parallelism = getOperatorDefaultParallelism(tableConf, envParallelism);
- }
- return parallelism;
- }
-
- /**
- * Gets the config parallelism for sink. If it is not set, return -1.
- * @param tableConf Configuration.
- * @return the config parallelism for sink.
- */
- public static int getSinkParallelism(Configuration tableConf) {
- return tableConf.getInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SINK_PARALLELISM);
- }
-
- /**
- * Gets default parallelism of operator.
- * @param tableConf Configuration.
- * @return default parallelism of operator.
- */
- public static int getOperatorDefaultParallelism(Configuration tableConf, int envParallelism) {
- int parallelism = tableConf.getInteger(
- ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
- if (parallelism <= 0) {
- parallelism = envParallelism;
- }
- return parallelism;
- }
-
public static ResourceSpec fromManagedMem(int managedMem) {
ResourceSpec.Builder builder = ResourceSpec.newBuilder();
builder.setManagedMemoryInMB(managedMem);
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/FinalParallelismSetter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/FinalParallelismSetter.java
deleted file mode 100644
index 1f45bec..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/FinalParallelismSetter.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource.parallelism;
-
-import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan;
-
-import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.rel.core.Exchange;
-import org.apache.calcite.rel.core.Values;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Sets final parallelism where needed, up front. Once the parallelism of a node is marked final,
- * it will not be changed by any other parallelism calculator.
- */
-public class FinalParallelismSetter {
-
- private final StreamExecutionEnvironment env;
- private Set<ExecNode<?, ?>> calculatedNodeSet = new HashSet<>();
- private Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap = new HashMap<>();
-
- private FinalParallelismSetter(StreamExecutionEnvironment env) {
- this.env = env;
- }
-
- /**
- * Finds the nodes whose parallelism must be set to a final value.
- */
- public static Map<ExecNode<?, ?>, Integer> calculate(StreamExecutionEnvironment env, List<ExecNode<?, ?>> sinkNodes) {
- FinalParallelismSetter setter = new FinalParallelismSetter(env);
- sinkNodes.forEach(setter::calculate);
- return setter.finalParallelismNodeMap;
- }
-
- private void calculate(ExecNode<?, ?> execNode) {
- if (!calculatedNodeSet.add(execNode)) {
- return;
- }
- if (execNode instanceof BatchExecTableSourceScan) {
- calculateTableSource((BatchExecTableSourceScan) execNode);
- } else if (execNode instanceof StreamExecTableSourceScan) {
- calculateTableSource((StreamExecTableSourceScan) execNode);
- } else if (execNode instanceof BatchExecBoundedStreamScan) {
- calculateBoundedStreamScan((BatchExecBoundedStreamScan) execNode);
- } else if (execNode instanceof StreamExecDataStreamScan) {
- calculateDataStreamScan((StreamExecDataStreamScan) execNode);
- } else if (execNode instanceof Values) {
- calculateValues(execNode);
- } else {
- calculateIfSingleton(execNode);
- }
- }
-
- private void calculateTableSource(BatchExecTableSourceScan tableSourceScan) {
- Transformation transformation = tableSourceScan.getSourceTransformation(env);
- if (transformation.getMaxParallelism() > 0) {
- tableSourceScan.getResource().setMaxParallelism(transformation.getMaxParallelism());
- }
- }
-
- private void calculateTableSource(StreamExecTableSourceScan tableSourceScan) {
- Transformation transformation = tableSourceScan.getSourceTransformation(env);
- if (transformation.getMaxParallelism() > 0) {
- tableSourceScan.getResource().setMaxParallelism(transformation.getMaxParallelism());
- }
- }
-
- private void calculateBoundedStreamScan(BatchExecBoundedStreamScan boundedStreamScan) {
- Transformation transformation = boundedStreamScan.getSourceTransformation();
- int parallelism = transformation.getParallelism();
- if (parallelism <= 0) {
- parallelism = env.getParallelism();
- }
- finalParallelismNodeMap.put(boundedStreamScan, parallelism);
- }
-
- private void calculateDataStreamScan(StreamExecDataStreamScan dataStreamScan) {
- Transformation transformation = dataStreamScan.getSourceTransformation();
- int parallelism = transformation.getParallelism();
- if (parallelism <= 0) {
- parallelism = env.getParallelism();
- }
- finalParallelismNodeMap.put(dataStreamScan, parallelism);
- }
-
- private void calculateIfSingleton(ExecNode<?, ?> execNode) {
- calculateInputs(execNode);
- for (ExecNode<?, ?> inputNode : execNode.getInputNodes()) {
- if (inputNode instanceof Exchange &&
- ((Exchange) inputNode).getDistribution().getType() == RelDistribution.Type.SINGLETON) {
- // set parallelism to 1 for GlobalAggregate and other global nodes.
- finalParallelismNodeMap.put(execNode, 1);
- execNode.getResource().setMaxParallelism(1);
- return;
- }
- }
- }
-
- private void calculateValues(ExecNode<?, ?> values) {
- finalParallelismNodeMap.put(values, 1);
- values.getResource().setMaxParallelism(1);
- }
-
- private void calculateInputs(ExecNode<?, ?> node) {
- node.getInputNodes().forEach(this::calculate);
- }
-}
-
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ParallelismProcessor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ParallelismProcessor.java
deleted file mode 100644
index ebd2275..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ParallelismProcessor.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource.parallelism;
-
-import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.plan.nodes.calcite.Sink;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink;
-import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
-import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Processor for calculating parallelism for {@link ExecNode} dag.
- */
-public class ParallelismProcessor implements DAGProcessor {
-
- @Override
- public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes, DAGProcessContext context) {
- PlannerBase planner = context.getPlanner();
- List<ExecNode<?, ?>> rootNodes = filterSinkNodes(sinkNodes);
- // find exec nodes whose parallelism cannot be changed.
- Map<ExecNode<?, ?>, Integer> nodeToFinalParallelismMap =
- FinalParallelismSetter.calculate(planner.getExecEnv(), rootNodes);
- // generate shuffleStages that bind adjacent exec nodes together whose parallelism can be the same.
- Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap =
- ShuffleStageGenerator.generate(rootNodes, nodeToFinalParallelismMap);
- // calculate parallelism of shuffleStages.
- ShuffleStageParallelismCalculator.calculate(
- planner.getTableConfig().getConfiguration(), planner.getExecEnv().getParallelism(), nodeShuffleStageMap.values());
- for (ExecNode<?, ?> node : nodeShuffleStageMap.keySet()) {
- node.getResource().setParallelism(nodeShuffleStageMap.get(node).getParallelism());
- }
- return sinkNodes;
- }
-
- /**
- * Filters sink nodes, because the parallelism of sink nodes is calculated after translateToPlan, as
- * transformations generated by {@link BatchExecSink} or {@link StreamExecSink} have too many
- * uncertainty factors. Filtering here makes the later processing easier.
- */
- private List<ExecNode<?, ?>> filterSinkNodes(List<ExecNode<?, ?>> sinkNodes) {
- List<ExecNode<?, ?>> rootNodes = new ArrayList<>();
- sinkNodes.forEach(s -> {
- if (s instanceof Sink) {
- rootNodes.add(s.getInputNodes().get(0));
- } else {
- rootNodes.add(s);
- }
- });
- return rootNodes;
- }
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStage.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStage.java
deleted file mode 100644
index b9b5bf5..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStage.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource.parallelism;
-
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-
-import java.util.LinkedHashSet;
-import java.util.Set;
-
-/**
- * There is no shuffle when transferring data within a shuffleStage.
- */
-public class ShuffleStage {
-
- private final Set<ExecNode<?, ?>> execNodeSet = new LinkedHashSet<>();
-
- // parallelism of this shuffleStage.
- private int parallelism = -1;
-
- private int maxParallelism = Integer.MAX_VALUE;
-
- // whether this parallelism is final; once final, it cannot be changed.
- private boolean isFinalParallelism = false;
-
- public void addNode(ExecNode<?, ?> node) {
- execNodeSet.add(node);
- if (node.getResource().getMaxParallelism() > 0 &&
- node.getResource().getMaxParallelism() < maxParallelism) {
- maxParallelism = node.getResource().getMaxParallelism();
- }
- }
-
- public int getMaxParallelism() {
- return maxParallelism;
- }
-
- public void addNodeSet(Set<ExecNode<?, ?>> nodeSet) {
- nodeSet.forEach(this::addNode);
- }
-
- public void removeNode(ExecNode<?, ?> node) {
- this.execNodeSet.remove(node);
- }
-
- public Set<ExecNode<?, ?>> getExecNodeSet() {
- return this.execNodeSet;
- }
-
- public int getParallelism() {
- return parallelism;
- }
-
- public void setParallelism(int parallelism, boolean finalParallelism) {
- if (this.isFinalParallelism) {
- if (finalParallelism && this.parallelism != parallelism) {
- throw new IllegalArgumentException("both fixed parallelism are not equal, old: " + this.parallelism + ", new: " + parallelism);
- }
- } else {
- if (finalParallelism) {
- this.parallelism = parallelism;
- this.isFinalParallelism = true;
- } else {
- this.parallelism = Math.max(this.parallelism, parallelism);
- }
- }
- }
-
- public boolean isFinalParallelism() {
- return isFinalParallelism;
- }
-
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageGenerator.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageGenerator.java
deleted file mode 100644
index 8de5876..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageGenerator.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource.parallelism;
-
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecUnion;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion;
-
-import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.rel.core.Exchange;
-
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static java.util.stream.Collectors.toList;
-
-/**
- * Groups exec nodes into shuffleStages according to {@link BatchExecExchange}.
- * If there is a data shuffle between two adjacent exec nodes,
- * they belong to different shuffleStages.
- * If there is no data shuffle between two adjacent exec nodes but
- * they have different final parallelism, they also belong to different shuffleStages.
- */
-public class ShuffleStageGenerator {
-
- private final Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = new LinkedHashMap<>();
- private final Map<ExecNode<?, ?>, Integer> nodeToFinalParallelismMap;
-
- private ShuffleStageGenerator(Map<ExecNode<?, ?>, Integer> nodeToFinalParallelismMap) {
- this.nodeToFinalParallelismMap = nodeToFinalParallelismMap;
- }
-
- public static Map<ExecNode<?, ?>, ShuffleStage> generate(List<ExecNode<?, ?>> sinkNodes, Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap) {
- ShuffleStageGenerator generator = new ShuffleStageGenerator(finalParallelismNodeMap);
- sinkNodes.forEach(generator::buildShuffleStages);
- Map<ExecNode<?, ?>, ShuffleStage> result = generator.getNodeShuffleStageMap();
- result.values().forEach(s -> {
- List<ExecNode<?, ?>> virtualNodeList = s.getExecNodeSet().stream().filter(ShuffleStageGenerator::isVirtualNode).collect(toList());
- virtualNodeList.forEach(s::removeNode);
- });
- return generator.getNodeShuffleStageMap().entrySet().stream()
- .filter(x -> !isVirtualNode(x.getKey()))
- .collect(Collectors.toMap(Map.Entry::getKey,
- Map.Entry::getValue,
- (e1, e2) -> e1,
- LinkedHashMap::new));
- }
-
- private void buildShuffleStages(ExecNode<?, ?> execNode) {
- if (nodeShuffleStageMap.containsKey(execNode)) {
- return;
- }
- for (ExecNode<?, ?> input : execNode.getInputNodes()) {
- buildShuffleStages((input));
- }
-
- if (execNode.getInputNodes().isEmpty()) {
- // source node
- ShuffleStage shuffleStage = new ShuffleStage();
- shuffleStage.addNode(execNode);
- if (nodeToFinalParallelismMap.containsKey(execNode)) {
- shuffleStage.setParallelism(nodeToFinalParallelismMap.get(execNode), true);
- }
- nodeShuffleStageMap.put(execNode, shuffleStage);
- } else if (execNode instanceof Exchange && !isRangeExchange((Exchange) execNode)) {
- // do nothing.
- } else {
- Set<ShuffleStage> inputShuffleStages = getInputShuffleStages(execNode);
- Integer parallelism = nodeToFinalParallelismMap.get(execNode);
- ShuffleStage inputShuffleStage = mergeInputShuffleStages(inputShuffleStages, parallelism);
- inputShuffleStage.addNode(execNode);
- nodeShuffleStageMap.put(execNode, inputShuffleStage);
- }
- }
-
- private boolean isRangeExchange(Exchange exchange) {
- return exchange.getDistribution().getType() == RelDistribution.Type.RANGE_DISTRIBUTED;
- }
-
- private ShuffleStage mergeInputShuffleStages(Set<ShuffleStage> shuffleStageSet, Integer parallelism) {
- if (parallelism != null) {
- ShuffleStage resultShuffleStage = new ShuffleStage();
- resultShuffleStage.setParallelism(parallelism, true);
- for (ShuffleStage shuffleStage : shuffleStageSet) {
- //consider max parallelism.
- if ((shuffleStage.isFinalParallelism() && shuffleStage.getParallelism() == parallelism)
- || (!shuffleStage.isFinalParallelism() && shuffleStage.getMaxParallelism() >= parallelism)) {
- mergeShuffleStage(resultShuffleStage, shuffleStage);
- }
- }
- return resultShuffleStage;
- } else {
- ShuffleStage resultShuffleStage = shuffleStageSet.stream()
- .filter(ShuffleStage::isFinalParallelism)
- .max(Comparator.comparing(ShuffleStage::getParallelism))
- .orElse(new ShuffleStage());
- for (ShuffleStage shuffleStage : shuffleStageSet) {
- //consider max parallelism.
- if ((shuffleStage.isFinalParallelism() && shuffleStage.getParallelism() == resultShuffleStage.getParallelism())
- || (!shuffleStage.isFinalParallelism() && shuffleStage.getMaxParallelism() >= resultShuffleStage.getParallelism())) {
- mergeShuffleStage(resultShuffleStage, shuffleStage);
- }
- }
- return resultShuffleStage;
- }
- }
-
- private void mergeShuffleStage(ShuffleStage shuffleStage, ShuffleStage other) {
- Set<ExecNode<?, ?>> nodeSet = other.getExecNodeSet();
- shuffleStage.addNodeSet(nodeSet);
- for (ExecNode<?, ?> r : nodeSet) {
- nodeShuffleStageMap.put(r, shuffleStage);
- }
- }
-
- private Set<ShuffleStage> getInputShuffleStages(ExecNode<?, ?> node) {
- Set<ShuffleStage> shuffleStageList = new HashSet<>();
- for (ExecNode<?, ?> input : node.getInputNodes()) {
- ShuffleStage oneInputShuffleStage = nodeShuffleStageMap.get(input);
- if (oneInputShuffleStage != null) {
- shuffleStageList.add(oneInputShuffleStage);
- }
- }
- return shuffleStageList;
- }
-
- private static boolean isVirtualNode(ExecNode<?, ?> node) {
- return node instanceof BatchExecUnion || node instanceof StreamExecUnion;
- }
-
- private Map<ExecNode<?, ?>, ShuffleStage> getNodeShuffleStageMap() {
- return nodeShuffleStageMap;
- }
-}
-
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculator.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculator.java
deleted file mode 100644
index 4d96e7e..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculator.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource.parallelism;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan;
-import org.apache.flink.table.planner.plan.nodes.resource.NodeResourceUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Parallelism calculator for shuffleStages.
- */
-public class ShuffleStageParallelismCalculator {
- private static final Logger LOG = LoggerFactory.getLogger(ShuffleStageParallelismCalculator.class);
- private final Configuration tableConf;
- private final int envParallelism;
-
- private ShuffleStageParallelismCalculator(Configuration tableConf, int envParallelism) {
- this.tableConf = tableConf;
- this.envParallelism = envParallelism;
- }
-
- public static void calculate(Configuration tableConf, int envParallelism, Collection<ShuffleStage> shuffleStages) {
- new ShuffleStageParallelismCalculator(tableConf, envParallelism).calculate(shuffleStages);
- }
-
- private void calculate(Collection<ShuffleStage> shuffleStages) {
- Set<ShuffleStage> shuffleStageSet = new HashSet<>(shuffleStages);
- shuffleStageSet.forEach(this::calculate);
- }
-
- /**
- * If there are source nodes in a shuffleStage, its parallelism is the max parallelism of source
- * nodes. Otherwise, its parallelism is the default operator parallelism.
- */
- @VisibleForTesting
- protected void calculate(ShuffleStage shuffleStage) {
- if (shuffleStage.isFinalParallelism()) {
- return;
- }
- Set<ExecNode<?, ?>> nodeSet = shuffleStage.getExecNodeSet();
- int sourceParallelism = -1;
- int maxParallelism = shuffleStage.getMaxParallelism();
- for (ExecNode<?, ?> node : nodeSet) {
- if (node instanceof BatchExecTableSourceScan || node instanceof StreamExecTableSourceScan) {
- int result = NodeResourceUtil.getSourceParallelism(tableConf, envParallelism);
- if (result > sourceParallelism) {
- sourceParallelism = result;
- }
- }
- }
- int shuffleStageParallelism;
- if (sourceParallelism > 0) {
- shuffleStageParallelism = sourceParallelism;
- } else {
- shuffleStageParallelism = NodeResourceUtil.getOperatorDefaultParallelism(getTableConf(), envParallelism);
- }
- if (shuffleStageParallelism > maxParallelism) {
- shuffleStageParallelism = maxParallelism;
- }
- shuffleStage.setParallelism(shuffleStageParallelism, false);
- }
-
- private Configuration getTableConf() {
- return this.tableConf;
- }
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 8d43bc9..213be6a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -26,7 +26,6 @@ import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOpera
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistributionTraitDef
import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext
-import org.apache.flink.table.planner.plan.nodes.resource.parallelism.ParallelismProcessor
import org.apache.flink.table.planner.plan.optimize.{BatchCommonSubGraphBasedOptimizer, Optimizer}
import org.apache.flink.table.planner.plan.reuse.DeadlockBreakupProcessor
import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOptUtil}
@@ -61,13 +60,12 @@ class BatchPlanner(
val execNodePlan = super.translateToExecNodePlan(optimizedRelNodes)
val context = new DAGProcessContext(this)
// breakup deadlock
- val postNodeDag = new DeadlockBreakupProcessor().process(execNodePlan, context)
- // set parallelism
- new ParallelismProcessor().process(postNodeDag, context)
+ new DeadlockBreakupProcessor().process(execNodePlan, context)
}
override protected def translateToPlan(
execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]] = {
+ overrideEnvParallelism()
execNodes.map {
case node: BatchExecNode[_] => node.translateToPlan(this)
case _ =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 2b0b484..18922e8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.dag.Transformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.sql.parser.dml.RichSqlInsert
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableException}
import org.apache.flink.table.catalog.{CatalogManager, CatalogTable, ConnectorCatalogTable, FunctionCatalog, ObjectPath}
import org.apache.flink.table.delegation.{Executor, Planner}
@@ -148,6 +149,15 @@ abstract class PlannerBase(
translateToPlan(execNodes)
}
+ protected def overrideEnvParallelism(): Unit = {
+ // Use config parallelism to override env parallelism.
+ val defaultParallelism = getTableConfig.getConfiguration.getInteger(
+ ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM)
+ if (defaultParallelism > 0) {
+ getExecEnv.setParallelism(defaultParallelism)
+ }
+ }
+
override def getCompletionHints(statement: String, position: Int): Array[String] = {
val planner = getFlinkPlanner
planner.getCompletionHints(statement, position)
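The observable effect of overrideEnvParallelism() during translation is sketched below (illustrative only; the table names and the configured value are assumptions):

    // Assuming table.exec.resource.default-parallelism is set to 4:
    env.setParallelism(16)
    tEnv.sqlUpdate("INSERT INTO sink SELECT * FROM src")
    tEnv.execute("job")
    // During translateToPlan the planner calls overrideEnvParallelism(),
    // so env.getParallelism is now 4 and operators without an explicit
    // parallelism inherit that value.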
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 6b01362..091083a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -25,14 +25,11 @@ import org.apache.flink.table.delegation.Executor
import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation}
import org.apache.flink.table.planner.plan.`trait`._
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
-import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext
-import org.apache.flink.table.planner.plan.nodes.resource.parallelism.ParallelismProcessor
import org.apache.flink.table.planner.plan.optimize.{Optimizer, StreamCommonSubGraphBasedOptimizer}
import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOptUtil}
import org.apache.flink.table.planner.utils.PlanUtil
import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
-import org.apache.calcite.rel.RelNode
import org.apache.calcite.sql.SqlExplainLevel
import java.util
@@ -57,15 +54,9 @@ class StreamPlanner(
override protected def getOptimizer: Optimizer = new StreamCommonSubGraphBasedOptimizer(this)
- override private[flink] def translateToExecNodePlan(
- optimizedRelNodes: Seq[RelNode]): util.List[ExecNode[_, _]] = {
- val execNodePlan = super.translateToExecNodePlan(optimizedRelNodes)
- val context = new DAGProcessContext(this)
- new ParallelismProcessor().process(execNodePlan, context)
- }
-
override protected def translateToPlan(
execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]] = {
+ overrideEnvParallelism()
execNodes.map {
case node: StreamExecNode[_] => node.translateToPlan(this)
case _ =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.scala
index 50b5035..edcca5b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.scala
@@ -21,10 +21,14 @@ package org.apache.flink.table.planner.plan.nodes.exec
import org.apache.flink.api.dag.Transformation
import org.apache.flink.table.delegation.Planner
import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel
-import org.apache.flink.table.planner.plan.nodes.resource.NodeResource
+
+import org.apache.calcite.rel.RelDistribution
+import org.apache.calcite.rel.core.Exchange
import java.util
+import scala.collection.JavaConversions._
+
/**
* The representation of execution information for a [[FlinkPhysicalRel]].
*
@@ -34,21 +38,11 @@ import java.util
trait ExecNode[E <: Planner, T] {
/**
- * Defines how much resource the node will take.
- */
- private val resource: NodeResource = new NodeResource
-
- /**
* The [[Transformation]] translated from this node.
*/
private var transformation: Transformation[T] = _
/**
- * Get node resource.
- */
- def getResource: NodeResource = resource
-
- /**
* Translates this node into a Flink operator.
*
* <p>NOTE: returns same translate result if called multiple times.
@@ -94,4 +88,14 @@ trait ExecNode[E <: Planner, T] {
def accept(visitor: ExecNodeVisitor): Unit = {
visitor.visit(this)
}
+
+ /**
+ * Whether there is singleton exchange node as input.
+ */
+ protected def inputsContainSingleton(): Boolean = {
+ getInputNodes.exists { node =>
+ node.isInstanceOf[Exchange] &&
+ node.asInstanceOf[Exchange].getDistribution.getType == RelDistribution.Type.SINGLETON
+ }
+ }
}
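A hypothetical use of the new inputsContainSingleton() helper inside a physical node's translateToPlanInternal, mirroring the pattern this commit applies to the stream nodes (inputTransform and transformation are assumed local names):

    // Nodes behind a SINGLETON exchange (e.g. a global aggregate) must run
    // with parallelism 1; all others inherit the input's parallelism.
    val parallelism =
      if (inputsContainSingleton()) 1 else inputTransform.getParallelism
    transformation.setParallelism(parallelism)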
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
index e7de711..5779d72 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
@@ -87,9 +87,8 @@ class BatchExecBoundedStreamScan(
planner: BatchPlanner): Transformation[BaseRow] = {
val config = planner.getTableConfig
val batchTransform = boundedStreamTable.dataStream.getTransformation
- batchTransform.setParallelism(getResource.getParallelism)
if (needInternalConversion) {
- val conversionTransform = ScanUtil.convertToInternalRow(
+ ScanUtil.convertToInternalRow(
CodeGeneratorContext(config),
batchTransform,
boundedStreamTable.fieldIndexes,
@@ -98,16 +97,11 @@ class BatchExecBoundedStreamScan(
getTable.getQualifiedName,
config,
None)
- conversionTransform.setParallelism(getResource.getParallelism)
- conversionTransform
} else {
batchTransform.asInstanceOf[Transformation[BaseRow]]
}
}
- def getSourceTransformation: Transformation[_] =
- boundedStreamTable.dataStream.getTransformation
-
def needInternalConversion: Boolean = {
ScanUtil.hasTimeAttributeField(boundedStreamTable.fieldIndexes) ||
ScanUtil.needsConversion(boundedStreamTable.dataType)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala
index 3528412..3d03932 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala
@@ -161,6 +161,6 @@ class BatchExecCalc(
RelExplainUtil.calcToString(calcProgram, getExpressionString),
operator,
BaseRowTypeInfo.of(outputType),
- getResource.getParallelism)
+ inputTransform.getParallelism)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
index a20535d..b11a7a3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
@@ -198,7 +198,7 @@ class BatchExecCorrelate(
condition,
outputRowType,
joinType,
- getResource.getParallelism,
+ inputTransformation.getParallelism,
retainHeader = false,
getExpressionString,
"BatchExecCorrelate")
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExchange.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExchange.scala
index b82ead7..0cb7a7b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExchange.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExchange.scala
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.physical.batch
+import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.dag.Transformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.operators.DamBehavior
@@ -144,6 +145,7 @@ class BatchExecExchange(
null,
shuffleMode)
transformation.setOutputType(outputRowType)
+ transformation.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT)
transformation
case RelDistribution.Type.SINGLETON =>
@@ -152,6 +154,7 @@ class BatchExecExchange(
new GlobalPartitioner[BaseRow],
shuffleMode)
transformation.setOutputType(outputRowType)
+ transformation.setParallelism(1)
transformation
case RelDistribution.Type.RANDOM_DISTRIBUTED =>
@@ -160,6 +163,7 @@ class BatchExecExchange(
new RebalancePartitioner[BaseRow],
shuffleMode)
transformation.setOutputType(outputRowType)
+ transformation.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT)
transformation
case RelDistribution.Type.BROADCAST_DISTRIBUTED =>
@@ -168,6 +172,7 @@ class BatchExecExchange(
new BroadcastPartitioner[BaseRow],
shuffleMode)
transformation.setOutputType(outputRowType)
+ transformation.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT)
transformation
case RelDistribution.Type.HASH_DISTRIBUTED =>
@@ -186,6 +191,7 @@ class BatchExecExchange(
partitioner,
shuffleMode)
transformation.setOutputType(outputRowType)
+ transformation.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT)
transformation
case _ =>
throw new UnsupportedOperationException(
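For reference, ExecutionConfig.PARALLELISM_DEFAULT is -1, so the non-singleton exchange transformations above defer the parallelism decision to the environment at StreamGraph generation time instead of fixing a planner-computed value:

    import org.apache.flink.api.common.ExecutionConfig
    // -1 defers the choice to the environment's parallelism at job submission.
    assert(ExecutionConfig.PARALLELISM_DEFAULT == -1)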
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExpand.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExpand.scala
index e5441b6..e6a9c8e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExpand.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExpand.scala
@@ -105,7 +105,7 @@ class BatchExecExpand(
operatorName,
operator,
BaseRowTypeInfo.of(outputType),
- getResource.getParallelism)
+ inputTransform.getParallelism)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
index cf8d0ee..46e8efd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
@@ -150,7 +150,7 @@ abstract class BatchExecHashAggregateBase(
getOperatorName,
operator,
BaseRowTypeInfo.of(outputType),
- getResource.getParallelism)
+ input.getParallelism)
val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
ret.setResources(resource, resource)
ret
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashJoin.scala
index 94050ff..e029b0f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashJoin.scala
@@ -268,7 +268,7 @@ class BatchExecHashJoin(
getOperatorName,
operator,
BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType)),
- getResource.getParallelism)
+ probe.getParallelism)
val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
ret.setResources(resource, resource)
ret
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
index 0d3dd07..6074157 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
@@ -152,7 +152,7 @@ abstract class BatchExecHashWindowAggregateBase(
getOperatorName,
operator,
BaseRowTypeInfo.of(outputType),
- getResource.getParallelism)
+ input.getParallelism)
val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
ret.setResources(resource, resource)
ret
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLimit.scala
index 3e14fa3..6b4ee94 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLimit.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLimit.scala
@@ -111,7 +111,7 @@ class BatchExecLimit(
getOperatorName,
operator,
inputType,
- getResource.getParallelism)
+ input.getParallelism)
}
private def getOperatorName = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLookupJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLookupJoin.scala
index 89c631f..588c022 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLookupJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLookupJoin.scala
@@ -89,12 +89,10 @@ class BatchExecLookupJoin(
planner: BatchPlanner): Transformation[BaseRow] = {
val inputTransformation = getInputNodes.get(0).translateToPlan(planner)
.asInstanceOf[Transformation[BaseRow]]
- val transformation = translateToPlanInternal(
+ translateToPlanInternal(
inputTransformation,
planner.getExecEnv,
planner.getTableConfig,
planner.getRelBuilder)
- transformation.setParallelism(getResource.getParallelism)
- transformation
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
index 27e3629..db30712 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
@@ -161,13 +161,14 @@ class BatchExecNestedLoopJoin(
}
val resourceSpec = NodeResourceUtil.fromManagedMem(externalBufferMemoryInMb)
+ val parallelism = if (leftIsBuild) rInput.getParallelism else lInput.getParallelism
val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
lInput,
rInput,
getOperatorName,
op,
BaseRowTypeInfo.of(outputType),
- getResource.getParallelism)
+ parallelism)
ret.setResources(resourceSpec, resourceSpec)
ret
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecOverAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecOverAggregate.scala
index 90dde6f..abe140a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecOverAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecOverAggregate.scala
@@ -421,7 +421,7 @@ class BatchExecOverAggregate(
}
val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
val ret = new OneInputTransformation(
- input, "OverAggregate", operator, BaseRowTypeInfo.of(outputType), getResource.getParallelism)
+ input, "OverAggregate", operator, BaseRowTypeInfo.of(outputType), input.getParallelism)
ret.setResources(resource, resource)
ret
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala
index 7ecfed4..52d237f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala
@@ -291,7 +291,7 @@ class BatchExecRank(
getOperatorName,
operator,
BaseRowTypeInfo.of(outputType),
- getResource.getParallelism)
+ input.getParallelism)
}
private def getOperatorName: String = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala
index be8ac6f..2b4e181 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala
@@ -29,7 +29,6 @@ import org.apache.flink.table.planner.codegen.{CodeGenUtils, CodeGeneratorContex
import org.apache.flink.table.planner.delegation.BatchPlanner
import org.apache.flink.table.planner.plan.nodes.calcite.Sink
import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
-import org.apache.flink.table.planner.plan.nodes.resource.NodeResourceUtil
import org.apache.flink.table.planner.sinks.DataStreamTableSink
import org.apache.flink.table.runtime.types.ClassLogicalTypeConverter
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
@@ -98,23 +97,7 @@ class BatchExecSink[T](
"implemented and return the sink transformation DataStreamSink. " +
s"However, ${sink.getClass.getCanonicalName} doesn't implement this method.")
}
- val sinkTransformation = dsSink.getTransformation
-
- val configSinkParallelism = NodeResourceUtil.getSinkParallelism(
- planner.getTableConfig.getConfiguration)
-
- val maxSinkParallelism = sinkTransformation.getMaxParallelism
-
- // only set user's parallelism when user defines a sink parallelism
- if (configSinkParallelism > 0) {
- // set the parallelism when user's parallelism is not larger than max parallelism
- // or max parallelism is not set
- if (maxSinkParallelism < 0 || configSinkParallelism <= maxSinkParallelism) {
- sinkTransformation.setParallelism(configSinkParallelism)
- }
- }
-
- sinkTransformation
+ dsSink.getTransformation
case dsTableSink: DataStreamTableSink[T] =>
// In case of table to bounded stream through BatchTableEnvironment#toBoundedStream, we
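For reference, the deleted guard only honored a configured sink parallelism when it did not exceed the transformation's max parallelism. A standalone sketch of that logic, keeping the conventions of the removed code:

    object SinkParallelismGuard {
      // configSinkParallelism <= 0 means "not configured";
      // maxParallelism < 0 means "no upper bound was set".
      def resolve(
          configSinkParallelism: Int,
          maxParallelism: Int,
          currentParallelism: Int): Int =
        if (configSinkParallelism > 0 &&
            (maxParallelism < 0 || configSinkParallelism <= maxParallelism)) {
          configSinkParallelism
        } else {
          currentParallelism
        }
    }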
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSort.scala
index 7d0226a..66df094 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSort.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSort.scala
@@ -131,7 +131,7 @@ class BatchExecSort(
s"Sort(${RelExplainUtil.collationToString(sortCollation, getRowType)})",
operator.asInstanceOf[OneInputStreamOperator[BaseRow, BaseRow]],
BaseRowTypeInfo.of(outputType),
- getResource.getParallelism)
+ input.getParallelism)
val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
ret.setResources(resource, resource)
ret
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala
index 8ee73ce..80f56f2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala
@@ -125,6 +125,6 @@ abstract class BatchExecSortAggregateBase(
getOperatorName,
operator,
BaseRowTypeInfo.of(outputType),
- getResource.getParallelism)
+ input.getParallelism)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortLimit.scala
index 578dd2a..5426fb8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortLimit.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortLimit.scala
@@ -146,7 +146,7 @@ class BatchExecSortLimit(
getOperatorName,
operator,
inputType,
- getResource.getParallelism)
+ input.getParallelism)
}
private def getOperatorName = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
index d4f9d22..c15b6f7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
@@ -268,7 +268,7 @@ class BatchExecSortMergeJoin(
getOperatorName,
operator,
BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType)),
- getResource.getParallelism)
+ rightInput.getParallelism)
val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
ret.setResources(resource, resource)
ret
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
index 52707f1..8c621d5 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
@@ -140,6 +140,6 @@ abstract class BatchExecSortWindowAggregateBase(
getOperatorName,
operator,
BaseRowTypeInfo.of(outputType),
- getResource.getParallelism)
+ input.getParallelism)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
index 11a1dc5..99f1b7c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
@@ -84,7 +84,6 @@ class BatchExecTableSourceScan(
planner: BatchPlanner): Transformation[BaseRow] = {
val config = planner.getTableConfig
val inputTransform = getSourceTransformation(planner.getExecEnv)
- inputTransform.setParallelism(getResource.getParallelism)
val fieldIndexes = TableSourceUtil.computeIndexMapping(
tableSource,
@@ -110,7 +109,7 @@ class BatchExecTableSourceScan(
planner.getRelBuilder
)
if (needInternalConversion) {
- val conversionTransform = ScanUtil.convertToInternalRow(
+ ScanUtil.convertToInternalRow(
CodeGeneratorContext(config),
inputTransform.asInstanceOf[Transformation[Any]],
fieldIndexes,
@@ -119,8 +118,6 @@ class BatchExecTableSourceScan(
getTable.getQualifiedName,
config,
rowtimeExpression)
- conversionTransform.setParallelism(getResource.getParallelism)
- conversionTransform
} else {
inputTransform.asInstanceOf[Transformation[BaseRow]]
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecValues.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecValues.scala
index 3c126b7..e1e13ea 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecValues.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecValues.scala
@@ -80,7 +80,7 @@ class BatchExecValues(
getRelTypeName)
val transformation = planner.getExecEnv.createInput(inputFormat,
inputFormat.getProducedType).getTransformation
- transformation.setParallelism(getResource.getParallelism)
+ transformation.setParallelism(1)
transformation
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala
index 182fc8e..a249436 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala
@@ -122,10 +122,11 @@ class StreamExecCalc(
RelExplainUtil.calcToString(calcProgram, getExpressionString),
substituteStreamOperator,
BaseRowTypeInfo.of(outputType),
- getResource.getParallelism)
+ inputTransform.getParallelism)
- if (getResource.getMaxParallelism > 0) {
- ret.setMaxParallelism(getResource.getMaxParallelism)
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
}
ret
}
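This hunk shows the pattern that replaces the removed NodeResource bookkeeping in all of the stream nodes below: inherit the input transformation's parallelism, and pin the operator to a single subtask when any input has a SINGLETON distribution. A minimal sketch with stand-in types (OpSettings is illustrative, not a Flink class; -1 stands for "left unset"):

    object StreamNodeParallelism {
      final case class OpSettings(parallelism: Int, maxParallelism: Int)

      def derive(
          inputParallelism: Int,
          inputsContainSingleton: Boolean): OpSettings =
        if (inputsContainSingleton) {
          // A singleton input forces a single subtask for this operator too.
          OpSettings(parallelism = 1, maxParallelism = 1)
        } else {
          OpSettings(parallelism = inputParallelism, maxParallelism = -1)
        }
    }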
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala
index 0fe3922..ae5788a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala
@@ -131,12 +131,13 @@ class StreamExecCorrelate(
condition,
outputRowType,
joinType,
- getResource.getParallelism,
+ inputTransformation.getParallelism,
retainHeader = true,
getExpressionString,
"StreamExecCorrelate")
- if (getResource.getMaxParallelism > 0) {
- transform.setMaxParallelism(getResource.getMaxParallelism)
+ if (inputsContainSingleton()) {
+ transform.setParallelism(1)
+ transform.setMaxParallelism(1)
}
transform
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDataStreamScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDataStreamScan.scala
index 3306033..ef06da4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDataStreamScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDataStreamScan.scala
@@ -77,9 +77,6 @@ class StreamExecDataStreamScan(
override def deriveRowType(): RelDataType = outputRowType
- def getSourceTransformation: Transformation[_] =
- dataStreamTable.dataStream.getTransformation
-
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new StreamExecDataStreamScan(cluster, traitSet, getTable, getRowType)
}
@@ -110,7 +107,6 @@ class StreamExecDataStreamScan(
val config = planner.getTableConfig
val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
val transform = inputDataStream.getTransformation
- transform.setParallelism(getResource.getParallelism)
val rowtimeExpr = getRowtimeExpression(planner.getRelBuilder)
@@ -127,7 +123,7 @@ class StreamExecDataStreamScan(
}
val ctx = CodeGeneratorContext(config).setOperatorBaseClass(
classOf[AbstractProcessStreamOperator[BaseRow]])
- val ret = ScanUtil.convertToInternalRow(
+ ScanUtil.convertToInternalRow(
ctx,
transform,
dataStreamTable.fieldIndexes,
@@ -138,8 +134,6 @@ class StreamExecDataStreamScan(
rowtimeExpr,
beforeConvert = extractElement,
afterConvert = resetElement)
- ret.setParallelism(getResource.getParallelism)
- ret
} else {
transform.asInstanceOf[Transformation[BaseRow]]
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala
index 251d137..888ee6e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala
@@ -144,10 +144,11 @@ class StreamExecDeduplicate(
getOperatorName,
operator,
rowTypeInfo,
- getResource.getParallelism)
+ inputTransform.getParallelism)
- if (getResource.getMaxParallelism > 0) {
- ret.setMaxParallelism(getResource.getMaxParallelism)
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
}
val selector = KeySelectorUtil.getBaseRowSelector(uniqueKeys, rowTypeInfo)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExchange.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExchange.scala
index 652962b..fa75f62 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExchange.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExchange.scala
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.physical.stream
+import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.dag.Transformation
import org.apache.flink.runtime.state.KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM
import org.apache.flink.streaming.api.transformations.PartitionTransformation
@@ -92,6 +93,7 @@ class StreamExecExchange(
inputTransform,
partitioner.asInstanceOf[StreamPartitioner[BaseRow]])
transformation.setOutputType(outputTypeInfo)
+ transformation.setParallelism(1)
transformation
case RelDistribution.Type.HASH_DISTRIBUTED =>
// TODO Eliminate duplicate keys
@@ -104,6 +106,7 @@ class StreamExecExchange(
inputTransform,
partitioner.asInstanceOf[StreamPartitioner[BaseRow]])
transformation.setOutputType(outputTypeInfo)
+ transformation.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT)
transformation
case _ =>
throw new UnsupportedOperationException(
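The two branches above encode the exchange's contract: a SINGLETON exchange forces one downstream subtask, while a hash exchange declares ExecutionConfig.PARALLELISM_DEFAULT (-1) so the actual value is filled in later from the job's default. A plain-Scala sketch of that two-step resolution (the deferred fill-in is an assumption about the surrounding machinery, modeled here with placeholder names):

    object ExchangeParallelism {
      // Mirrors ExecutionConfig.PARALLELISM_DEFAULT: "decide later".
      val Default: Int = -1

      def declare(isSingleton: Boolean): Int =
        if (isSingleton) 1 else Default

      // Unset values fall back to the environment default once the
      // stream graph is generated.
      def resolve(declared: Int, envDefault: Int): Int =
        if (declared == Default) envDefault else declared
    }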
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExpand.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExpand.scala
index 85f8b95..ec113a4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExpand.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExpand.scala
@@ -100,9 +100,10 @@ class StreamExecExpand(
operatorName,
operator,
BaseRowTypeInfo.of(outputType),
- getResource.getParallelism)
- if (getResource.getMaxParallelism > 0) {
- transform.setMaxParallelism(getResource.getMaxParallelism)
+ inputTransform.getParallelism)
+ if (inputsContainSingleton()) {
+ transform.setParallelism(1)
+ transform.setMaxParallelism(1)
}
transform
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
index 9676ab9..ba27a5a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
@@ -189,10 +189,11 @@ class StreamExecGlobalGroupAggregate(
"GlobalGroupAggregate",
operator,
BaseRowTypeInfo.of(outRowType),
- getResource.getParallelism)
+ inputTransformation.getParallelism)
- if (getResource.getMaxParallelism > 0) {
- ret.setMaxParallelism(getResource.getMaxParallelism)
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
}
// set KeyType and Selector for state
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
index 6f8b906..676d05a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
@@ -206,10 +206,11 @@ class StreamExecGroupAggregate(
"GroupAggregate",
operator,
BaseRowTypeInfo.of(outRowType),
- getResource.getParallelism)
+ inputTransformation.getParallelism)
- if (getResource.getMaxParallelism > 0) {
- ret.setMaxParallelism(getResource.getMaxParallelism)
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
}
// set KeyType and Selector for state
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
index d5322a6..212b01a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
@@ -231,10 +231,11 @@ class StreamExecGroupWindowAggregate(
operatorName,
operator,
outRowType,
- getResource.getParallelism)
+ inputTransform.getParallelism)
- if (getResource.getMaxParallelism > 0) {
- transformation.setMaxParallelism(getResource.getMaxParallelism)
+ if (inputsContainSingleton()) {
+ transformation.setParallelism(1)
+ transformation.setMaxParallelism(1)
}
val selector = KeySelectorUtil.getBaseRowSelector(grouping, inputRowTypeInfo)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
index 4cef31f..ae52f71 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
@@ -186,10 +186,11 @@ class StreamExecIncrementalGroupAggregate(
"IncrementalGroupAggregate",
operator,
BaseRowTypeInfo.of(outRowType),
- getResource.getParallelism)
+ inputTransformation.getParallelism)
- if (getResource.getMaxParallelism > 0) {
- ret.setMaxParallelism(getResource.getMaxParallelism)
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
}
// set KeyType and Selector for state
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecJoin.scala
index 22bc209..77a8e09 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecJoin.scala
@@ -197,10 +197,11 @@ class StreamExecJoin(
getJoinOperatorName(),
operator,
returnType,
- getResource.getParallelism)
+ leftTransform.getParallelism)
- if (getResource.getMaxParallelism > 0) {
- ret.setMaxParallelism(getResource.getMaxParallelism)
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
}
// set KeyType and Selector for state
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLimit.scala
index 34078c7..98b4a02 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLimit.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLimit.scala
@@ -170,10 +170,11 @@ class StreamExecLimit(
s"Limit(offset: $limitStart, fetch: ${fetchToString(fetch)})",
operator,
outputRowTypeInfo,
- getResource.getParallelism)
+ inputTransform.getParallelism)
- if (getResource.getMaxParallelism > 0) {
- ret.setMaxParallelism(getResource.getMaxParallelism)
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
}
val selector = NullBinaryRowKeySelector.INSTANCE
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala
index 8a07486..c898881 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala
@@ -149,10 +149,11 @@ class StreamExecLocalGroupAggregate(
"LocalGroupAggregate",
operator,
BaseRowTypeInfo.of(outRowType),
- getResource.getParallelism)
+ inputTransformation.getParallelism)
- if (getResource.getMaxParallelism > 0) {
- transformation.setMaxParallelism(getResource.getMaxParallelism)
+ if (inputsContainSingleton()) {
+ transformation.setParallelism(1)
+ transformation.setMaxParallelism(1)
}
transformation
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLookupJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLookupJoin.scala
index 617e603..740a863 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLookupJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLookupJoin.scala
@@ -101,9 +101,9 @@ class StreamExecLookupJoin(
planner.getExecEnv,
planner.getTableConfig,
planner.getRelBuilder)
- transformation.setParallelism(getResource.getParallelism)
- if (getResource.getMaxParallelism > 0) {
- transformation.setMaxParallelism(getResource.getMaxParallelism)
+ if (inputsContainSingleton()) {
+ transformation.setParallelism(1)
+ transformation.setMaxParallelism(1)
}
transformation
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecMatch.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecMatch.scala
index 167f0a1..6c07531 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecMatch.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecMatch.scala
@@ -234,10 +234,11 @@ class StreamExecMatch(
toString,
operator,
outputRowTypeInfo,
- getResource.getParallelism
+ timestampedInput.getParallelism
)
- if (getResource.getMaxParallelism > 0) {
- transformation.setMaxParallelism(getResource.getMaxParallelism)
+ if (inputsContainSingleton()) {
+ transformation.setParallelism(1)
+ transformation.setMaxParallelism(1)
}
setKeySelector(transformation, inputTypeInfo)
transformation
@@ -293,9 +294,10 @@ class StreamExecMatch(
s"rowtime field: ($timeOrderField)",
new ProcessOperator(new RowtimeProcessFunction(timeIdx, inputTypeInfo)),
inputTypeInfo,
- getResource.getParallelism)
- if (getResource.getMaxParallelism > 0) {
- transformation.setMaxParallelism(getResource.getMaxParallelism)
+ inputTransform.getParallelism)
+ if (inputsContainSingleton()) {
+ transformation.setParallelism(1)
+ transformation.setMaxParallelism(1)
}
transformation
} else {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecOverAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecOverAggregate.scala
index 278edd4..c957110 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecOverAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecOverAggregate.scala
@@ -285,10 +285,11 @@ class StreamExecOverAggregate(
"OverAggregate",
operator,
returnTypeInfo,
- getResource.getParallelism)
+ inputDS.getParallelism)
- if (getResource.getMaxParallelism > 0) {
- ret.setMaxParallelism(getResource.getMaxParallelism)
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
}
// set KeyType and Selector for state
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecRank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecRank.scala
index ebf1472..d227dd6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecRank.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecRank.scala
@@ -216,10 +216,11 @@ class StreamExecRank(
rankOpName,
operator,
outputRowTypeInfo,
- getResource.getParallelism)
+ inputTransform.getParallelism)
- if (getResource.getMaxParallelism > 0) {
- ret.setMaxParallelism(getResource.getMaxParallelism)
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
}
// set KeyType and Selector for state
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
index d505bf7..ea50bfe 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
@@ -145,26 +145,6 @@ class StreamExecSink[T](
"implemented and return the sink transformation DataStreamSink. " +
s"However, ${sink.getClass.getCanonicalName} doesn't implement this method.")
}
- val configSinkParallelism = NodeResourceUtil.getSinkParallelism(
- planner.getTableConfig.getConfiguration)
-
- val maxSinkParallelism = dsSink.getTransformation.getMaxParallelism
-
- // only set user's parallelism when user defines a sink parallelism
- if (configSinkParallelism > 0) {
- // set the parallelism when user's parallelism is not larger than max parallelism or
- // max parallelism is not set
- if (maxSinkParallelism < 0 || configSinkParallelism <= maxSinkParallelism) {
- dsSink.getTransformation.setParallelism(configSinkParallelism)
- }
- }
- if (!UpdatingPlanChecker.isAppendOnly(this) &&
- dsSink.getTransformation.getParallelism != transformation.getParallelism) {
- throw new TableException(s"The configured sink parallelism should be equal to the" +
- " input node when input is an update stream. The input parallelism is " +
- "${transformation.getParallelism}, however the configured sink parallelism is " +
- "${dsSink.getTransformation.getParallelism}.")
- }
dsSink.getTransformation
case dsTableSink: DataStreamTableSink[_] =>
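Note that the removed consistency check built the last three segments of its message from plain string literals, so the ${...} placeholders would have been emitted verbatim rather than interpolated. A corrected standalone sketch of the removed check (plain Scala; the planner threw a TableException, an IllegalStateException stands in here):

    object UpdateStreamSinkCheck {
      def check(
          isAppendOnly: Boolean,
          inputParallelism: Int,
          sinkParallelism: Int): Unit =
        if (!isAppendOnly && sinkParallelism != inputParallelism) {
          // s-interpolators restored so the values are actually printed.
          throw new IllegalStateException(
            s"The configured sink parallelism should be equal to the input " +
            s"node's when the input is an update stream. The input " +
            s"parallelism is $inputParallelism, however the configured " +
            s"sink parallelism is $sinkParallelism.")
        }
    }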
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSort.scala
index 796491b5..3a7c927 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSort.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSort.scala
@@ -135,9 +135,10 @@ class StreamExecSort(
s"Sort(${RelExplainUtil.collationToString(sortCollation, getRowType)})",
sortOperator,
outputRowTypeInfo,
- getResource.getParallelism)
- if (getResource.getMaxParallelism > 0) {
- ret.setMaxParallelism(getResource.getMaxParallelism)
+ input.getParallelism)
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
}
ret
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSortLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSortLimit.scala
index 3ef6ac5..b34975b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSortLimit.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSortLimit.scala
@@ -217,10 +217,11 @@ class StreamExecSortLimit(
getOperatorName,
operator,
outputRowTypeInfo,
- getResource.getParallelism)
+ inputTransform.getParallelism)
- if (getResource.getMaxParallelism > 0) {
- ret.setMaxParallelism(getResource.getMaxParallelism)
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
}
val selector = NullBinaryRowKeySelector.INSTANCE
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
index 38a3819..a4376fb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
@@ -96,7 +96,6 @@ class StreamExecTableSourceScan(
planner: StreamPlanner): Transformation[BaseRow] = {
val config = planner.getTableConfig
val inputTransform = getSourceTransformation(planner.getExecEnv)
- inputTransform.setParallelism(getResource.getParallelism)
val fieldIndexes = TableSourceUtil.computeIndexMapping(
tableSource,
@@ -143,7 +142,6 @@ class StreamExecTableSourceScan(
rowtimeExpression,
beforeConvert = extractElement,
afterConvert = resetElement)
- conversionTransform.setParallelism(getResource.getParallelism)
conversionTransform
} else {
inputTransform.asInstanceOf[Transformation[BaseRow]]
@@ -174,7 +172,6 @@ class StreamExecTableSourceScan(
// No need to generate watermarks if no rowtime attribute is specified.
ingestedTable
}
- withWatermarks.getTransformation.setParallelism(getResource.getParallelism)
withWatermarks.getTransformation
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
index 2acaba4..9002452 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
@@ -143,10 +143,11 @@ class StreamExecTemporalJoin(
getJoinOperatorName,
joinOperator,
BaseRowTypeInfo.of(returnType),
- getResource.getParallelism)
+ leftTransform.getParallelism)
- if (getResource.getMaxParallelism > 0) {
- ret.setMaxParallelism(getResource.getMaxParallelism)
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
}
// set KeyType and Selector for state
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalSort.scala
index b2c3e7f..030c483 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalSort.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalSort.scala
@@ -150,7 +150,7 @@ class StreamExecTemporalSort(
"ProcTimeSortOperator",
sortOperator,
outputRowTypeInfo,
- getResource.getParallelism)
+ input.getParallelism)
val selector = NullBinaryRowKeySelector.INSTANCE
ret.setStateKeySelector(selector)
@@ -190,10 +190,11 @@ class StreamExecTemporalSort(
"RowTimeSortOperator",
sortOperator,
outputRowTypeInfo,
- getResource.getParallelism)
+ input.getParallelism)
- if (getResource.getMaxParallelism > 0) {
- ret.setMaxParallelism(getResource.getMaxParallelism)
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
}
val selector = NullBinaryRowKeySelector.INSTANCE
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecValues.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecValues.scala
index a958c6e..a95558b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecValues.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecValues.scala
@@ -82,10 +82,8 @@ class StreamExecValues(
getRelTypeName)
val transformation = planner.getExecEnv.createInput(inputFormat,
inputFormat.getProducedType).getTransformation
- transformation.setParallelism(getResource.getParallelism)
- if (getResource.getMaxParallelism > 0) {
- transformation.setMaxParallelism(getResource.getMaxParallelism)
- }
+ transformation.setParallelism(1)
+ transformation.setMaxParallelism(1)
transformation
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWindowJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWindowJoin.scala
index 33f4aa7..d1127cc 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWindowJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWindowJoin.scala
@@ -342,11 +342,12 @@ class StreamExecWindowJoin(
new KeyedCoProcessOperatorWithWatermarkDelay(rowJoinFunc, rowJoinFunc.getMaxOutputDelay)
.asInstanceOf[TwoInputStreamOperator[BaseRow,BaseRow,BaseRow]],
returnTypeInfo,
- getResource.getParallelism
+ leftPlan.getParallelism
)
- if (getResource.getMaxParallelism > 0) {
- ret.setMaxParallelism(getResource.getMaxParallelism)
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
}
// set KeyType and Selector for state
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExecNodePlanDumper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExecNodePlanDumper.scala
index 31df785..750cb56 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExecNodePlanDumper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExecNodePlanDumper.scala
@@ -331,10 +331,6 @@ class NodeTreeWriterImpl(
printValues.add(Pair.of("__id__", rel.getId.toString))
}
- if (withResource) {
- printValues.add(Pair.of("resource", node.getResource))
- }
-
if (withRetractTraits) {
rel match {
case streamRel: StreamPhysicalRel =>
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/MockNodeTestBase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/MockNodeTestBase.java
deleted file mode 100644
index c482426..0000000
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/MockNodeTestBase.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource;
-
-import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecCalc;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecUnion;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecValues;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecValues;
-
-import org.apache.calcite.rel.BiRel;
-import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.SingleRel;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Base for test with mock node list.
- */
-public class MockNodeTestBase {
-
- protected List<ExecNode> nodeList;
- private final boolean isBatchMode;
-
- public MockNodeTestBase(boolean isBatchMode) {
- this.isBatchMode = isBatchMode;
- }
-
- private void updateNode(int index, ExecNode<?, ?> node) {
- nodeList.set(index, node);
- NodeResource resource = new NodeResource();
- when(node.getResource()).thenReturn(resource);
- when(node.toString()).thenReturn("id: " + index);
- if (node instanceof BatchExecTableSourceScan) {
- Transformation transformation = mock(Transformation.class);
- when(((BatchExecTableSourceScan) node).getSourceTransformation(any())).thenReturn(transformation);
- when(transformation.getMaxParallelism()).thenReturn(-1);
- } else if (node instanceof StreamExecTableSourceScan) {
- Transformation transformation = mock(Transformation.class);
- when(((StreamExecTableSourceScan) node).getSourceTransformation(any())).thenReturn(transformation);
- when(transformation.getMaxParallelism()).thenReturn(-1);
- } else if (node instanceof BatchExecBoundedStreamScan) {
- Transformation transformation = mock(Transformation.class);
- when(((BatchExecBoundedStreamScan) node).getSourceTransformation()).thenReturn(transformation);
- } else if (node instanceof StreamExecDataStreamScan) {
- Transformation transformation = mock(Transformation.class);
- when(((StreamExecDataStreamScan) node).getSourceTransformation()).thenReturn(transformation);
- } else if (node instanceof BatchExecExchange) {
- RelDistribution distribution = mock(RelDistribution.class);
- when(distribution.getType()).thenReturn(RelDistribution.Type.BROADCAST_DISTRIBUTED);
- when(((BatchExecExchange) node).getDistribution()).thenReturn(distribution);
- } else if (node instanceof StreamExecExchange) {
- RelDistribution distribution = mock(RelDistribution.class);
- when(distribution.getType()).thenReturn(RelDistribution.Type.BROADCAST_DISTRIBUTED);
- when(((StreamExecExchange) node).getDistribution()).thenReturn(distribution);
- }
- }
-
- protected ExecNode<?, ?> updateCalc(int index) {
- ExecNode<?, ?> node = isBatchMode ? mock(BatchExecCalc.class) : mock(StreamExecCalc.class);
- updateNode(index, node);
- return node;
- }
-
- protected ExecNode<?, ?> updateValues(int index) {
- ExecNode<?, ?> node = isBatchMode ? mock(BatchExecValues.class) : mock(StreamExecValues.class);
- updateNode(index, node);
- return node;
- }
-
- protected ExecNode<?, ?> updateUnion(int index) {
- ExecNode<?, ?> node = isBatchMode ? mock(BatchExecUnion.class) : mock(StreamExecUnion.class);
- updateNode(index, node);
- return node;
- }
-
- protected ExecNode<?, ?> updateExchange(int index) {
- ExecNode<?, ?> node = isBatchMode ? mock(BatchExecExchange.class, RETURNS_DEEP_STUBS) :
- mock(StreamExecExchange.class, RETURNS_DEEP_STUBS);
- updateNode(index, node);
- return node;
- }
-
- protected ExecNode<?, ?> updateExchange(int index, RelDistribution.Type type) {
- ExecNode<?, ?> node = updateExchange(index);
- if (isBatchMode) {
- when(((BatchExecExchange) node).getDistribution().getType()).thenReturn(type);
- } else {
- when(((StreamExecExchange) node).getDistribution().getType()).thenReturn(type);
- }
- return node;
- }
-
- protected ExecNode<?, ?> updateTableSource(int index) {
- ExecNode<?, ?> node = isBatchMode ? mock(BatchExecTableSourceScan.class) : mock(StreamExecTableSourceScan.class);
- updateNode(index, node);
- return node;
- }
-
- protected ExecNode<?, ?> updateTableSource(int index, int maxParallelism) {
- ExecNode<?, ?> node = updateTableSource(index);
- if (isBatchMode) {
- when(((BatchExecTableSourceScan) node).getSourceTransformation(any()).getMaxParallelism()).thenReturn(maxParallelism);
- } else {
- when(((StreamExecTableSourceScan) node).getSourceTransformation(any()).getMaxParallelism()).thenReturn(maxParallelism);
- }
- return node;
- }
-
- protected ExecNode<?, ?> updateStreamScan(int index) {
- ExecNode<?, ?> node = isBatchMode ? mock(BatchExecBoundedStreamScan.class) : mock(StreamExecDataStreamScan.class);
- updateNode(index, node);
- return node;
- }
-
- protected ExecNode<?, ?> updateStreamScan(int index, int parallelism) {
- ExecNode<?, ?> node = updateStreamScan(index);
- if (isBatchMode) {
- when(((BatchExecBoundedStreamScan) nodeList.get(4)).getSourceTransformation().getParallelism()).thenReturn(parallelism);
- } else {
- when(((StreamExecDataStreamScan) nodeList.get(4)).getSourceTransformation().getParallelism()).thenReturn(parallelism);
- }
- return node;
- }
-
- protected void createNodeList(int num) {
- nodeList = new LinkedList<>();
- for (int i = 0; i < num; i++) {
- ExecNode<?, ?> node = isBatchMode ? mock(BatchExecCalc.class) : mock(StreamExecCalc.class);
- when(node.getInputNodes()).thenReturn(new ArrayList<>());
- when(node.getResource()).thenReturn(new NodeResource());
- when(node.toString()).thenReturn("id: " + i);
- nodeList.add(node);
- }
- }
-
- protected void connect(int nodeIndex, int... inputNodeIndexes) {
- List<ExecNode<?, ?>> inputNodes = new ArrayList<>(inputNodeIndexes.length);
- for (int inputIndex : inputNodeIndexes) {
- ExecNode<?, ?> input = nodeList.get(inputIndex);
- inputNodes.add(input);
- }
- when(nodeList.get(nodeIndex).getInputNodes()).thenReturn(inputNodes);
- if (inputNodeIndexes.length == 1 && nodeList.get(nodeIndex) instanceof SingleRel) {
- when(((SingleRel) nodeList.get(nodeIndex)).getInput()).thenReturn((RelNode) nodeList.get(inputNodeIndexes[0]));
- } else if (inputNodeIndexes.length == 2 && nodeList.get(nodeIndex) instanceof BiRel) {
- when(((BiRel) nodeList.get(nodeIndex)).getLeft()).thenReturn((RelNode) nodeList.get(inputNodeIndexes[0]));
- when(((BiRel) nodeList.get(nodeIndex)).getRight()).thenReturn((RelNode) nodeList.get(inputNodeIndexes[1]));
- }
- }
-}
-
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/FinalParallelismSetterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/FinalParallelismSetterTest.java
deleted file mode 100644
index c23dca3..0000000
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/FinalParallelismSetterTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource.parallelism;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.resource.MockNodeTestBase;
-
-import org.apache.calcite.rel.RelDistribution;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test for FinalParallelismSetter.
- */
-@SuppressWarnings("serial")
-@RunWith(Parameterized.class)
-public class FinalParallelismSetterTest extends MockNodeTestBase {
-
- private StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
- public FinalParallelismSetterTest(boolean isBatchMode) {
- super(isBatchMode);
- }
-
- @Before
- public void setUp() {
- sEnv.setParallelism(21);
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testSource() {
- /**
- * 0, Source 1, Source 2, Values 4, Source 5, Source
- * \ / / / /
- * 3, Union
- */
- createNodeList(6);
- updateTableSource(0, 5);
- updateTableSource(1);
- updateValues(2);
- updateUnion(3);
- updateStreamScan(4, 7);
- updateStreamScan(5);
- connect(3, 0, 1, 2, 4, 5);
- Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap = FinalParallelismSetter.calculate(sEnv, Collections.singletonList(nodeList.get(3)));
- assertEquals(3, finalParallelismNodeMap.size());
- assertEquals(5, nodeList.get(0).getResource().getMaxParallelism());
- assertEquals(1, finalParallelismNodeMap.get(nodeList.get(2)).intValue());
- assertEquals(7, finalParallelismNodeMap.get(nodeList.get(4)).intValue());
- assertEquals(21, finalParallelismNodeMap.get(nodeList.get(5)).intValue());
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testExchange() {
- /**
- * 0, Source 1, Source 10, Source
- * | | |
- * 2, Exchange 3, Exchange 8, Source 9, Exchange
- * | | \ /
- * 4, Calc 5, Calc 7, Join
- * \ / /
- * 6, Union
- */
- createNodeList(11);
- updateTableSource(0, 5);
- updateExchange(2, RelDistribution.Type.BROADCAST_DISTRIBUTED);
- updateExchange(3, RelDistribution.Type.SINGLETON);
- updateExchange(9, RelDistribution.Type.SINGLETON);
- connect(2, 0);
- connect(4, 2);
- connect(3, 1);
- connect(5, 3);
- connect(6, 4, 5, 7);
- connect(7, 8, 9);
- connect(9, 10);
- Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap = FinalParallelismSetter.calculate(sEnv, Collections.singletonList(nodeList.get(6)));
- assertEquals(2, finalParallelismNodeMap.size());
- assertEquals(5, nodeList.get(0).getResource().getMaxParallelism());
- assertEquals(1, finalParallelismNodeMap.get(nodeList.get(5)).intValue());
- assertEquals(1, finalParallelismNodeMap.get(nodeList.get(7)).intValue());
- }
-
- @Parameterized.Parameters(name = "isBatchMode = {0}")
- public static Collection<Object[]> runMode() {
- return Arrays.asList(
- new Object[] { false, },
- new Object[] { true });
- }
-}
-
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageGeneratorTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageGeneratorTest.java
deleted file mode 100644
index 952c45a..0000000
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageGeneratorTest.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource.parallelism;
-
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.resource.MockNodeTestBase;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-/**
- * Test for {@link ShuffleStageGenerator}.
- */
-@SuppressWarnings("serial")
-@RunWith(Parameterized.class)
-public class ShuffleStageGeneratorTest extends MockNodeTestBase {
-
- private Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap;
-
- public ShuffleStageGeneratorTest(boolean isBatchMode) {
- super(isBatchMode);
- }
-
- @Before
- public void setUp() {
- finalParallelismNodeMap = new HashMap<>();
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testBatchGenerateShuffleStags() {
- /**
- *
- * 0, Source 1, Source
- * \ /
- * 2, Union
- * / \
- * 3, Calc 4, Calc
- * | |
- * 5, Exchange 6, Exchange
- * \ /
- * 7, Join
- * |
- * 8, Calc
- */
- createNodeList(9);
- updateUnion(2);
- updateExchange(5);
- updateExchange(6);
- connect(2, 0, 1);
- connect(3, 2);
- connect(4, 2);
- connect(5, 3);
- connect(6, 4);
- connect(7, 5, 6);
- connect(8, 7);
-
- Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(8)), finalParallelismNodeMap);
-
- assertSameShuffleStage(nodeShuffleStageMap, 7, 8);
- assertSameShuffleStage(nodeShuffleStageMap, 0, 1, 3, 4);
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testMultiOutput() {
- /**
- *
- * 0, Source 2, Source 4, Source 6, Source
- * | | | |
- * 1, Calc 3, Calc 5, Calc 7, Exchange
- * \ / \ / \ /
- * 8, Join 9, Join 10, Join
- * \ / \ /
- * \ 12, Exchange \ /
- * \ / \ /
- * 11, Join 13, Union
- * \ |
- * 15, Exchange 14, Calc
- * \ /
- * 16, Join
- */
- createNodeList(17);
- updateExchange(7);
- updateExchange(12);
- updateUnion(13);
- updateExchange(15);
- connect(1, 0);
- connect(3, 2);
- connect(5, 4);
- connect(7, 6);
- connect(8, 1, 3);
- connect(9, 3, 5);
- connect(10, 5, 7);
- connect(12, 9);
- connect(11, 8, 12);
- connect(13, 9, 10);
- connect(14, 13);
- connect(15, 11);
- connect(16, 15, 14);
-
- Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(16)), finalParallelismNodeMap);
-
- assertSameShuffleStage(nodeShuffleStageMap, 0, 1, 8, 3, 2, 9, 5, 4, 10, 11, 14, 16);
- assertSameShuffleStage(nodeShuffleStageMap, 6);
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testWithFinalParallelism() {
- /**
- *
- * 0, Source 2, Source
- * | |
- * 1, Calc 3, Calc 6, Source
- * \ / /
- * 4, Union
- * |
- * 5, Calc
- */
- createNodeList(7);
- ExecNode<?, ?> scan0 = updateTableSource(0);
- scan0.getResource().setMaxParallelism(10);
- ExecNode<?, ?> scan1 = updateTableSource(2);
- finalParallelismNodeMap.put(scan1, 11);
- updateUnion(4);
- updateCalc(5);
- updateTableSource(6);
- connect(1, 0);
- connect(3, 2);
- connect(4, 1, 3, 6);
- connect(5, 4);
-
- Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(5)), finalParallelismNodeMap);
-
- assertSameShuffleStage(nodeShuffleStageMap, 0, 1);
- assertSameShuffleStage(nodeShuffleStageMap, 2, 3, 6, 5);
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testWithFinalParallelism1() {
- /**
- *
- * 0, Source 2, Source
- * | |
- * 1, Calc 3, Calc
- * \ /
- * 4, Union
- * |
- * 5, Calc
- */
- createNodeList(7);
- ExecNode<?, ?> scan0 = updateTableSource(0);
- ExecNode<?, ?> scan1 = updateTableSource(2);
- finalParallelismNodeMap.put(scan0, 10);
- finalParallelismNodeMap.put(scan1, 11);
- updateUnion(4);
- ExecNode<?, ?> calc = updateCalc(5);
- finalParallelismNodeMap.put(calc, 12);
- connect(1, 0);
- connect(3, 2);
- connect(4, 1, 3);
- connect(5, 4);
-
- Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(5)), finalParallelismNodeMap);
-
- assertSameShuffleStage(nodeShuffleStageMap, 0, 1);
- assertSameShuffleStage(nodeShuffleStageMap, 2, 3);
- assertSameShuffleStage(nodeShuffleStageMap, 5);
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testWithFinalParallelism2() {
- /**
- *
- *   0, Source   2, Source
- *      |           |
- *      |        3, Exchange
- *      |           |
- *   1, Calc     4, Calc
- *        \        /
- *         5, Union
- *            |
- *         6, Calc
- */
- createNodeList(7);
- ExecNode<?, ?> scan0 = updateTableSource(0);
- ExecNode<?, ?> scan1 = updateTableSource(2);
- finalParallelismNodeMap.put(scan0, 10);
- finalParallelismNodeMap.put(scan1, 11);
- updateExchange(3);
- ExecNode<?, ?> calc = updateCalc(4);
- finalParallelismNodeMap.put(calc, 1);
- updateUnion(5);
- connect(1, 0);
- connect(3, 2);
- connect(4, 3);
- connect(5, 1, 4);
- connect(6, 5);
-
- Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(6)), finalParallelismNodeMap);
-
- assertSameShuffleStage(nodeShuffleStageMap, 0, 1, 6);
- assertSameShuffleStage(nodeShuffleStageMap, 2);
- assertSameShuffleStage(nodeShuffleStageMap, 4);
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testWithFinalParallelism3() {
- /**
- *
- *   0, Source   2, Source
- *      |           |
- *   1, Calc     3, Calc    6, Source   7, Source
- *        \        /        /          /
- *         4, Union
- *            |
- *         5, Calc
- */
- createNodeList(8);
- ExecNode<?, ?> scan0 = updateTableSource(0);
- ExecNode<?, ?> scan1 = updateTableSource(2);
- finalParallelismNodeMap.put(scan0, 11);
- scan1.getResource().setMaxParallelism(5);
- ExecNode<?, ?> union4 = updateUnion(4);
- finalParallelismNodeMap.put(union4, 5);
- updateCalc(5);
- updateTableSource(6);
- updateTableSource(7);
- connect(1, 0);
- connect(3, 2);
- connect(4, 1, 3, 6, 7);
- connect(5, 4);
-
- Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(5)), finalParallelismNodeMap);
-
- assertSameShuffleStage(nodeShuffleStageMap, 0, 1);
- assertSameShuffleStage(nodeShuffleStageMap, 2, 3, 6, 5, 7);
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testWithFinalParallelism4() {
- /**
- *
- *   0, Source   2, Source
- *      |           |
- *   1, Calc     3, Calc
- *        \        /
- *         4, Union
- *            |
- *         5, Calc
- */
- createNodeList(6);
- ExecNode<?, ?> scan0 = updateTableSource(0);
- ExecNode<?, ?> scan1 = updateTableSource(2);
- finalParallelismNodeMap.put(scan0, 11);
- finalParallelismNodeMap.put(scan1, 5);
- ExecNode<?, ?> union4 = updateUnion(4);
- finalParallelismNodeMap.put(union4, 3);
- updateCalc(5);
- connect(1, 0);
- connect(3, 2);
- connect(4, 1, 3);
- connect(5, 4);
-
- Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(5)), finalParallelismNodeMap);
-
- assertSameShuffleStage(nodeShuffleStageMap, 0, 1);
- assertSameShuffleStage(nodeShuffleStageMap, 2, 3);
- assertSameShuffleStage(nodeShuffleStageMap, 5);
- }
-
- private void assertSameShuffleStage(Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap, int ... nodeIndexes) {
- Set<ExecNode<?, ?>> nodeSet = new HashSet<>();
- for (int index : nodeIndexes) {
- nodeSet.add(nodeList.get(index));
- }
- for (int index : nodeIndexes) {
- assertNotNull("shuffleStage should not be null. node index: " + index, nodeShuffleStageMap.get(nodeList.get(index)));
- assertEquals("node index: " + index, nodeSet, nodeShuffleStageMap.get(nodeList.get(index)).getExecNodeSet());
- }
- }
-
- @Parameterized.Parameters(name = "isBatchMode = {0}")
- public static Collection<Object[]> runMode() {
- return Arrays.asList(
- new Object[] { false, },
- new Object[] { true });
- }
-}
-
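For readers without the deleted file at hand, the grouping behaviour these tests pinned down can be summarised in a short, self-contained sketch. The Node and Kind types below are hypothetical stand-ins, not the removed ShuffleStageGenerator: nodes joined by an edge that does not touch an Exchange land in one shuffle stage, while Exchange (boundary) and Union (transparent) nodes own no stage themselves. Handling of nodes pinned to a final parallelism, covered by the testWithFinalParallelism cases, is omitted here.

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class ShuffleStageSketch {

        enum Kind { SOURCE, CALC, JOIN, EXCHANGE, UNION }

        static final class Node {
            final int id;
            final Kind kind;
            final List<Node> inputs = new ArrayList<>();
            Node(int id, Kind kind) { this.id = id; this.kind = kind; }
        }

        // Union-find with path compression; absent nodes are their own root.
        private static Node find(Map<Node, Node> parent, Node n) {
            Node p = parent.getOrDefault(n, n);
            if (p == n) {
                return n;
            }
            Node root = find(parent, p);
            parent.put(n, root);
            return root;
        }

        // Walks the DAG from the sink roots, merges both endpoints of every edge
        // that does not touch an Exchange, then drops Exchange and Union nodes
        // from the reported stages.
        static Map<Node, Set<Node>> generate(List<Node> roots) {
            Map<Node, Node> parent = new HashMap<>();
            Deque<Node> stack = new ArrayDeque<>(roots);
            Set<Node> seen = new HashSet<>();
            while (!stack.isEmpty()) {
                Node n = stack.pop();
                if (!seen.add(n)) {
                    continue;
                }
                for (Node in : n.inputs) {
                    stack.push(in);
                    if (n.kind != Kind.EXCHANGE && in.kind != Kind.EXCHANGE) {
                        parent.put(find(parent, n), find(parent, in));
                    }
                }
            }
            Map<Node, Set<Node>> stages = new HashMap<>();
            for (Node n : seen) {
                if (n.kind == Kind.EXCHANGE || n.kind == Kind.UNION) {
                    continue;
                }
                stages.computeIfAbsent(find(parent, n), k -> new HashSet<>()).add(n);
            }
            // Re-key so every member maps to its full stage set, matching the shape
            // of the nodeShuffleStageMap asserted in the tests above.
            Map<Node, Set<Node>> byNode = new HashMap<>();
            stages.values().forEach(set -> set.forEach(m -> byNode.put(m, set)));
            return byNode;
        }
    }

Applied to the first test's graph, this rule merges nodes 0, 1, 3 and 4 across the transparent Union and keeps 7 and 8 in a separate stage behind the two Exchanges, which is exactly what assertSameShuffleStage checks.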
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculatorTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculatorTest.java
deleted file mode 100644
index 97cba22..0000000
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculatorTest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource.parallelism;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecCalc;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Test for {@link ShuffleStageParallelismCalculator}.
- */
-public class ShuffleStageParallelismCalculatorTest {
-
- private Configuration tableConf;
- private BatchExecTableSourceScan batchTableSourceScan = mock(BatchExecTableSourceScan.class);
- private StreamExecTableSourceScan streamTableSourceScan = mock(StreamExecTableSourceScan.class);
- private int envParallelism = 5;
-
- @Before
- public void setUp() {
- tableConf = new Configuration();
- tableConf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 50);
- }
-
- @Test
- public void testOnlyBatchSource() {
- ShuffleStage shuffleStage0 = mock(ShuffleStage.class);
- when(shuffleStage0.getMaxParallelism()).thenReturn(40);
- when(shuffleStage0.getExecNodeSet()).thenReturn(getNodeSet(Arrays.asList(batchTableSourceScan)));
- ShuffleStageParallelismCalculator.calculate(tableConf, envParallelism, Arrays.asList(shuffleStage0));
- verify(shuffleStage0).setParallelism(40, false);
- }
-
- @Test
- public void testOnlyStreamSource() {
- ShuffleStage shuffleStage0 = mock(ShuffleStage.class);
- when(shuffleStage0.getMaxParallelism()).thenReturn(20);
- when(shuffleStage0.getExecNodeSet()).thenReturn(getNodeSet(Arrays.asList(streamTableSourceScan)));
- ShuffleStageParallelismCalculator.calculate(tableConf, envParallelism, Arrays.asList(shuffleStage0));
- verify(shuffleStage0).setParallelism(20, false);
- }
-
- @Test
- public void testBatchSourceAndCalc() {
- ShuffleStage shuffleStage0 = mock(ShuffleStage.class);
- when(shuffleStage0.getMaxParallelism()).thenReturn(20);
- BatchExecCalc calc = mock(BatchExecCalc.class);
- when(shuffleStage0.getExecNodeSet()).thenReturn(getNodeSet(Arrays.asList(batchTableSourceScan, calc)));
- ShuffleStageParallelismCalculator.calculate(tableConf, envParallelism, Arrays.asList(shuffleStage0));
- verify(shuffleStage0).setParallelism(20, false);
- }
-
- @Test
- public void testStreamSourceAndCalc() {
- tableConf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SOURCE_PARALLELISM, 60);
- ShuffleStage shuffleStage0 = mock(ShuffleStage.class);
- when(shuffleStage0.getMaxParallelism()).thenReturn(60);
- StreamExecCalc calc = mock(StreamExecCalc.class);
- when(shuffleStage0.getExecNodeSet()).thenReturn(getNodeSet(Arrays.asList(streamTableSourceScan, calc)));
- ShuffleStageParallelismCalculator.calculate(tableConf, envParallelism, Arrays.asList(shuffleStage0));
- verify(shuffleStage0).setParallelism(60, false);
- }
-
- @Test
- public void testNoSource() {
- ShuffleStage shuffleStage0 = mock(ShuffleStage.class);
- BatchExecCalc calc = mock(BatchExecCalc.class);
- when(shuffleStage0.getMaxParallelism()).thenReturn(Integer.MAX_VALUE);
- when(shuffleStage0.getExecNodeSet()).thenReturn(getNodeSet(Arrays.asList(calc)));
- ShuffleStageParallelismCalculator.calculate(tableConf, envParallelism, Arrays.asList(shuffleStage0));
- verify(shuffleStage0).setParallelism(50, false);
- }
-
- @Test
- public void testEnvParallelism() {
- tableConf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, -1);
- ShuffleStage shuffleStage0 = mock(ShuffleStage.class);
- when(shuffleStage0.getMaxParallelism()).thenReturn(4);
- BatchExecCalc calc = mock(BatchExecCalc.class);
- when(shuffleStage0.getExecNodeSet()).thenReturn(getNodeSet(Arrays.asList(batchTableSourceScan, calc)));
- ShuffleStageParallelismCalculator.calculate(tableConf, envParallelism, Arrays.asList(shuffleStage0));
- verify(shuffleStage0).setParallelism(4, false);
- }
-
- private Set<ExecNode<?, ?>> getNodeSet(List<ExecNode<?, ?>> nodeList) {
- Set<ExecNode<?, ?>> nodeSet = new HashSet<>();
- nodeSet.addAll(nodeList);
- return nodeSet;
- }
-}
-
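Taken together, the expectations above pin down a simple selection rule, sketched here under the assumption that it matches the deleted calculator: the configured default parallelism (or the environment parallelism when the default is non-positive) is capped by the shuffle stage's max parallelism. The TABLE_EXEC_RESOURCE_SOURCE_PARALLELISM option exercised in testStreamSourceAndCalc, which this commit removes, overrode this base value for stages containing a source and is deliberately left out of the sketch.

    public class StageParallelismSketch {

        // base = configured default, or the environment parallelism when unset (-1);
        // the stage's max parallelism always caps the result.
        static int stageParallelism(int configuredDefault, int envParallelism, int stageMax) {
            int base = configuredDefault > 0 ? configuredDefault : envParallelism;
            return Math.min(base, stageMax);
        }

        public static void main(String[] args) {
            System.out.println(stageParallelism(50, 5, 40));                // 40, as in testOnlyBatchSource
            System.out.println(stageParallelism(50, 5, Integer.MAX_VALUE)); // 50, as in testNoSource
            System.out.println(stageParallelism(-1, 5, 4));                 // 4, as in testEnvParallelism
        }
    }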
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testGetStatsFromCatalog.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testGetStatsFromCatalog.out
index bc53a0a..223aeee 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/explain/testGetStatsFromCatalog.out
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testGetStatsFromCatalog.out
@@ -15,18 +15,18 @@ HashJoin(joinType=[InnerJoin], where=[=(s3, s30)], select=[b1, l2, s3, d4, dd5,
: Data Source
content : collect elements with CollectionInputFormat
+ : Operator
+ content : SourceConversion(table:Buffer(default_catalog, default_database, T1, source: [TestTableSource(b1, l2, s3, d4, dd5)]), fields:(b1, l2, s3, d4, dd5))
+ ship_strategy : FORWARD
+
: Data Source
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, T1, source: [TestTableSource(b1, l2, s3, d4, dd5)]), fields:(b1, l2, s3, d4, dd5))
+ content : SourceConversion(table:Buffer(default_catalog, default_database, T2, source: [TestTableSource(b1, l2, s3, d4, dd5)]), fields:(b1, l2, s3, d4, dd5))
ship_strategy : FORWARD
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, T2, source: [TestTableSource(b1, l2, s3, d4, dd5)]), fields:(b1, l2, s3, d4, dd5))
- ship_strategy : FORWARD
-
- : Operator
- content : HashJoin(where: (s3 = s30), buildLeft)
- ship_strategy : BROADCAST
+ content : HashJoin(where: (s3 = s30), buildLeft)
+ ship_strategy : BROADCAST
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/resource/ExecNodeResourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/resource/ExecNodeResourceTest.xml
deleted file mode 100644
index c692020..0000000
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/resource/ExecNodeResourceTest.xml
+++ /dev/null
@@ -1,153 +0,0 @@
-<?xml version="1.0" ?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to you under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-<Root>
- <TestCase name="testConfigSourceParallelism[isBatchMode=false]">
- <Resource name="sql">
- <![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[sum_a, c], resource=[{parallelism=1}])
-+- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], resource=[{parallelism=1, maxParallelism=1}])
- +- Exchange(distribution=[single], resource=[{parallelism=-1}])
- +- GroupAggregate(groupBy=[c], select=[c, SUM(a) AS sum_a], resource=[{parallelism=18}])
- +- Exchange(distribution=[hash[c]], resource=[{parallelism=-1}])
- +- Calc(select=[c, a], resource=[{parallelism=100}])
- +- TableSourceScan(table=[[default_catalog, default_database, table3, source: [MockTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=100}])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testConfigSourceParallelism[isBatchMode=true]">
- <Resource name="sql">
- <![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[sum_a, c], resource=[{parallelism=1}])
-+- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[true], resource=[{parallelism=1, maxParallelism=1}])
- +- Exchange(distribution=[single], resource=[{parallelism=-1}])
- +- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[false], resource=[{parallelism=18}])
- +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS sum_a], resource=[{parallelism=18}])
- +- Exchange(distribution=[hash[c]], resource=[{parallelism=-1}])
- +- LocalHashAggregate(groupBy=[c], select=[c, Partial_SUM(a) AS sum$0], resource=[{parallelism=100}])
- +- Calc(select=[c, a], resource=[{parallelism=100}])
- +- TableSourceScan(table=[[default_catalog, default_database, table3, source: [MockTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=100}])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testSortLimit[isBatchMode=false]">
- <Resource name="sql">
- <![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[sum_a, c], resource=[{parallelism=1}])
-+- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], resource=[{parallelism=1, maxParallelism=1}])
- +- Exchange(distribution=[single], resource=[{parallelism=-1}])
- +- GroupAggregate(groupBy=[c], select=[c, SUM(a) AS sum_a], resource=[{parallelism=18}])
- +- Exchange(distribution=[hash[c]], resource=[{parallelism=-1}])
- +- Calc(select=[c, a], resource=[{parallelism=18}])
- +- TableSourceScan(table=[[default_catalog, default_database, table3, source: [MockTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testSourcePartitionMaxNum[isBatchMode=true]">
- <Resource name="sql">
- <![CDATA[SELECT * FROM table3]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-TableSourceScan(table=[[default_catalog, default_database, table3, source: [MockTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testSortLimit[isBatchMode=true]">
- <Resource name="sql">
- <![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[sum_a, c], resource=[{parallelism=1}])
-+- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[true], resource=[{parallelism=1, maxParallelism=1}])
- +- Exchange(distribution=[single], resource=[{parallelism=-1}])
- +- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[false], resource=[{parallelism=18}])
- +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS sum_a], resource=[{parallelism=18}])
- +- Exchange(distribution=[hash[c]], resource=[{parallelism=-1}])
- +- LocalHashAggregate(groupBy=[c], select=[c, Partial_SUM(a) AS sum$0], resource=[{parallelism=18}])
- +- Calc(select=[c, a], resource=[{parallelism=18}])
- +- TableSourceScan(table=[[default_catalog, default_database, table3, source: [MockTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testUnionQuery[isBatchMode=true]">
- <Resource name="sql">
- <![CDATA[SELECT sum(a) as sum_a, g FROM (SELECT a, b, c FROM table3 UNION ALL SELECT a, b, c FROM table4), table5 WHERE b = e group by g]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[sum_a, g], resource=[{parallelism=18}])
-+- HashAggregate(isMerge=[true], groupBy=[g], select=[g, Final_SUM(sum$0) AS sum_a], resource=[{parallelism=18}])
- +- Exchange(distribution=[hash[g]], resource=[{parallelism=-1}])
- +- LocalHashAggregate(groupBy=[g], select=[g, Partial_SUM(a) AS sum$0], resource=[{parallelism=18}])
- +- Calc(select=[g, a], resource=[{parallelism=18}])
- +- HashJoin(joinType=[InnerJoin], where=[=(b, e)], select=[a, b, e, g], build=[left], resource=[{parallelism=18}])
- :- Exchange(distribution=[hash[b]], resource=[{parallelism=-1}])
- : +- Union(all=[true], union=[a, b], resource=[{parallelism=-1}])
- : :- Calc(select=[a, b], resource=[{parallelism=18}])
- : : +- TableSourceScan(table=[[default_catalog, default_database, table3, source: [MockTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
- : +- Calc(select=[a, b], resource=[{parallelism=18}])
- : +- TableSourceScan(table=[[default_catalog, default_database, table4, source: [MockTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
- +- Exchange(distribution=[hash[e]], resource=[{parallelism=-1}])
- +- Calc(select=[e, g], resource=[{parallelism=18}])
- +- TableSourceScan(table=[[default_catalog, default_database, table5, source: [MockTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h], resource=[{parallelism=18}])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testSourcePartitionMaxNum[isBatchMode=false]">
- <Resource name="sql">
- <![CDATA[SELECT * FROM table3]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-TableSourceScan(table=[[default_catalog, default_database, table3, source: [MockTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testUnionQuery[isBatchMode=false]">
- <Resource name="sql">
- <![CDATA[SELECT sum(a) as sum_a, g FROM (SELECT a, b, c FROM table3 UNION ALL SELECT a, b, c FROM table4), table5 WHERE b = e group by g]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[sum_a, g], resource=[{parallelism=18}])
-+- GroupAggregate(groupBy=[g], select=[g, SUM(a) AS sum_a], resource=[{parallelism=18}])
- +- Exchange(distribution=[hash[g]], resource=[{parallelism=-1}])
- +- Calc(select=[g, a], resource=[{parallelism=18}])
- +- Join(joinType=[InnerJoin], where=[=(b, e)], select=[a, b, e, g], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], resource=[{parallelism=18}])
- :- Exchange(distribution=[hash[b]], resource=[{parallelism=-1}])
- : +- Calc(select=[a, b], resource=[{parallelism=18}])
- : +- Union(all=[true], union=[a, b, c], resource=[{parallelism=-1}])
- : :- TableSourceScan(table=[[default_catalog, default_database, table3, source: [MockTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
- : +- TableSourceScan(table=[[default_catalog, default_database, table4, source: [MockTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
- +- Exchange(distribution=[hash[e]], resource=[{parallelism=-1}])
- +- Calc(select=[e, g], resource=[{parallelism=18}])
- +- TableSourceScan(table=[[default_catalog, default_database, table5, source: [MockTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h], resource=[{parallelism=18}])
-]]>
- </Resource>
- </TestCase>
-</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
index ca4650c..4ae9b88 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
@@ -24,7 +24,7 @@ import org.apache.flink.table.api.scala._
import org.apache.flink.table.planner.utils.TableTestBase
import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
-import org.junit.Test
+import org.junit.{Before, Test}
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -40,6 +40,12 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
val LONG = new BigIntType()
val INT = new IntType()
+ @Before
+ def before(): Unit = {
+ util.tableEnv.getConfig.getConfiguration.setInteger(
+ ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4)
+ }
+
@Test
def testExplainWithTableSourceScan(): Unit = {
util.verifyExplain("SELECT * FROM MyTable", extended)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index 59ae7d6..9952e34 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -24,7 +24,7 @@ import org.apache.flink.table.api.scala._
import org.apache.flink.table.planner.utils.TableTestBase
import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
-import org.junit.Test
+import org.junit.{Before, Test}
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -42,6 +42,12 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
val LONG = new BigIntType()
val INT = new IntType()
+ @Before
+ def before(): Unit = {
+ util.tableEnv.getConfig.getConfiguration.setInteger(
+ ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4)
+ }
+
@Test
def testExplainTableSourceScan(): Unit = {
util.verifyExplain("SELECT * FROM MyTable", extended)
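Both ExplainTest updates pin the default parallelism so that plan output stays stable now that the per-source setting is gone. A minimal Java equivalent of what the new @Before blocks do, where tEnv stands for any already-created TableEnvironment and the option name is taken verbatim from the diff:

    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.config.ExecutionConfigOptions;

    final class DefaultParallelismSetting {
        // Sets the one table-level parallelism knob that survives this commit.
        static void apply(TableEnvironment tEnv, int parallelism) {
            tEnv.getConfig().getConfiguration().setInteger(
                ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, parallelism);
        }
    }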
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/nodes/resource/ExecNodeResourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/nodes/resource/ExecNodeResourceTest.scala
deleted file mode 100644
index 2716919..0000000
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/nodes/resource/ExecNodeResourceTest.scala
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource
-
-import org.apache.flink.api.common.io.OutputFormat
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
-import org.apache.flink.streaming.api.environment
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory
-import org.apache.flink.streaming.api.transformations.{SinkTransformation, SourceTransformation}
-import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.api.{TableConfig, TableSchema, Types}
-import org.apache.flink.table.dataformat.BaseRow
-import org.apache.flink.table.plan.stats.TableStats
-import org.apache.flink.table.planner.plan.stats.FlinkStatistic
-import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil, TestingTableEnvironment}
-import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter
-import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
-import org.apache.flink.table.sinks.{AppendStreamTableSink, StreamTableSink, TableSink}
-import org.apache.flink.table.sources.StreamTableSource
-
-import org.junit.Assert.assertEquals
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.{Before, Test}
-import org.mockito.Mockito.{mock, when}
-
-import java.util
-
-@RunWith(classOf[Parameterized])
-class ExecNodeResourceTest(isBatchMode: Boolean) extends TableTestBase {
-
- private var testUtil: TableTestUtil = _
-
- @Before
- def before(): Unit = {
- testUtil = if(isBatchMode) batchTestUtil() else streamTestUtil()
- val table3Stats = new TableStats(5000000)
- val table3Source = new MockTableSource(isBatchMode,
- new TableSchema(Array("a", "b", "c"),
- Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING)))
- testUtil.addTableSource(
- "table3", table3Source, FlinkStatistic.builder().tableStats(table3Stats).build())
- val table5Stats = new TableStats(8000000)
- val table5Source = new MockTableSource(isBatchMode,
- new TableSchema(Array("d", "e", "f", "g", "h"),
- Array[TypeInformation[_]](Types.INT, Types.LONG, Types.INT, Types.STRING, Types.LONG)))
- testUtil.addTableSource(
- "table5", table5Source, FlinkStatistic.builder().tableStats(table5Stats).build())
- ExecNodeResourceTest.setResourceConfig(testUtil.tableEnv.getConfig)
- }
-
- @Test
- def testSourcePartitionMaxNum(): Unit = {
- val sqlQuery = "SELECT * FROM table3"
- testUtil.verifyResource(sqlQuery)
- }
-
- @Test
- def testSortLimit(): Unit = {
- val sqlQuery = "SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2"
- testUtil.verifyResource(sqlQuery)
- }
-
- @Test
- def testConfigSourceParallelism(): Unit = {
- testUtil.tableEnv.getConfig.getConfiguration.setInteger(
- ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SOURCE_PARALLELISM, 100)
- val sqlQuery = "SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2"
- testUtil.verifyResource(sqlQuery)
- }
-
- @Test
- def testUnionQuery(): Unit = {
- val statsOfTable4 = new TableStats(100L)
- val table4Source = new MockTableSource(isBatchMode,
- new TableSchema(Array("a", "b", "c"),
- Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING)))
- testUtil.addTableSource(
- "table4", table4Source, FlinkStatistic.builder().tableStats(statsOfTable4).build())
-
- val sqlQuery = "SELECT sum(a) as sum_a, g FROM " +
- "(SELECT a, b, c FROM table3 UNION ALL SELECT a, b, c FROM table4), table5 " +
- "WHERE b = e group by g"
- testUtil.verifyResource(sqlQuery)
- }
-
- @Test
- def testSinkSelfParallelism(): Unit = {
- val sqlQuery = "SELECT * FROM table3"
- val table = testUtil.tableEnv.sqlQuery(sqlQuery)
- val tableSink = new MockTableSink(new TableSchema(Array("a", "b", "c"),
- Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING)), -1)
-
- testUtil.writeToSink(table, tableSink, "sink")
- testUtil.tableEnv.asInstanceOf[TestingTableEnvironment].translate()
- assertEquals(17, tableSink.getSinkTransformation.getParallelism)
- }
-
- @Test
- def testSinkConfigParallelism(): Unit = {
- testUtil.tableEnv.getConfig.getConfiguration.setInteger(
- ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SINK_PARALLELISM,
- 25
- )
- val sqlQuery = "SELECT * FROM table3"
- val table = testUtil.tableEnv.sqlQuery(sqlQuery)
- val tableSink = new MockTableSink(new TableSchema(Array("a", "b", "c"),
- Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING)), -1)
-
- testUtil.writeToSink(table, tableSink, "sink")
- testUtil.tableEnv.asInstanceOf[TestingTableEnvironment].translate()
- assertEquals(25, tableSink.getSinkTransformation.getParallelism)
- }
-
- @Test
- def testSinkConfigParallelismWhenMax1(): Unit = {
- testUtil.tableEnv.getConfig.getConfiguration.setInteger(
- ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SINK_PARALLELISM,
- 25
- )
- val sqlQuery = "SELECT * FROM table3"
- val table = testUtil.tableEnv.sqlQuery(sqlQuery)
- val tableSink = new MockTableSink(new TableSchema(Array("a", "b", "c"),
- Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING)), 23)
-
- testUtil.writeToSink(table, tableSink, "sink")
- testUtil.tableEnv.asInstanceOf[TestingTableEnvironment].translate()
- assertEquals(17, tableSink.getSinkTransformation.getParallelism)
- }
-
- @Test
- def testSinkConfigParallelismWhenMax2(): Unit = {
- testUtil.tableEnv.getConfig.getConfiguration.setInteger(
- ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SINK_PARALLELISM,
- 25
- )
- val sqlQuery = "SELECT * FROM table3"
- val table = testUtil.tableEnv.sqlQuery(sqlQuery)
- val tableSink = new MockTableSink(new TableSchema(Array("a", "b", "c"),
- Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING)), 25)
-
- testUtil.writeToSink(table, tableSink, "sink")
- testUtil.tableEnv.asInstanceOf[TestingTableEnvironment].translate()
- assertEquals(25, tableSink.getSinkTransformation.getParallelism)
- }
-}
-
-object ExecNodeResourceTest {
-
- @Parameterized.Parameters(name = "isBatchMode={0}")
- def parameters(): util.Collection[Array[Any]] = {
- util.Arrays.asList(
- Array(true),
- Array(false)
- )
- }
-
- def setResourceConfig(tableConfig: TableConfig): Unit = {
- tableConfig.getConfiguration.setInteger(
- ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
- 18)
- }
-}
-
-/**
- * Batch/Stream [[org.apache.flink.table.sources.TableSource]] for resource testing.
- */
-class MockTableSource(override val isBounded: Boolean, schema: TableSchema)
- extends StreamTableSource[BaseRow] {
-
- override def getDataStream(
- execEnv: environment.StreamExecutionEnvironment): DataStream[BaseRow] = {
- val transformation = mock(classOf[SourceTransformation[BaseRow]])
- when(transformation.getMaxParallelism).thenReturn(-1)
- val bs = mock(classOf[DataStream[BaseRow]])
- when(bs.getTransformation).thenReturn(transformation)
- when(transformation.getOutputType).thenReturn(getReturnType)
- val factory = mock(classOf[StreamOperatorFactory[BaseRow]])
- when(factory.isStreamSource).thenReturn(!isBounded)
- when(transformation.getOperatorFactory).thenReturn(factory)
- bs
- }
-
- override def getReturnType: TypeInformation[BaseRow] = {
- val LogicalTypes = schema.getFieldTypes.map(
- TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType)
- new BaseRowTypeInfo(LogicalTypes, schema.getFieldNames)
- }
-
- override def getTableSchema: TableSchema = schema
-}
-
-/**
- * Batch/Stream [[org.apache.flink.table.sinks.TableSink]] for resource testing.
- */
-class MockTableSink(schema: TableSchema, maxParallelism: Int) extends StreamTableSink[BaseRow]
-with AppendStreamTableSink[BaseRow] {
-
- private var sinkTransformation:SinkTransformation[BaseRow] = _
-
- override def emitDataStream(dataStream: DataStream[BaseRow]) = ???
-
- override def consumeDataStream(dataStream: DataStream[BaseRow]): DataStreamSink[_] = {
- val outputFormat = mock(classOf[OutputFormat[BaseRow]])
- val ret = dataStream.writeUsingOutputFormat(outputFormat).name("collect")
- ret.getTransformation.setParallelism(17)
- if (maxParallelism > 0) {
- ret.getTransformation.setMaxParallelism(maxParallelism)
- }
- sinkTransformation = ret.getTransformation
- ret
- }
-
- override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]):
- TableSink[BaseRow] = {
- this
- }
-
- override def getOutputType: TypeInformation[BaseRow] = {
- val LogicalTypes = schema.getFieldTypes.map(
- TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType)
- new BaseRowTypeInfo(LogicalTypes, schema.getFieldNames)
- }
-
- override def getFieldNames: Array[String] = schema.getFieldNames
-
- /**
- * @deprecated Use the field types of { @link #getTableSchema()} instead.
- */
- override def getFieldTypes: Array[TypeInformation[_]] = schema.getFieldTypes
-
- def getSinkTransformation: SinkTransformation[BaseRow] = sinkTransformation
-}
-
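The four sink tests deleted above jointly encode the rule this commit removes along with the TABLE_EXEC_RESOURCE_SINK_PARALLELISM option. A hedged reconstruction, not the planner's own code: the configured sink parallelism wins only when it is positive and does not exceed the sink transformation's max parallelism; otherwise the parallelism the sink set on its own transformation (17 in MockTableSink) is kept.

    public class SinkParallelismSketch {

        // configured: the removed TABLE_EXEC_RESOURCE_SINK_PARALLELISM value;
        // selfParallelism: what the sink set on its own transformation;
        // maxParallelism: the transformation's max, or a non-positive value when unset.
        static int sinkParallelism(int configured, int selfParallelism, int maxParallelism) {
            boolean usable = configured > 0
                && (maxParallelism <= 0 || configured <= maxParallelism);
            return usable ? configured : selfParallelism;
        }

        public static void main(String[] args) {
            System.out.println(sinkParallelism(-1, 17, -1)); // 17, testSinkSelfParallelism
            System.out.println(sinkParallelism(25, 17, -1)); // 25, testSinkConfigParallelism
            System.out.println(sinkParallelism(25, 17, 23)); // 17, testSinkConfigParallelismWhenMax1
            System.out.println(sinkParallelism(25, 17, 25)); // 25, testSinkConfigParallelismWhenMax2
        }
    }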