Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/02 07:54:49 UTC

[flink] branch master updated: [FLINK-13494][table-planner-blink] Remove source and sink parallelism configurations

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8266707  [FLINK-13494][table-planner-blink] Remove source and sink parallelism configurations
8266707 is described below

commit 8266707502889e279256d09645376c665ab0cdcb
Author: Xupingyong <xu...@163.com>
AuthorDate: Wed Jul 31 13:56:31 2019 +0800

    [FLINK-13494][table-planner-blink] Remove source and sink parallelism configurations
    
    This closes #9277
---
 .../table/api/config/ExecutionConfigOptions.java   |  19 +-
 .../planner/plan/nodes/resource/NodeResource.java  |  57 ----
 .../plan/nodes/resource/NodeResourceUtil.java      |  38 +--
 .../parallelism/FinalParallelismSetter.java        | 135 ---------
 .../resource/parallelism/ParallelismProcessor.java |  73 -----
 .../nodes/resource/parallelism/ShuffleStage.java   |  88 ------
 .../parallelism/ShuffleStageGenerator.java         | 157 ----------
 .../ShuffleStageParallelismCalculator.java         |  92 ------
 .../table/planner/delegation/BatchPlanner.scala    |   6 +-
 .../table/planner/delegation/PlannerBase.scala     |  10 +
 .../table/planner/delegation/StreamPlanner.scala   |  11 +-
 .../table/planner/plan/nodes/exec/ExecNode.scala   |  26 +-
 .../batch/BatchExecBoundedStreamScan.scala         |   8 +-
 .../plan/nodes/physical/batch/BatchExecCalc.scala  |   2 +-
 .../nodes/physical/batch/BatchExecCorrelate.scala  |   2 +-
 .../nodes/physical/batch/BatchExecExchange.scala   |   6 +
 .../nodes/physical/batch/BatchExecExpand.scala     |   2 +-
 .../batch/BatchExecHashAggregateBase.scala         |   2 +-
 .../nodes/physical/batch/BatchExecHashJoin.scala   |   2 +-
 .../batch/BatchExecHashWindowAggregateBase.scala   |   2 +-
 .../plan/nodes/physical/batch/BatchExecLimit.scala |   2 +-
 .../nodes/physical/batch/BatchExecLookupJoin.scala |   4 +-
 .../physical/batch/BatchExecNestedLoopJoin.scala   |   3 +-
 .../physical/batch/BatchExecOverAggregate.scala    |   2 +-
 .../plan/nodes/physical/batch/BatchExecRank.scala  |   2 +-
 .../plan/nodes/physical/batch/BatchExecSink.scala  |  19 +-
 .../plan/nodes/physical/batch/BatchExecSort.scala  |   2 +-
 .../batch/BatchExecSortAggregateBase.scala         |   2 +-
 .../nodes/physical/batch/BatchExecSortLimit.scala  |   2 +-
 .../physical/batch/BatchExecSortMergeJoin.scala    |   2 +-
 .../batch/BatchExecSortWindowAggregateBase.scala   |   2 +-
 .../physical/batch/BatchExecTableSourceScan.scala  |   5 +-
 .../nodes/physical/batch/BatchExecValues.scala     |   2 +-
 .../nodes/physical/stream/StreamExecCalc.scala     |   7 +-
 .../physical/stream/StreamExecCorrelate.scala      |   7 +-
 .../physical/stream/StreamExecDataStreamScan.scala |   8 +-
 .../physical/stream/StreamExecDeduplicate.scala    |   7 +-
 .../nodes/physical/stream/StreamExecExchange.scala |   3 +
 .../nodes/physical/stream/StreamExecExpand.scala   |   7 +-
 .../stream/StreamExecGlobalGroupAggregate.scala    |   7 +-
 .../physical/stream/StreamExecGroupAggregate.scala |   7 +-
 .../stream/StreamExecGroupWindowAggregate.scala    |   7 +-
 .../StreamExecIncrementalGroupAggregate.scala      |   7 +-
 .../nodes/physical/stream/StreamExecJoin.scala     |   7 +-
 .../nodes/physical/stream/StreamExecLimit.scala    |   7 +-
 .../stream/StreamExecLocalGroupAggregate.scala     |   7 +-
 .../physical/stream/StreamExecLookupJoin.scala     |   6 +-
 .../nodes/physical/stream/StreamExecMatch.scala    |  14 +-
 .../physical/stream/StreamExecOverAggregate.scala  |   7 +-
 .../nodes/physical/stream/StreamExecRank.scala     |   7 +-
 .../nodes/physical/stream/StreamExecSink.scala     |  20 --
 .../nodes/physical/stream/StreamExecSort.scala     |   7 +-
 .../physical/stream/StreamExecSortLimit.scala      |   7 +-
 .../stream/StreamExecTableSourceScan.scala         |   3 -
 .../physical/stream/StreamExecTemporalJoin.scala   |   7 +-
 .../physical/stream/StreamExecTemporalSort.scala   |   9 +-
 .../nodes/physical/stream/StreamExecValues.scala   |   6 +-
 .../physical/stream/StreamExecWindowJoin.scala     |   7 +-
 .../planner/plan/utils/ExecNodePlanDumper.scala    |   4 -
 .../plan/nodes/resource/MockNodeTestBase.java      | 185 ------------
 .../parallelism/FinalParallelismSetterTest.java    | 118 --------
 .../parallelism/ShuffleStageGeneratorTest.java     | 323 ---------------------
 .../ShuffleStageParallelismCalculatorTest.java     | 123 --------
 .../resources/explain/testGetStatsFromCatalog.out  |  14 +-
 .../plan/nodes/resource/ExecNodeResourceTest.xml   | 153 ----------
 .../apache/flink/table/api/batch/ExplainTest.scala |   8 +-
 .../flink/table/api/stream/ExplainTest.scala       |   8 +-
 .../plan/nodes/resource/ExecNodeResourceTest.scala | 251 ----------------
 68 files changed, 174 insertions(+), 1988 deletions(-)
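
For reference, a minimal usage sketch (not part of this commit) of the one option that survives this change. It assumes the Flink 1.9-era Blink planner API and a hypothetical parallelism of 4:

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
    import org.apache.flink.table.api.config.ExecutionConfigOptions

    object DefaultParallelismSketch {
      def main(args: Array[String]): Unit = {
        val settings = EnvironmentSettings.newInstance()
          .useBlinkPlanner()
          .inBatchMode()
          .build()
        val tEnv = TableEnvironment.create(settings)
        // After this commit, this single option overrides the environment
        // parallelism for all operators; the removed options
        // table.exec.resource.source.parallelism and
        // table.exec.resource.sink.parallelism no longer exist.
        tEnv.getConfig.getConfiguration.setInteger(
          ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4)
      }
    }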

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 4abbb38..1c6c8ab 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -83,18 +83,13 @@ public class ExecutionConfigOptions {
 	public static final ConfigOption<Integer> TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM =
 		key("table.exec.resource.default-parallelism")
 			.defaultValue(-1)
-			.withDescription("Default parallelism of job operators. If it is <= 0, use the parallelism of the StreamExecutionEnvironment (" +
-				"whose default value is the number of CPU cores on the client host).");
-
-	public static final ConfigOption<Integer> TABLE_EXEC_RESOURCE_SOURCE_PARALLELISM =
-		key("table.exec.resource.source.parallelism")
-			.defaultValue(-1)
-			.withDescription("Sets the source parallelism. If it is <= 0, " + TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key() + " is used instead.");
-
-	public static final ConfigOption<Integer> TABLE_EXEC_RESOURCE_SINK_PARALLELISM =
-		key("table.exec.resource.sink.parallelism")
-			.defaultValue(-1)
-			.withDescription("Sets the sink parallelism. If it is <= 0, " + TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key() + " is used instead.");
+			.withDescription("Sets the default parallelism for all operators " +
+					"(such as aggregate, join, filter) to run with parallel instances. " +
+					"This config takes priority over the parallelism of the " +
+					"StreamExecutionEnvironment (in fact, it overrides that " +
+					"parallelism). A value of -1 indicates that no default " +
+					"parallelism is set, in which case the planner falls back to the " +
+					"parallelism of the StreamExecutionEnvironment.");
 
 	public static final ConfigOption<String> TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY =
 		key("table.exec.resource.external-buffer-memory")
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/NodeResource.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/NodeResource.java
deleted file mode 100644
index 9b1826f..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/NodeResource.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource;
-
-/**
- * Resource for a node: currently parallelism; other resource dimensions can be added later.
- */
-public class NodeResource {
-
-	// node parallelism
-	private int parallelism = -1;
-
-	private int maxParallelism = -1;
-
-	public int getParallelism() {
-		return parallelism;
-	}
-
-	public void setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-	}
-
-	public int getMaxParallelism() {
-		return maxParallelism;
-	}
-
-	public void setMaxParallelism(int maxParallelism) {
-		this.maxParallelism = maxParallelism;
-	}
-
-	@Override
-	public String toString() {
-		StringBuilder sb = new StringBuilder("{");
-		sb.append("parallelism=").append(parallelism);
-		if (maxParallelism > 0) {
-			sb.append(", maxParallelism=").append(maxParallelism);
-		}
-		sb.append("}");
-		return sb.toString();
-	}
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/NodeResourceUtil.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/NodeResourceUtil.java
index 1d936b6..b5e1925 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/NodeResourceUtil.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/NodeResourceUtil.java
@@ -19,8 +19,6 @@
 package org.apache.flink.table.planner.plan.nodes.resource;
 
 import org.apache.flink.api.common.operators.ResourceSpec;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 
 /**
  * Deal with resource config for {@link org.apache.flink.table.planner.plan.nodes.exec.ExecNode}.
@@ -33,42 +31,8 @@ public class NodeResourceUtil {
 	public static final long SIZE_IN_MB =  1024L * 1024;
 
 	/**
-	 * Gets the config parallelism for source.
-	 * @param tableConf Configuration.
-	 * @return the config parallelism for source.
+	 * Builds a ResourceSpec from the given managed memory size (in MB).
 	 */
-	public static int getSourceParallelism(Configuration tableConf, int envParallelism) {
-		int parallelism = tableConf.getInteger(
-				ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SOURCE_PARALLELISM);
-		if (parallelism <= 0) {
-			parallelism = getOperatorDefaultParallelism(tableConf, envParallelism);
-		}
-		return parallelism;
-	}
-
-	/**
-	 * Gets the config parallelism for sink. If it is not set, return -1.
-	 * @param tableConf Configuration.
-	 * @return the config parallelism for sink.
-	 */
-	public static int getSinkParallelism(Configuration tableConf) {
-		return tableConf.getInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SINK_PARALLELISM);
-	}
-
-	/**
-	 * Gets default parallelism of operator.
-	 * @param tableConf Configuration.
-	 * @return default parallelism of operator.
-	 */
-	public static int getOperatorDefaultParallelism(Configuration tableConf, int envParallelism) {
-		int parallelism = tableConf.getInteger(
-				ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
-		if (parallelism <= 0) {
-			parallelism = envParallelism;
-		}
-		return parallelism;
-	}
-
 	public static ResourceSpec fromManagedMem(int managedMem) {
 		ResourceSpec.Builder builder = ResourceSpec.newBuilder();
 		builder.setManagedMemoryInMB(managedMem);
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/FinalParallelismSetter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/FinalParallelismSetter.java
deleted file mode 100644
index 1f45bec..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/FinalParallelismSetter.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource.parallelism;
-
-import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan;
-
-import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.rel.core.Exchange;
-import org.apache.calcite.rel.core.Values;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Sets final parallelism up front where needed. Once a node's parallelism is marked final,
- * it will not be changed by any other parallelism calculator.
- */
-public class FinalParallelismSetter {
-
-	private final StreamExecutionEnvironment env;
-	private Set<ExecNode<?, ?>> calculatedNodeSet = new HashSet<>();
-	private Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap = new HashMap<>();
-
-	private FinalParallelismSetter(StreamExecutionEnvironment env) {
-		this.env = env;
-	}
-
-	/**
- * Finds the nodes whose parallelism must be fixed to a final value.
-	 */
-	public static Map<ExecNode<?, ?>, Integer> calculate(StreamExecutionEnvironment env, List<ExecNode<?, ?>> sinkNodes) {
-		FinalParallelismSetter setter = new FinalParallelismSetter(env);
-		sinkNodes.forEach(setter::calculate);
-		return setter.finalParallelismNodeMap;
-	}
-
-	private void calculate(ExecNode<?, ?> execNode) {
-		if (!calculatedNodeSet.add(execNode)) {
-			return;
-		}
-		if (execNode instanceof BatchExecTableSourceScan) {
-			calculateTableSource((BatchExecTableSourceScan) execNode);
-		} else if (execNode instanceof StreamExecTableSourceScan) {
-			calculateTableSource((StreamExecTableSourceScan) execNode);
-		} else if (execNode instanceof BatchExecBoundedStreamScan) {
-			calculateBoundedStreamScan((BatchExecBoundedStreamScan) execNode);
-		} else if (execNode instanceof StreamExecDataStreamScan) {
-			calculateDataStreamScan((StreamExecDataStreamScan) execNode);
-		} else if (execNode instanceof Values) {
-			calculateValues(execNode);
-		} else {
-			calculateIfSingleton(execNode);
-		}
-	}
-
-	private void calculateTableSource(BatchExecTableSourceScan tableSourceScan) {
-		Transformation transformation = tableSourceScan.getSourceTransformation(env);
-		if (transformation.getMaxParallelism() > 0) {
-			tableSourceScan.getResource().setMaxParallelism(transformation.getMaxParallelism());
-		}
-	}
-
-	private void calculateTableSource(StreamExecTableSourceScan tableSourceScan) {
-		Transformation transformation = tableSourceScan.getSourceTransformation(env);
-		if (transformation.getMaxParallelism() > 0) {
-			tableSourceScan.getResource().setMaxParallelism(transformation.getMaxParallelism());
-		}
-	}
-
-	private void calculateBoundedStreamScan(BatchExecBoundedStreamScan boundedStreamScan) {
-		Transformation transformation = boundedStreamScan.getSourceTransformation();
-		int parallelism = transformation.getParallelism();
-		if (parallelism <= 0) {
-			parallelism = env.getParallelism();
-		}
-		finalParallelismNodeMap.put(boundedStreamScan, parallelism);
-	}
-
-	private void calculateDataStreamScan(StreamExecDataStreamScan dataStreamScan) {
-		Transformation transformation = dataStreamScan.getSourceTransformation();
-		int parallelism = transformation.getParallelism();
-		if (parallelism <= 0) {
-			parallelism = env.getParallelism();
-		}
-		finalParallelismNodeMap.put(dataStreamScan, parallelism);
-	}
-
-	private void calculateIfSingleton(ExecNode<?, ?> execNode) {
-		calculateInputs(execNode);
-		for (ExecNode<?, ?> inputNode : execNode.getInputNodes()) {
-			if (inputNode instanceof Exchange &&
-					((Exchange) inputNode).getDistribution().getType() == RelDistribution.Type.SINGLETON) {
-				// set parallelism to 1 for GlobalAggregate and other global nodes.
-				finalParallelismNodeMap.put(execNode, 1);
-				execNode.getResource().setMaxParallelism(1);
-				return;
-			}
-		}
-	}
-
-	private void calculateValues(ExecNode<?, ?> values) {
-		finalParallelismNodeMap.put(values, 1);
-		values.getResource().setMaxParallelism(1);
-	}
-
-	private void calculateInputs(ExecNode<?, ?> node) {
-		node.getInputNodes().forEach(this::calculate);
-	}
-}
-
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ParallelismProcessor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ParallelismProcessor.java
deleted file mode 100644
index ebd2275..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ParallelismProcessor.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource.parallelism;
-
-import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.plan.nodes.calcite.Sink;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink;
-import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
-import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Processor for calculating parallelism for {@link ExecNode} dag.
- */
-public class ParallelismProcessor implements DAGProcessor {
-
-	@Override
-	public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes, DAGProcessContext context) {
-		PlannerBase planner = context.getPlanner();
-		List<ExecNode<?, ?>> rootNodes = filterSinkNodes(sinkNodes);
-		// find exec nodes whose parallelism cannot be changed.
-		Map<ExecNode<?, ?>, Integer> nodeToFinalParallelismMap =
-			FinalParallelismSetter.calculate(planner.getExecEnv(), rootNodes);
-		// generate shuffleStages that bind adjacent exec nodes together whose parallelism can be the same.
-		Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap =
-			ShuffleStageGenerator.generate(rootNodes, nodeToFinalParallelismMap);
-		// calculate parallelism of shuffleStages.
-		ShuffleStageParallelismCalculator.calculate(
-			planner.getTableConfig().getConfiguration(), planner.getExecEnv().getParallelism(), nodeShuffleStageMap.values());
-		for (ExecNode<?, ?> node : nodeShuffleStageMap.keySet()) {
-			node.getResource().setParallelism(nodeShuffleStageMap.get(node).getParallelism());
-		}
-		return sinkNodes;
-	}
-
-	/**
-	 * Filters out sink nodes because their parallelism is calculated after translateToPlan, as
-	 * transformations generated by {@link BatchExecSink} or {@link StreamExecSink} have too many
-	 * uncertain factors. Filtering them out here makes the later processing easier.
-	 */
-	private List<ExecNode<?, ?>> filterSinkNodes(List<ExecNode<?, ?>> sinkNodes) {
-		List<ExecNode<?, ?>> rootNodes = new ArrayList<>();
-		sinkNodes.forEach(s -> {
-			if (s instanceof Sink) {
-				rootNodes.add(s.getInputNodes().get(0));
-			} else {
-				rootNodes.add(s);
-			}
-		});
-		return rootNodes;
-	}
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStage.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStage.java
deleted file mode 100644
index b9b5bf5..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStage.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource.parallelism;
-
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-
-import java.util.LinkedHashSet;
-import java.util.Set;
-
-/**
- * No shuffle occurs when data is transferred within a shuffleStage.
- */
-public class ShuffleStage {
-
-	private final Set<ExecNode<?, ?>> execNodeSet = new LinkedHashSet<>();
-
-	// parallelism of this shuffleStage.
-	private int parallelism = -1;
-
-	private int maxParallelism = Integer.MAX_VALUE;
-
-	// whether this parallelism is final; once final, it cannot be changed.
-	private boolean isFinalParallelism = false;
-
-	public void addNode(ExecNode<?, ?> node) {
-		execNodeSet.add(node);
-		if (node.getResource().getMaxParallelism() > 0 &&
-				node.getResource().getMaxParallelism() < maxParallelism) {
-			maxParallelism = node.getResource().getMaxParallelism();
-		}
-	}
-
-	public int getMaxParallelism() {
-		return maxParallelism;
-	}
-
-	public void addNodeSet(Set<ExecNode<?, ?>> nodeSet) {
-		nodeSet.forEach(this::addNode);
-	}
-
-	public void removeNode(ExecNode<?, ?> node) {
-		this.execNodeSet.remove(node);
-	}
-
-	public Set<ExecNode<?, ?>> getExecNodeSet() {
-		return this.execNodeSet;
-	}
-
-	public int getParallelism() {
-		return parallelism;
-	}
-
-	public void setParallelism(int parallelism, boolean finalParallelism) {
-		if (this.isFinalParallelism) {
-			if (finalParallelism && this.parallelism != parallelism) {
-				throw new IllegalArgumentException("conflicting final parallelism values, old: " + this.parallelism + ", new: " + parallelism);
-			}
-		} else {
-			if (finalParallelism) {
-				this.parallelism = parallelism;
-				this.isFinalParallelism = true;
-			} else {
-				this.parallelism = Math.max(this.parallelism, parallelism);
-			}
-		}
-	}
-
-	public boolean isFinalParallelism() {
-		return isFinalParallelism;
-	}
-
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageGenerator.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageGenerator.java
deleted file mode 100644
index 8de5876..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageGenerator.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource.parallelism;
-
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecUnion;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion;
-
-import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.rel.core.Exchange;
-
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static java.util.stream.Collectors.toList;
-
-/**
- * Build exec nodes to shuffleStages according to {@link BatchExecExchange}.
- * If there is data shuffle between two adjacent exec nodes,
- * they are belong to different shuffleStages.
- * If there is no data shuffle between two adjacent exec nodes, but
- * they have different final parallelism, they are also belong to different shuffleStages.
- */
-public class ShuffleStageGenerator {
-
-	private final Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = new LinkedHashMap<>();
-	private final Map<ExecNode<?, ?>, Integer> nodeToFinalParallelismMap;
-
-	private ShuffleStageGenerator(Map<ExecNode<?, ?>, Integer> nodeToFinalParallelismMap) {
-		this.nodeToFinalParallelismMap = nodeToFinalParallelismMap;
-	}
-
-	public static Map<ExecNode<?, ?>, ShuffleStage> generate(List<ExecNode<?, ?>> sinkNodes, Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap) {
-		ShuffleStageGenerator generator = new ShuffleStageGenerator(finalParallelismNodeMap);
-		sinkNodes.forEach(generator::buildShuffleStages);
-		Map<ExecNode<?, ?>, ShuffleStage> result = generator.getNodeShuffleStageMap();
-		result.values().forEach(s -> {
-			List<ExecNode<?, ?>> virtualNodeList = s.getExecNodeSet().stream().filter(ShuffleStageGenerator::isVirtualNode).collect(toList());
-			virtualNodeList.forEach(s::removeNode);
-		});
-		return generator.getNodeShuffleStageMap().entrySet().stream()
-				.filter(x -> !isVirtualNode(x.getKey()))
-				.collect(Collectors.toMap(Map.Entry::getKey,
-						Map.Entry::getValue,
-						(e1, e2) -> e1,
-						LinkedHashMap::new));
-	}
-
-	private void buildShuffleStages(ExecNode<?, ?> execNode) {
-		if (nodeShuffleStageMap.containsKey(execNode)) {
-			return;
-		}
-		for (ExecNode<?, ?> input : execNode.getInputNodes()) {
-			buildShuffleStages(input);
-		}
-
-		if (execNode.getInputNodes().isEmpty()) {
-			// source node
-			ShuffleStage shuffleStage = new ShuffleStage();
-			shuffleStage.addNode(execNode);
-			if (nodeToFinalParallelismMap.containsKey(execNode)) {
-				shuffleStage.setParallelism(nodeToFinalParallelismMap.get(execNode), true);
-			}
-			nodeShuffleStageMap.put(execNode, shuffleStage);
-		} else if (execNode instanceof Exchange && !isRangeExchange((Exchange) execNode)) {
-				// do nothing.
-		} else {
-			Set<ShuffleStage> inputShuffleStages = getInputShuffleStages(execNode);
-			Integer parallelism = nodeToFinalParallelismMap.get(execNode);
-			ShuffleStage inputShuffleStage = mergeInputShuffleStages(inputShuffleStages, parallelism);
-			inputShuffleStage.addNode(execNode);
-			nodeShuffleStageMap.put(execNode, inputShuffleStage);
-		}
-	}
-
-	private boolean isRangeExchange(Exchange exchange) {
-		return exchange.getDistribution().getType() == RelDistribution.Type.RANGE_DISTRIBUTED;
-	}
-
-	private ShuffleStage mergeInputShuffleStages(Set<ShuffleStage> shuffleStageSet, Integer parallelism) {
-		if (parallelism != null) {
-			ShuffleStage resultShuffleStage = new ShuffleStage();
-			resultShuffleStage.setParallelism(parallelism, true);
-			for (ShuffleStage shuffleStage : shuffleStageSet) {
-				//consider max parallelism.
-				if ((shuffleStage.isFinalParallelism() && shuffleStage.getParallelism() == parallelism)
-					|| (!shuffleStage.isFinalParallelism() && shuffleStage.getMaxParallelism() >= parallelism)) {
-					mergeShuffleStage(resultShuffleStage, shuffleStage);
-				}
-			}
-			return resultShuffleStage;
-		} else {
-			ShuffleStage resultShuffleStage = shuffleStageSet.stream()
-					.filter(ShuffleStage::isFinalParallelism)
-					.max(Comparator.comparing(ShuffleStage::getParallelism))
-					.orElse(new ShuffleStage());
-			for (ShuffleStage shuffleStage : shuffleStageSet) {
-				//consider max parallelism.
-				if ((shuffleStage.isFinalParallelism() && shuffleStage.getParallelism() == resultShuffleStage.getParallelism())
-						|| (!shuffleStage.isFinalParallelism() && shuffleStage.getMaxParallelism() >= resultShuffleStage.getParallelism())) {
-					mergeShuffleStage(resultShuffleStage, shuffleStage);
-				}
-			}
-			return resultShuffleStage;
-		}
-	}
-
-	private void mergeShuffleStage(ShuffleStage shuffleStage, ShuffleStage other) {
-		Set<ExecNode<?, ?>> nodeSet = other.getExecNodeSet();
-		shuffleStage.addNodeSet(nodeSet);
-		for (ExecNode<?, ?> r : nodeSet) {
-			nodeShuffleStageMap.put(r, shuffleStage);
-		}
-	}
-
-	private Set<ShuffleStage> getInputShuffleStages(ExecNode<?, ?> node) {
-		Set<ShuffleStage> shuffleStageList = new HashSet<>();
-		for (ExecNode<?, ?> input : node.getInputNodes()) {
-			ShuffleStage oneInputShuffleStage = nodeShuffleStageMap.get(input);
-			if (oneInputShuffleStage != null) {
-				shuffleStageList.add(oneInputShuffleStage);
-			}
-		}
-		return shuffleStageList;
-	}
-
-	private static boolean isVirtualNode(ExecNode<?, ?> node) {
-		return node instanceof BatchExecUnion || node instanceof StreamExecUnion;
-	}
-
-	private Map<ExecNode<?, ?>, ShuffleStage> getNodeShuffleStageMap() {
-		return nodeShuffleStageMap;
-	}
-}
-
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculator.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculator.java
deleted file mode 100644
index 4d96e7e..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculator.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource.parallelism;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan;
-import org.apache.flink.table.planner.plan.nodes.resource.NodeResourceUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Parallelism calculator for shuffleStages.
- */
-public class ShuffleStageParallelismCalculator {
-	private static final Logger LOG = LoggerFactory.getLogger(ShuffleStageParallelismCalculator.class);
-	private final Configuration tableConf;
-	private final int envParallelism;
-
-	private ShuffleStageParallelismCalculator(Configuration tableConf, int envParallelism) {
-		this.tableConf = tableConf;
-		this.envParallelism = envParallelism;
-	}
-
-	public static void calculate(Configuration tableConf, int envParallelism, Collection<ShuffleStage> shuffleStages) {
-		new ShuffleStageParallelismCalculator(tableConf, envParallelism).calculate(shuffleStages);
-	}
-
-	private void calculate(Collection<ShuffleStage> shuffleStages) {
-		Set<ShuffleStage> shuffleStageSet = new HashSet<>(shuffleStages);
-		shuffleStageSet.forEach(this::calculate);
-	}
-
-	/**
-	 * If there are source nodes in a shuffleStage, its parallelism is the max parallelism of source
-	 * nodes. Otherwise, its parallelism is the default operator parallelism.
-	 */
-	@VisibleForTesting
-	protected void calculate(ShuffleStage shuffleStage) {
-		if (shuffleStage.isFinalParallelism()) {
-			return;
-		}
-		Set<ExecNode<?, ?>> nodeSet = shuffleStage.getExecNodeSet();
-		int sourceParallelism = -1;
-		int maxParallelism = shuffleStage.getMaxParallelism();
-		for (ExecNode<?, ?> node : nodeSet) {
-			if (node instanceof BatchExecTableSourceScan || node instanceof StreamExecTableSourceScan) {
-				int result = NodeResourceUtil.getSourceParallelism(tableConf, envParallelism);
-				if (result > sourceParallelism) {
-					sourceParallelism = result;
-				}
-			}
-		}
-		int shuffleStageParallelism;
-		if (sourceParallelism > 0) {
-			shuffleStageParallelism = sourceParallelism;
-		} else {
-			shuffleStageParallelism = NodeResourceUtil.getOperatorDefaultParallelism(getTableConf(), envParallelism);
-		}
-		if (shuffleStageParallelism > maxParallelism) {
-			shuffleStageParallelism = maxParallelism;
-		}
-		shuffleStage.setParallelism(shuffleStageParallelism, false);
-	}
-
-	private Configuration getTableConf() {
-		return this.tableConf;
-	}
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 8d43bc9..213be6a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -26,7 +26,6 @@ import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOpera
 import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistributionTraitDef
 import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
 import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext
-import org.apache.flink.table.planner.plan.nodes.resource.parallelism.ParallelismProcessor
 import org.apache.flink.table.planner.plan.optimize.{BatchCommonSubGraphBasedOptimizer, Optimizer}
 import org.apache.flink.table.planner.plan.reuse.DeadlockBreakupProcessor
 import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOptUtil}
@@ -61,13 +60,12 @@ class BatchPlanner(
     val execNodePlan = super.translateToExecNodePlan(optimizedRelNodes)
     val context = new DAGProcessContext(this)
     // breakup deadlock
-    val postNodeDag = new DeadlockBreakupProcessor().process(execNodePlan, context)
-    // set parallelism
-    new ParallelismProcessor().process(postNodeDag, context)
+    new DeadlockBreakupProcessor().process(execNodePlan, context)
   }
 
   override protected def translateToPlan(
       execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]] = {
+    overrideEnvParallelism()
     execNodes.map {
       case node: BatchExecNode[_] => node.translateToPlan(this)
       case _ =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 2b0b484..18922e8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.dag.Transformation
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.sql.parser.dml.RichSqlInsert
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableException}
 import org.apache.flink.table.catalog.{CatalogManager, CatalogTable, ConnectorCatalogTable, FunctionCatalog, ObjectPath}
 import org.apache.flink.table.delegation.{Executor, Planner}
@@ -148,6 +149,15 @@ abstract class PlannerBase(
     translateToPlan(execNodes)
   }
 
+  protected def overrideEnvParallelism(): Unit = {
+    // Use config parallelism to override env parallelism.
+    val defaultParallelism = getTableConfig.getConfiguration.getInteger(
+      ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM)
+    if (defaultParallelism > 0) {
+      getExecEnv.setParallelism(defaultParallelism)
+    }
+  }
+
   override def getCompletionHints(statement: String, position: Int): Array[String] = {
     val planner = getFlinkPlanner
     planner.getCompletionHints(statement, position)
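
As a hedged illustration of the resolution order implemented by the overrideEnvParallelism() method added above: a standalone re-statement, not planner code, where `envParallelism` stands in for StreamExecutionEnvironment.getParallelism:

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.table.api.config.ExecutionConfigOptions

    object OverrideEnvParallelismSketch {
      // Mirrors the added logic: a configured value > 0 wins, otherwise the
      // environment parallelism is kept.
      def effectiveParallelism(tableConf: Configuration, envParallelism: Int): Int = {
        val configured = tableConf.getInteger(
          ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM)
        if (configured > 0) configured else envParallelism
      }

      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        println(effectiveParallelism(conf, 8)) // default -1 -> falls back to 8
        conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4)
        println(effectiveParallelism(conf, 8)) // configured -> 4
      }
    }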
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 6b01362..091083a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -25,14 +25,11 @@ import org.apache.flink.table.delegation.Executor
 import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation}
 import org.apache.flink.table.planner.plan.`trait`._
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
-import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext
-import org.apache.flink.table.planner.plan.nodes.resource.parallelism.ParallelismProcessor
 import org.apache.flink.table.planner.plan.optimize.{Optimizer, StreamCommonSubGraphBasedOptimizer}
 import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOptUtil}
 import org.apache.flink.table.planner.utils.PlanUtil
 
 import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
-import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql.SqlExplainLevel
 
 import java.util
@@ -57,15 +54,9 @@ class StreamPlanner(
 
   override protected def getOptimizer: Optimizer = new StreamCommonSubGraphBasedOptimizer(this)
 
-  override private[flink] def translateToExecNodePlan(
-      optimizedRelNodes: Seq[RelNode]): util.List[ExecNode[_, _]] = {
-    val execNodePlan = super.translateToExecNodePlan(optimizedRelNodes)
-    val context = new DAGProcessContext(this)
-    new ParallelismProcessor().process(execNodePlan, context)
-  }
-
   override protected def translateToPlan(
       execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]] = {
+    overrideEnvParallelism()
     execNodes.map {
       case node: StreamExecNode[_] => node.translateToPlan(this)
       case _ =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.scala
index 50b5035..edcca5b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.scala
@@ -21,10 +21,14 @@ package org.apache.flink.table.planner.plan.nodes.exec
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.table.delegation.Planner
 import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel
-import org.apache.flink.table.planner.plan.nodes.resource.NodeResource
+
+import org.apache.calcite.rel.RelDistribution
+import org.apache.calcite.rel.core.Exchange
 
 import java.util
 
+import scala.collection.JavaConversions._
+
 /**
   * The representation of execution information for a [[FlinkPhysicalRel]].
   *
@@ -34,21 +38,11 @@ import java.util
 trait ExecNode[E <: Planner, T] {
 
   /**
-    * Defines how much resource the node will take.
-    */
-  private val resource: NodeResource = new NodeResource
-
-  /**
     * The [[Transformation]] translated from this node.
     */
   private var transformation: Transformation[T] = _
 
   /**
-    * Get node resource.
-    */
-  def getResource: NodeResource = resource
-
-  /**
     * Translates this node into a Flink operator.
     *
     * <p>NOTE: returns same translate result if called multiple times.
@@ -94,4 +88,14 @@ trait ExecNode[E <: Planner, T] {
   def accept(visitor: ExecNodeVisitor): Unit = {
     visitor.visit(this)
   }
+
+  /**
+    * Whether any input is an exchange node with SINGLETON distribution.
+    */
+  protected def inputsContainSingleton(): Boolean = {
+    getInputNodes.exists { node =>
+      node.isInstanceOf[Exchange] &&
+          node.asInstanceOf[Exchange].getDistribution.getType == RelDistribution.Type.SINGLETON
+    }
+  }
 }
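
A toy, self-contained model (hypothetical names, for illustration only) of what the new inputsContainSingleton() helper checks; the real version inspects Calcite Exchange inputs for SINGLETON distribution via getInputNodes:

    object InputsContainSingletonSketch {
      sealed trait Node
      final case class Exchange(singleton: Boolean) extends Node
      case object Calc extends Node

      def inputsContainSingleton(inputs: Seq[Node]): Boolean =
        inputs.exists {
          case Exchange(singleton) => singleton
          case _ => false
        }

      def main(args: Array[String]): Unit = {
        // A node fed by a SINGLETON exchange should run with parallelism 1.
        assert(inputsContainSingleton(Seq(Calc, Exchange(singleton = true))))
        assert(!inputsContainSingleton(Seq(Calc, Exchange(singleton = false))))
      }
    }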
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
index e7de711..5779d72 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
@@ -87,9 +87,8 @@ class BatchExecBoundedStreamScan(
       planner: BatchPlanner): Transformation[BaseRow] = {
     val config = planner.getTableConfig
     val batchTransform = boundedStreamTable.dataStream.getTransformation
-    batchTransform.setParallelism(getResource.getParallelism)
     if (needInternalConversion) {
-      val conversionTransform = ScanUtil.convertToInternalRow(
+      ScanUtil.convertToInternalRow(
         CodeGeneratorContext(config),
         batchTransform,
         boundedStreamTable.fieldIndexes,
@@ -98,16 +97,11 @@ class BatchExecBoundedStreamScan(
         getTable.getQualifiedName,
         config,
         None)
-      conversionTransform.setParallelism(getResource.getParallelism)
-      conversionTransform
     } else {
       batchTransform.asInstanceOf[Transformation[BaseRow]]
     }
   }
 
-  def getSourceTransformation: Transformation[_] =
-    boundedStreamTable.dataStream.getTransformation
-
   def needInternalConversion: Boolean = {
     ScanUtil.hasTimeAttributeField(boundedStreamTable.fieldIndexes) ||
         ScanUtil.needsConversion(boundedStreamTable.dataType)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala
index 3528412..3d03932 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala
@@ -161,6 +161,6 @@ class BatchExecCalc(
       RelExplainUtil.calcToString(calcProgram, getExpressionString),
       operator,
       BaseRowTypeInfo.of(outputType),
-      getResource.getParallelism)
+      inputTransform.getParallelism)
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
index a20535d..b11a7a3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
@@ -198,7 +198,7 @@ class BatchExecCorrelate(
       condition,
       outputRowType,
       joinType,
-      getResource.getParallelism,
+      inputTransformation.getParallelism,
       retainHeader = false,
       getExpressionString,
       "BatchExecCorrelate")
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExchange.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExchange.scala
index b82ead7..0cb7a7b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExchange.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExchange.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.physical.batch
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.operators.DamBehavior
@@ -144,6 +145,7 @@ class BatchExecExchange(
           null,
           shuffleMode)
         transformation.setOutputType(outputRowType)
+        transformation.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT)
         transformation
 
       case RelDistribution.Type.SINGLETON =>
@@ -152,6 +154,7 @@ class BatchExecExchange(
           new GlobalPartitioner[BaseRow],
           shuffleMode)
         transformation.setOutputType(outputRowType)
+        transformation.setParallelism(1)
         transformation
 
       case RelDistribution.Type.RANDOM_DISTRIBUTED =>
@@ -160,6 +163,7 @@ class BatchExecExchange(
           new RebalancePartitioner[BaseRow],
           shuffleMode)
         transformation.setOutputType(outputRowType)
+        transformation.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT)
         transformation
 
       case RelDistribution.Type.BROADCAST_DISTRIBUTED =>
@@ -168,6 +172,7 @@ class BatchExecExchange(
           new BroadcastPartitioner[BaseRow],
           shuffleMode)
         transformation.setOutputType(outputRowType)
+        transformation.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT)
         transformation
 
       case RelDistribution.Type.HASH_DISTRIBUTED =>
@@ -186,6 +191,7 @@ class BatchExecExchange(
           partitioner,
           shuffleMode)
         transformation.setOutputType(outputRowType)
+        transformation.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT)
         transformation
       case _ =>
         throw new UnsupportedOperationException(
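
The exchange transformations above are reset to ExecutionConfig.PARALLELISM_DEFAULT rather than a computed value. A one-line check (assuming only flink-core on the classpath) of what that sentinel is:

    import org.apache.flink.api.common.ExecutionConfig

    object ParallelismDefaultSketch {
      def main(args: Array[String]): Unit = {
        // PARALLELISM_DEFAULT is the sentinel -1: transformations tagged with
        // it pick up the environment parallelism during StreamGraph generation.
        println(ExecutionConfig.PARALLELISM_DEFAULT) // -1
      }
    }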
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExpand.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExpand.scala
index e5441b6..e6a9c8e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExpand.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExpand.scala
@@ -105,7 +105,7 @@ class BatchExecExpand(
       operatorName,
       operator,
       BaseRowTypeInfo.of(outputType),
-      getResource.getParallelism)
+      inputTransform.getParallelism)
   }
 
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
index cf8d0ee..46e8efd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
@@ -150,7 +150,7 @@ abstract class BatchExecHashAggregateBase(
       getOperatorName,
       operator,
       BaseRowTypeInfo.of(outputType),
-      getResource.getParallelism)
+      input.getParallelism)
     val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
     ret.setResources(resource, resource)
     ret
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashJoin.scala
index 94050ff..e029b0f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashJoin.scala
@@ -268,7 +268,7 @@ class BatchExecHashJoin(
       getOperatorName,
       operator,
       BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType)),
-      getResource.getParallelism)
+      probe.getParallelism)
     val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
     ret.setResources(resource, resource)
     ret
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
index 0d3dd07..6074157 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
@@ -152,7 +152,7 @@ abstract class BatchExecHashWindowAggregateBase(
       getOperatorName,
       operator,
       BaseRowTypeInfo.of(outputType),
-      getResource.getParallelism)
+      input.getParallelism)
     val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
     ret.setResources(resource, resource)
     ret
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLimit.scala
index 3e14fa3..6b4ee94 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLimit.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLimit.scala
@@ -111,7 +111,7 @@ class BatchExecLimit(
       getOperatorName,
       operator,
       inputType,
-      getResource.getParallelism)
+      input.getParallelism)
   }
 
   private def getOperatorName = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLookupJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLookupJoin.scala
index 89c631f..588c022 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLookupJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLookupJoin.scala
@@ -89,12 +89,10 @@ class BatchExecLookupJoin(
     planner: BatchPlanner): Transformation[BaseRow] = {
     val inputTransformation = getInputNodes.get(0).translateToPlan(planner)
       .asInstanceOf[Transformation[BaseRow]]
-    val transformation = translateToPlanInternal(
+    translateToPlanInternal(
       inputTransformation,
       planner.getExecEnv,
       planner.getTableConfig,
       planner.getRelBuilder)
-    transformation.setParallelism(getResource.getParallelism)
-    transformation
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
index 27e3629..db30712 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
@@ -161,13 +161,14 @@ class BatchExecNestedLoopJoin(
     }
     val resourceSpec = NodeResourceUtil.fromManagedMem(externalBufferMemoryInMb)
 
+    val parallelism = if (leftIsBuild) rInput.getParallelism else lInput.getParallelism
     val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
       lInput,
       rInput,
       getOperatorName,
       op,
       BaseRowTypeInfo.of(outputType),
-      getResource.getParallelism)
+      parallelism)
     ret.setResources(resourceSpec, resourceSpec)
     ret
   }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecOverAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecOverAggregate.scala
index 90dde6f..abe140a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecOverAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecOverAggregate.scala
@@ -421,7 +421,7 @@ class BatchExecOverAggregate(
     }
     val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
     val ret = new OneInputTransformation(
-      input, "OverAggregate", operator, BaseRowTypeInfo.of(outputType), getResource.getParallelism)
+      input, "OverAggregate", operator, BaseRowTypeInfo.of(outputType), input.getParallelism)
     ret.setResources(resource, resource)
     ret
   }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala
index 7ecfed4..52d237f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala
@@ -291,7 +291,7 @@ class BatchExecRank(
       getOperatorName,
       operator,
       BaseRowTypeInfo.of(outputType),
-      getResource.getParallelism)
+      input.getParallelism)
   }
 
   private def getOperatorName: String = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala
index be8ac6f..2b4e181 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala
@@ -29,7 +29,6 @@ import org.apache.flink.table.planner.codegen.{CodeGenUtils, CodeGeneratorContex
 import org.apache.flink.table.planner.delegation.BatchPlanner
 import org.apache.flink.table.planner.plan.nodes.calcite.Sink
 import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
-import org.apache.flink.table.planner.plan.nodes.resource.NodeResourceUtil
 import org.apache.flink.table.planner.sinks.DataStreamTableSink
 import org.apache.flink.table.runtime.types.ClassLogicalTypeConverter
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
@@ -98,23 +97,7 @@ class BatchExecSink[T](
             "implemented and return the sink transformation DataStreamSink. " +
             s"However, ${sink.getClass.getCanonicalName} doesn't implement this method.")
         }
-        val sinkTransformation = dsSink.getTransformation
-
-        val configSinkParallelism = NodeResourceUtil.getSinkParallelism(
-          planner.getTableConfig.getConfiguration)
-
-        val maxSinkParallelism = sinkTransformation.getMaxParallelism
-
-        // only set user's parallelism when user defines a sink parallelism
-        if (configSinkParallelism > 0) {
-          // set the parallelism when user's parallelism is not larger than max parallelism
-          // or max parallelism is not set
-          if (maxSinkParallelism < 0 || configSinkParallelism <= maxSinkParallelism) {
-            sinkTransformation.setParallelism(configSinkParallelism)
-          }
-        }
-
-        sinkTransformation
+        dsSink.getTransformation
 
       case dsTableSink: DataStreamTableSink[T] =>
         // In case of table to bounded stream through Batchplannerironment#toBoundedStream, we
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSort.scala
index 7d0226a..66df094 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSort.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSort.scala
@@ -131,7 +131,7 @@ class BatchExecSort(
       s"Sort(${RelExplainUtil.collationToString(sortCollation, getRowType)})",
       operator.asInstanceOf[OneInputStreamOperator[BaseRow, BaseRow]],
       BaseRowTypeInfo.of(outputType),
-      getResource.getParallelism)
+      input.getParallelism)
     val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
     ret.setResources(resource, resource)
     ret
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala
index 8ee73ce..80f56f2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala
@@ -125,6 +125,6 @@ abstract class BatchExecSortAggregateBase(
       getOperatorName,
       operator,
       BaseRowTypeInfo.of(outputType),
-      getResource.getParallelism)
+      input.getParallelism)
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortLimit.scala
index 578dd2a..5426fb8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortLimit.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortLimit.scala
@@ -146,7 +146,7 @@ class BatchExecSortLimit(
       getOperatorName,
       operator,
       inputType,
-      getResource.getParallelism)
+      input.getParallelism)
   }
 
   private def getOperatorName = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
index d4f9d22..c15b6f7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
@@ -268,7 +268,7 @@ class BatchExecSortMergeJoin(
       getOperatorName,
       operator,
       BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType)),
-      getResource.getParallelism)
+      rightInput.getParallelism)
     val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
     ret.setResources(resource, resource)
     ret
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
index 52707f1..8c621d5 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
@@ -140,6 +140,6 @@ abstract class BatchExecSortWindowAggregateBase(
       getOperatorName,
       operator,
       BaseRowTypeInfo.of(outputType),
-      getResource.getParallelism)
+      input.getParallelism)
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
index 11a1dc5..99f1b7c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
@@ -84,7 +84,6 @@ class BatchExecTableSourceScan(
       planner: BatchPlanner): Transformation[BaseRow] = {
     val config = planner.getTableConfig
     val inputTransform = getSourceTransformation(planner.getExecEnv)
-    inputTransform.setParallelism(getResource.getParallelism)
 
     val fieldIndexes = TableSourceUtil.computeIndexMapping(
       tableSource,
@@ -110,7 +109,7 @@ class BatchExecTableSourceScan(
       planner.getRelBuilder
     )
     if (needInternalConversion) {
-      val conversionTransform = ScanUtil.convertToInternalRow(
+      ScanUtil.convertToInternalRow(
         CodeGeneratorContext(config),
         inputTransform.asInstanceOf[Transformation[Any]],
         fieldIndexes,
@@ -119,8 +118,6 @@ class BatchExecTableSourceScan(
         getTable.getQualifiedName,
         config,
         rowtimeExpression)
-      conversionTransform.setParallelism(getResource.getParallelism)
-      conversionTransform
     } else {
       inputTransform.asInstanceOf[Transformation[BaseRow]]
     }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecValues.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecValues.scala
index 3c126b7..e1e13ea 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecValues.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecValues.scala
@@ -80,7 +80,7 @@ class BatchExecValues(
       getRelTypeName)
     val transformation = planner.getExecEnv.createInput(inputFormat,
       inputFormat.getProducedType).getTransformation
-    transformation.setParallelism(getResource.getParallelism)
+    transformation.setParallelism(1)
     transformation
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala
index 182fc8e..a249436 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala
@@ -122,10 +122,11 @@ class StreamExecCalc(
       RelExplainUtil.calcToString(calcProgram, getExpressionString),
       substituteStreamOperator,
       BaseRowTypeInfo.of(outputType),
-      getResource.getParallelism)
+      inputTransform.getParallelism)
 
-    if (getResource.getMaxParallelism > 0) {
-      ret.setMaxParallelism(getResource.getMaxParallelism)
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
     }
     ret
   }
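
The pattern introduced here repeats across the stream nodes that follow: each operator inherits the parallelism of its input transformation, and inputsContainSingleton() pins the node to a single task when an input requires singleton distribution. The helper itself is defined on ExecNode elsewhere in this commit; a minimal sketch of the idea, assuming singleton inputs are exchanges with SINGLETON distribution, could look like:

    import scala.collection.JavaConverters._
    import org.apache.calcite.rel.RelDistribution
    import org.apache.flink.table.planner.plan.nodes.exec.ExecNode
    import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange

    // Sketch only, not the committed implementation: a node must run as a
    // single task when any of its inputs is a singleton-distributed exchange.
    def inputsContainSingleton(node: ExecNode[_, _]): Boolean =
      node.getInputNodes.asScala.exists {
        case e: StreamExecExchange =>
          e.getDistribution.getType == RelDistribution.Type.SINGLETON
        case _ => false
      }
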
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala
index 0fe3922..ae5788a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala
@@ -131,12 +131,13 @@ class StreamExecCorrelate(
       condition,
       outputRowType,
       joinType,
-      getResource.getParallelism,
+      inputTransformation.getParallelism,
       retainHeader = true,
       getExpressionString,
       "StreamExecCorrelate")
-    if (getResource.getMaxParallelism > 0) {
-      transform.setMaxParallelism(getResource.getMaxParallelism)
+    if (inputsContainSingleton()) {
+      transform.setParallelism(1)
+      transform.setMaxParallelism(1)
     }
     transform
   }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDataStreamScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDataStreamScan.scala
index 3306033..ef06da4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDataStreamScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDataStreamScan.scala
@@ -77,9 +77,6 @@ class StreamExecDataStreamScan(
 
   override def deriveRowType(): RelDataType = outputRowType
 
-  def getSourceTransformation: Transformation[_] =
-    dataStreamTable.dataStream.getTransformation
-
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new StreamExecDataStreamScan(cluster, traitSet, getTable, getRowType)
   }
@@ -110,7 +107,6 @@ class StreamExecDataStreamScan(
     val config = planner.getTableConfig
     val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
     val transform = inputDataStream.getTransformation
-    transform.setParallelism(getResource.getParallelism)
 
     val rowtimeExpr = getRowtimeExpression(planner.getRelBuilder)
 
@@ -127,7 +123,7 @@ class StreamExecDataStreamScan(
         }
       val ctx = CodeGeneratorContext(config).setOperatorBaseClass(
         classOf[AbstractProcessStreamOperator[BaseRow]])
-      val ret = ScanUtil.convertToInternalRow(
+      ScanUtil.convertToInternalRow(
         ctx,
         transform,
         dataStreamTable.fieldIndexes,
@@ -138,8 +134,6 @@ class StreamExecDataStreamScan(
         rowtimeExpr,
         beforeConvert = extractElement,
         afterConvert = resetElement)
-      ret.setParallelism(getResource.getParallelism)
-      ret
     } else {
       transform.asInstanceOf[Transformation[BaseRow]]
     }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala
index 251d137..888ee6e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala
@@ -144,10 +144,11 @@ class StreamExecDeduplicate(
       getOperatorName,
       operator,
       rowTypeInfo,
-      getResource.getParallelism)
+      inputTransform.getParallelism)
 
-    if (getResource.getMaxParallelism > 0) {
-      ret.setMaxParallelism(getResource.getMaxParallelism)
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
     }
 
     val selector = KeySelectorUtil.getBaseRowSelector(uniqueKeys, rowTypeInfo)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExchange.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExchange.scala
index 652962b..fa75f62 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExchange.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExchange.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM
 import org.apache.flink.streaming.api.transformations.PartitionTransformation
@@ -92,6 +93,7 @@ class StreamExecExchange(
           inputTransform,
           partitioner.asInstanceOf[StreamPartitioner[BaseRow]])
         transformation.setOutputType(outputTypeInfo)
+        transformation.setParallelism(1)
         transformation
       case RelDistribution.Type.HASH_DISTRIBUTED =>
         // TODO Eliminate duplicate keys
@@ -104,6 +106,7 @@ class StreamExecExchange(
           inputTransform,
           partitioner.asInstanceOf[StreamPartitioner[BaseRow]])
         transformation.setOutputType(outputTypeInfo)
+        transformation.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT)
         transformation
       case _ =>
         throw new UnsupportedOperationException(
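
Two different defaults are set in this exchange: a SINGLETON exchange forces the partition transformation itself to one task, while a hash exchange falls back to ExecutionConfig.PARALLELISM_DEFAULT, the -1 sentinel that tells the runtime to apply the environment's default parallelism at translation time:

    import org.apache.flink.api.common.ExecutionConfig

    // -1 means "resolve to the execution environment's default parallelism".
    assert(ExecutionConfig.PARALLELISM_DEFAULT == -1)
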
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExpand.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExpand.scala
index 85f8b95..ec113a4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExpand.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExpand.scala
@@ -100,9 +100,10 @@ class StreamExecExpand(
       operatorName,
       operator,
       BaseRowTypeInfo.of(outputType),
-      getResource.getParallelism)
-    if (getResource.getMaxParallelism > 0) {
-      transform.setMaxParallelism(getResource.getMaxParallelism)
+      inputTransform.getParallelism)
+    if (inputsContainSingleton()) {
+      transform.setParallelism(1)
+      transform.setMaxParallelism(1)
     }
     transform
   }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
index 9676ab9..ba27a5a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
@@ -189,10 +189,11 @@ class StreamExecGlobalGroupAggregate(
       "GlobalGroupAggregate",
       operator,
       BaseRowTypeInfo.of(outRowType),
-      getResource.getParallelism)
+      inputTransformation.getParallelism)
 
-    if (getResource.getMaxParallelism > 0) {
-      ret.setMaxParallelism(getResource.getMaxParallelism)
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
     }
 
     // set KeyType and Selector for state
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
index 6f8b906..676d05a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
@@ -206,10 +206,11 @@ class StreamExecGroupAggregate(
       "GroupAggregate",
       operator,
       BaseRowTypeInfo.of(outRowType),
-      getResource.getParallelism)
+      inputTransformation.getParallelism)
 
-    if (getResource.getMaxParallelism > 0) {
-      ret.setMaxParallelism(getResource.getMaxParallelism)
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
     }
 
     // set KeyType and Selector for state
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
index d5322a6..212b01a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
@@ -231,10 +231,11 @@ class StreamExecGroupWindowAggregate(
       operatorName,
       operator,
       outRowType,
-      getResource.getParallelism)
+      inputTransform.getParallelism)
 
-    if (getResource.getMaxParallelism > 0) {
-      transformation.setMaxParallelism(getResource.getMaxParallelism)
+    if (inputsContainSingleton()) {
+      transformation.setParallelism(1)
+      transformation.setMaxParallelism(1)
     }
 
     val selector = KeySelectorUtil.getBaseRowSelector(grouping, inputRowTypeInfo)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
index 4cef31f..ae52f71 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
@@ -186,10 +186,11 @@ class StreamExecIncrementalGroupAggregate(
       "IncrementalGroupAggregate",
       operator,
       BaseRowTypeInfo.of(outRowType),
-      getResource.getParallelism)
+      inputTransformation.getParallelism)
 
-    if (getResource.getMaxParallelism > 0) {
-      ret.setMaxParallelism(getResource.getMaxParallelism)
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
     }
 
     // set KeyType and Selector for state
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecJoin.scala
index 22bc209..77a8e09 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecJoin.scala
@@ -197,10 +197,11 @@ class StreamExecJoin(
       getJoinOperatorName(),
       operator,
       returnType,
-      getResource.getParallelism)
+      leftTransform.getParallelism)
 
-    if (getResource.getMaxParallelism > 0) {
-      ret.setMaxParallelism(getResource.getMaxParallelism)
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
     }
 
     // set KeyType and Selector for state
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLimit.scala
index 34078c7..98b4a02 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLimit.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLimit.scala
@@ -170,10 +170,11 @@ class StreamExecLimit(
       s"Limit(offset: $limitStart, fetch: ${fetchToString(fetch)})",
       operator,
       outputRowTypeInfo,
-      getResource.getParallelism)
+      inputTransform.getParallelism)
 
-    if (getResource.getMaxParallelism > 0) {
-      ret.setMaxParallelism(getResource.getMaxParallelism)
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
     }
 
     val selector = NullBinaryRowKeySelector.INSTANCE
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala
index 8a07486..c898881 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala
@@ -149,10 +149,11 @@ class StreamExecLocalGroupAggregate(
       "LocalGroupAggregate",
       operator,
       BaseRowTypeInfo.of(outRowType),
-      getResource.getParallelism)
+      inputTransformation.getParallelism)
 
-    if (getResource.getMaxParallelism > 0) {
-      transformation.setMaxParallelism(getResource.getMaxParallelism)
+    if (inputsContainSingleton()) {
+      transformation.setParallelism(1)
+      transformation.setMaxParallelism(1)
     }
 
     transformation
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLookupJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLookupJoin.scala
index 617e603..740a863 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLookupJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLookupJoin.scala
@@ -101,9 +101,9 @@ class StreamExecLookupJoin(
       planner.getExecEnv,
       planner.getTableConfig,
       planner.getRelBuilder)
-    transformation.setParallelism(getResource.getParallelism)
-    if (getResource.getMaxParallelism > 0) {
-      transformation.setMaxParallelism(getResource.getMaxParallelism)
+    if (inputsContainSingleton()) {
+      transformation.setParallelism(1)
+      transformation.setMaxParallelism(1)
     }
     transformation
   }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecMatch.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecMatch.scala
index 167f0a1..6c07531 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecMatch.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecMatch.scala
@@ -234,10 +234,11 @@ class StreamExecMatch(
         toString,
         operator,
         outputRowTypeInfo,
-        getResource.getParallelism
+        timestampedInput.getParallelism
       )
-      if (getResource.getMaxParallelism > 0) {
-        transformation.setMaxParallelism(getResource.getMaxParallelism)
+      if (inputsContainSingleton()) {
+        transformation.setParallelism(1)
+        transformation.setMaxParallelism(1)
       }
       setKeySelector(transformation, inputTypeInfo)
       transformation
@@ -293,9 +294,10 @@ class StreamExecMatch(
           s"rowtime field: ($timeOrderField)",
           new ProcessOperator(new RowtimeProcessFunction(timeIdx, inputTypeInfo)),
           inputTypeInfo,
-          getResource.getParallelism)
-        if (getResource.getMaxParallelism > 0) {
-          transformation.setMaxParallelism(getResource.getMaxParallelism)
+          inputTransform.getParallelism)
+        if (inputsContainSingleton()) {
+          transformation.setParallelism(1)
+          transformation.setMaxParallelism(1)
         }
         transformation
       } else {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecOverAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecOverAggregate.scala
index 278edd4..c957110 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecOverAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecOverAggregate.scala
@@ -285,10 +285,11 @@ class StreamExecOverAggregate(
       "OverAggregate",
       operator,
       returnTypeInfo,
-      getResource.getParallelism)
+      inputDS.getParallelism)
 
-    if (getResource.getMaxParallelism > 0) {
-      ret.setMaxParallelism(getResource.getMaxParallelism)
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
     }
 
     // set KeyType and Selector for state
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecRank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecRank.scala
index ebf1472..d227dd6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecRank.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecRank.scala
@@ -216,10 +216,11 @@ class StreamExecRank(
       rankOpName,
       operator,
       outputRowTypeInfo,
-      getResource.getParallelism)
+      inputTransform.getParallelism)
 
-    if (getResource.getMaxParallelism > 0) {
-      ret.setMaxParallelism(getResource.getMaxParallelism)
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
     }
 
     // set KeyType and Selector for state
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
index d505bf7..ea50bfe 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
@@ -145,26 +145,6 @@ class StreamExecSink[T](
             "implemented and return the sink transformation DataStreamSink. " +
             s"However, ${sink.getClass.getCanonicalName} doesn't implement this method.")
         }
-        val configSinkParallelism = NodeResourceUtil.getSinkParallelism(
-          planner.getTableConfig.getConfiguration)
-
-        val maxSinkParallelism = dsSink.getTransformation.getMaxParallelism
-
-        // only set user's parallelism when user defines a sink parallelism
-        if (configSinkParallelism > 0) {
-          // set the parallelism when user's parallelism is not larger than max parallelism or
-          // max parallelism is not set
-          if (maxSinkParallelism < 0 || configSinkParallelism <= maxSinkParallelism) {
-            dsSink.getTransformation.setParallelism(configSinkParallelism)
-          }
-        }
-        if (!UpdatingPlanChecker.isAppendOnly(this) &&
-            dsSink.getTransformation.getParallelism != transformation.getParallelism) {
-          throw new TableException(s"The configured sink parallelism should be equal to the" +
-              " input node when input is an update stream. The input parallelism is " +
-              "${transformation.getParallelism}, however the configured sink parallelism is " +
-              "${dsSink.getTransformation.getParallelism}.")
-        }
         dsSink.getTransformation
 
       case dsTableSink: DataStreamTableSink[_] =>
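
The removed guard above also carried a latent bug worth noting: only the first string segment used the s interpolator, so the ${...} placeholders in the later segments would have been printed literally. Had the check been kept, correct interpolation would prefix every segment:

    // Every segment needs the s interpolator for ${...} to be evaluated.
    throw new TableException(
      s"The configured sink parallelism should be equal to the input node when " +
      s"input is an update stream. The input parallelism is " +
      s"${transformation.getParallelism}, however the configured sink parallelism " +
      s"is ${dsSink.getTransformation.getParallelism}.")
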
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSort.scala
index 796491b5..3a7c927 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSort.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSort.scala
@@ -135,9 +135,10 @@ class StreamExecSort(
       s"Sort(${RelExplainUtil.collationToString(sortCollation, getRowType)})",
       sortOperator,
       outputRowTypeInfo,
-      getResource.getParallelism)
-    if (getResource.getMaxParallelism > 0) {
-      ret.setMaxParallelism(getResource.getMaxParallelism)
+      input.getParallelism)
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
     }
     ret
   }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSortLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSortLimit.scala
index 3ef6ac5..b34975b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSortLimit.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSortLimit.scala
@@ -217,10 +217,11 @@ class StreamExecSortLimit(
       getOperatorName,
       operator,
       outputRowTypeInfo,
-      getResource.getParallelism)
+      inputTransform.getParallelism)
 
-    if (getResource.getMaxParallelism > 0) {
-      ret.setMaxParallelism(getResource.getMaxParallelism)
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
     }
 
     val selector = NullBinaryRowKeySelector.INSTANCE
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
index 38a3819..a4376fb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
@@ -96,7 +96,6 @@ class StreamExecTableSourceScan(
       planner: StreamPlanner): Transformation[BaseRow] = {
     val config = planner.getTableConfig
     val inputTransform = getSourceTransformation(planner.getExecEnv)
-    inputTransform.setParallelism(getResource.getParallelism)
 
     val fieldIndexes = TableSourceUtil.computeIndexMapping(
       tableSource,
@@ -143,7 +142,6 @@ class StreamExecTableSourceScan(
         rowtimeExpression,
         beforeConvert = extractElement,
         afterConvert = resetElement)
-      conversionTransform.setParallelism(getResource.getParallelism)
       conversionTransform
     } else {
       inputTransform.asInstanceOf[Transformation[BaseRow]]
@@ -174,7 +172,6 @@ class StreamExecTableSourceScan(
       // No need to generate watermarks if no rowtime attribute is specified.
       ingestedTable
     }
-    withWatermarks.getTransformation.setParallelism(getResource.getParallelism)
     withWatermarks.getTransformation
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
index 2acaba4..9002452 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
@@ -143,10 +143,11 @@ class StreamExecTemporalJoin(
       getJoinOperatorName,
       joinOperator,
       BaseRowTypeInfo.of(returnType),
-      getResource.getParallelism)
+      leftTransform.getParallelism)
 
-    if (getResource.getMaxParallelism > 0) {
-      ret.setMaxParallelism(getResource.getMaxParallelism)
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
     }
 
     // set KeyType and Selector for state
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalSort.scala
index b2c3e7f..030c483 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalSort.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalSort.scala
@@ -150,7 +150,7 @@ class StreamExecTemporalSort(
         "ProcTimeSortOperator",
         sortOperator,
         outputRowTypeInfo,
-        getResource.getParallelism)
+        input.getParallelism)
 
       val selector = NullBinaryRowKeySelector.INSTANCE
       ret.setStateKeySelector(selector)
@@ -190,10 +190,11 @@ class StreamExecTemporalSort(
       "RowTimeSortOperator",
       sortOperator,
       outputRowTypeInfo,
-      getResource.getParallelism)
+      input.getParallelism)
 
-    if (getResource.getMaxParallelism > 0) {
-      ret.setMaxParallelism(getResource.getMaxParallelism)
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
     }
 
     val selector = NullBinaryRowKeySelector.INSTANCE
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecValues.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecValues.scala
index a958c6e..a95558b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecValues.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecValues.scala
@@ -82,10 +82,8 @@ class StreamExecValues(
       getRelTypeName)
     val transformation = planner.getExecEnv.createInput(inputFormat,
       inputFormat.getProducedType).getTransformation
-    transformation.setParallelism(getResource.getParallelism)
-    if (getResource.getMaxParallelism > 0) {
-      transformation.setMaxParallelism(getResource.getMaxParallelism)
-    }
+    transformation.setParallelism(1)
+    transformation.setMaxParallelism(1)
     transformation
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWindowJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWindowJoin.scala
index 33f4aa7..d1127cc 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWindowJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWindowJoin.scala
@@ -342,11 +342,12 @@ class StreamExecWindowJoin(
       new KeyedCoProcessOperatorWithWatermarkDelay(rowJoinFunc, rowJoinFunc.getMaxOutputDelay)
         .asInstanceOf[TwoInputStreamOperator[BaseRow,BaseRow,BaseRow]],
       returnTypeInfo,
-      getResource.getParallelism
+      leftPlan.getParallelism
     )
 
-    if (getResource.getMaxParallelism > 0) {
-      ret.setMaxParallelism(getResource.getMaxParallelism)
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
     }
 
     // set KeyType and Selector for state
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExecNodePlanDumper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExecNodePlanDumper.scala
index 31df785..750cb56 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExecNodePlanDumper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExecNodePlanDumper.scala
@@ -331,10 +331,6 @@ class NodeTreeWriterImpl(
         printValues.add(Pair.of("__id__", rel.getId.toString))
       }
 
-      if (withResource) {
-        printValues.add(Pair.of("resource", node.getResource))
-      }
-
       if (withRetractTraits) {
         rel match {
           case streamRel: StreamPhysicalRel =>
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/MockNodeTestBase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/MockNodeTestBase.java
deleted file mode 100644
index c482426..0000000
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/MockNodeTestBase.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource;
-
-import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecCalc;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecUnion;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecValues;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecValues;
-
-import org.apache.calcite.rel.BiRel;
-import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.SingleRel;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Base for test with mock node list.
- */
-public class MockNodeTestBase {
-
-	protected List<ExecNode> nodeList;
-	private final boolean isBatchMode;
-
-	public MockNodeTestBase(boolean isBatchMode) {
-		this.isBatchMode = isBatchMode;
-	}
-
-	private void updateNode(int index, ExecNode<?, ?> node) {
-		nodeList.set(index, node);
-		NodeResource resource = new NodeResource();
-		when(node.getResource()).thenReturn(resource);
-		when(node.toString()).thenReturn("id: " + index);
-		if (node instanceof BatchExecTableSourceScan) {
-			Transformation transformation = mock(Transformation.class);
-			when(((BatchExecTableSourceScan) node).getSourceTransformation(any())).thenReturn(transformation);
-			when(transformation.getMaxParallelism()).thenReturn(-1);
-		} else if (node instanceof StreamExecTableSourceScan) {
-			Transformation transformation = mock(Transformation.class);
-			when(((StreamExecTableSourceScan) node).getSourceTransformation(any())).thenReturn(transformation);
-			when(transformation.getMaxParallelism()).thenReturn(-1);
-		} else if (node instanceof BatchExecBoundedStreamScan) {
-			Transformation transformation = mock(Transformation.class);
-			when(((BatchExecBoundedStreamScan) node).getSourceTransformation()).thenReturn(transformation);
-		} else if (node instanceof StreamExecDataStreamScan) {
-			Transformation transformation = mock(Transformation.class);
-			when(((StreamExecDataStreamScan) node).getSourceTransformation()).thenReturn(transformation);
-		} else if (node instanceof BatchExecExchange) {
-			RelDistribution distribution = mock(RelDistribution.class);
-			when(distribution.getType()).thenReturn(RelDistribution.Type.BROADCAST_DISTRIBUTED);
-			when(((BatchExecExchange) node).getDistribution()).thenReturn(distribution);
-		} else if (node instanceof StreamExecExchange) {
-			RelDistribution distribution = mock(RelDistribution.class);
-			when(distribution.getType()).thenReturn(RelDistribution.Type.BROADCAST_DISTRIBUTED);
-			when(((StreamExecExchange) node).getDistribution()).thenReturn(distribution);
-		}
-	}
-
-	protected ExecNode<?, ?> updateCalc(int index) {
-		ExecNode<?, ?> node = isBatchMode ? mock(BatchExecCalc.class) : mock(StreamExecCalc.class);
-		updateNode(index, node);
-		return node;
-	}
-
-	protected ExecNode<?, ?> updateValues(int index) {
-		ExecNode<?, ?> node = isBatchMode ? mock(BatchExecValues.class) : mock(StreamExecValues.class);
-		updateNode(index, node);
-		return node;
-	}
-
-	protected ExecNode<?, ?> updateUnion(int index) {
-		ExecNode<?, ?> node = isBatchMode ? mock(BatchExecUnion.class) : mock(StreamExecUnion.class);
-		updateNode(index, node);
-		return node;
-	}
-
-	protected ExecNode<?, ?> updateExchange(int index) {
-		ExecNode<?, ?> node = isBatchMode ? mock(BatchExecExchange.class, RETURNS_DEEP_STUBS) :
-				mock(StreamExecExchange.class, RETURNS_DEEP_STUBS);
-		updateNode(index, node);
-		return node;
-	}
-
-	protected ExecNode<?, ?> updateExchange(int index, RelDistribution.Type type) {
-		ExecNode<?, ?> node = updateExchange(index);
-		if (isBatchMode) {
-			when(((BatchExecExchange) node).getDistribution().getType()).thenReturn(type);
-		} else {
-			when(((StreamExecExchange) node).getDistribution().getType()).thenReturn(type);
-		}
-		return node;
-	}
-
-	protected ExecNode<?, ?> updateTableSource(int index) {
-		ExecNode<?, ?> node = isBatchMode ? mock(BatchExecTableSourceScan.class) : mock(StreamExecTableSourceScan.class);
-		updateNode(index, node);
-		return node;
-	}
-
-	protected ExecNode<?, ?> updateTableSource(int index, int maxParallelism) {
-		ExecNode<?, ?> node = updateTableSource(index);
-		if (isBatchMode) {
-			when(((BatchExecTableSourceScan) node).getSourceTransformation(any()).getMaxParallelism()).thenReturn(maxParallelism);
-		} else {
-			when(((StreamExecTableSourceScan) node).getSourceTransformation(any()).getMaxParallelism()).thenReturn(maxParallelism);
-		}
-		return node;
-	}
-
-	protected ExecNode<?, ?> updateStreamScan(int index) {
-		ExecNode<?, ?> node = isBatchMode ? mock(BatchExecBoundedStreamScan.class) : mock(StreamExecDataStreamScan.class);
-		updateNode(index, node);
-		return node;
-	}
-
-	protected ExecNode<?, ?> updateStreamScan(int index, int parallelism) {
-		ExecNode<?, ?> node = updateStreamScan(index);
-		if (isBatchMode) {
-			when(((BatchExecBoundedStreamScan) nodeList.get(4)).getSourceTransformation().getParallelism()).thenReturn(parallelism);
-		} else {
-			when(((StreamExecDataStreamScan) nodeList.get(4)).getSourceTransformation().getParallelism()).thenReturn(parallelism);
-		}
-		return node;
-	}
-
-	protected void createNodeList(int num) {
-		nodeList = new LinkedList<>();
-		for (int i = 0; i < num; i++) {
-			ExecNode<?, ?>  node = isBatchMode ? mock(BatchExecCalc.class) : mock(StreamExecCalc.class);
-			when(node.getInputNodes()).thenReturn(new ArrayList<>());
-			when(node.getResource()).thenReturn(new NodeResource());
-			when(node.toString()).thenReturn("id: " + i);
-			nodeList.add(node);
-		}
-	}
-
-	protected void connect(int nodeIndex, int... inputNodeIndexes) {
-		List<ExecNode<?, ?>> inputNodes = new ArrayList<>(inputNodeIndexes.length);
-		for (int inputIndex : inputNodeIndexes) {
-			ExecNode<?, ?> input = nodeList.get(inputIndex);
-			inputNodes.add(input);
-		}
-		when(nodeList.get(nodeIndex).getInputNodes()).thenReturn(inputNodes);
-		if (inputNodeIndexes.length == 1 && nodeList.get(nodeIndex) instanceof SingleRel) {
-			when(((SingleRel) nodeList.get(nodeIndex)).getInput()).thenReturn((RelNode) nodeList.get(inputNodeIndexes[0]));
-		} else if (inputNodeIndexes.length == 2 && nodeList.get(nodeIndex) instanceof BiRel) {
-			when(((BiRel) nodeList.get(nodeIndex)).getLeft()).thenReturn((RelNode) nodeList.get(inputNodeIndexes[0]));
-			when(((BiRel) nodeList.get(nodeIndex)).getRight()).thenReturn((RelNode) nodeList.get(inputNodeIndexes[1]));
-		}
-	}
-}
-
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/FinalParallelismSetterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/FinalParallelismSetterTest.java
deleted file mode 100644
index c23dca3..0000000
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/FinalParallelismSetterTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource.parallelism;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.resource.MockNodeTestBase;
-
-import org.apache.calcite.rel.RelDistribution;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test for FinalParallelismSetter.
- */
-@SuppressWarnings("serial")
-@RunWith(Parameterized.class)
-public class FinalParallelismSetterTest extends MockNodeTestBase {
-
-	private StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
-	public FinalParallelismSetterTest(boolean isBatchMode) {
-		super(isBatchMode);
-	}
-
-	@Before
-	public void setUp() {
-		sEnv.setParallelism(21);
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testSource() {
-		/**
-		 *   0, Source   1, Source  2, Values  4, Source   5, Source
-		 *            \      /      /            /           /
-		 *             3, Union
-		 */
-		createNodeList(6);
-		updateTableSource(0, 5);
-		updateTableSource(1);
-		updateValues(2);
-		updateUnion(3);
-		updateStreamScan(4, 7);
-		updateStreamScan(5);
-		connect(3, 0, 1, 2, 4, 5);
-		Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap = FinalParallelismSetter.calculate(sEnv, Collections.singletonList(nodeList.get(3)));
-		assertEquals(3, finalParallelismNodeMap.size());
-		assertEquals(5, nodeList.get(0).getResource().getMaxParallelism());
-		assertEquals(1, finalParallelismNodeMap.get(nodeList.get(2)).intValue());
-		assertEquals(7, finalParallelismNodeMap.get(nodeList.get(4)).intValue());
-		assertEquals(21, finalParallelismNodeMap.get(nodeList.get(5)).intValue());
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testExchange() {
-		/**
-		 *   0, Source    1, Source               10, Source
-		 *        |         |                       |
-		 *   2, Exchange  3, Exchange 8, Source  9, Exchange
-		 *        |         |              \     /
-		 *   4, Calc      5, Calc          7, Join
-		 *         \         /              /
-		 *          6, Union
-		 */
-		createNodeList(11);
-		updateTableSource(0, 5);
-		updateExchange(2, RelDistribution.Type.BROADCAST_DISTRIBUTED);
-		updateExchange(3, RelDistribution.Type.SINGLETON);
-		updateExchange(9, RelDistribution.Type.SINGLETON);
-		connect(2, 0);
-		connect(4, 2);
-		connect(3, 1);
-		connect(5, 3);
-		connect(6, 4, 5, 7);
-		connect(7, 8, 9);
-		connect(9, 10);
-		Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap = FinalParallelismSetter.calculate(sEnv, Collections.singletonList(nodeList.get(6)));
-		assertEquals(2, finalParallelismNodeMap.size());
-		assertEquals(5, nodeList.get(0).getResource().getMaxParallelism());
-		assertEquals(1, finalParallelismNodeMap.get(nodeList.get(5)).intValue());
-		assertEquals(1, finalParallelismNodeMap.get(nodeList.get(7)).intValue());
-	}
-
-	@Parameterized.Parameters(name = "isBatchMode = {0}")
-	public static Collection<Object[]> runMode() {
-		return Arrays.asList(
-				new Object[] { false, },
-				new Object[] { true });
-	}
-}
-
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageGeneratorTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageGeneratorTest.java
deleted file mode 100644
index 952c45a..0000000
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageGeneratorTest.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource.parallelism;
-
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.resource.MockNodeTestBase;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-/**
- * Test for {@link ShuffleStageGenerator}.
- */
-@SuppressWarnings("serial")
-@RunWith(Parameterized.class)
-public class ShuffleStageGeneratorTest extends MockNodeTestBase {
-
-	private Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap;
-
-	public ShuffleStageGeneratorTest(boolean isBatchMode) {
-		super(isBatchMode);
-	}
-
-	@Before
-	public void setUp() {
-		finalParallelismNodeMap = new HashMap<>();
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testBatchGenerateShuffleStages() {
-		/**
-		 *
-		 *    0, Source     1, Source
-		 *             \     /
-		 *             2, Union
-		 *             /     \
-		 *        3, Calc   4, Calc
-		 *           |        |
-		 *    5, Exchange    6, Exchange
-		 *            \      /
-		 *              7, Join
-		 *               |
-		 *              8, Calc
-		 */
-		createNodeList(9);
-		updateUnion(2);
-		updateExchange(5);
-		updateExchange(6);
-		connect(2, 0, 1);
-		connect(3, 2);
-		connect(4, 2);
-		connect(5, 3);
-		connect(6, 4);
-		connect(7, 5, 6);
-		connect(8, 7);
-
-		Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(8)), finalParallelismNodeMap);
-
-		assertSameShuffleStage(nodeShuffleStageMap, 7, 8);
-		assertSameShuffleStage(nodeShuffleStageMap, 0, 1, 3, 4);
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testMultiOutput() {
-		/**
-		 *
-		 *    0, Source     2, Source  4, Source   6, Source
-		 *       |            |         |             |
-		 *    1, Calc       3, Calc    5, Calc     7, Exchange
-		 *            \     /      \   /       \    /
-		 *            8, Join     9, Join     10, Join
-		 *             \          /       \    /
-		 *              \   12, Exchange  \   /
-		 *               \      /         \  /
-		 *                 11, Join      13, Union
-		 *                         \      |
-		 *                15, Exchange   14, Calc
-		 *                           \   /
-		 *                           16, Join
-		 */
-		createNodeList(17);
-		updateExchange(7);
-		updateExchange(12);
-		updateUnion(13);
-		updateExchange(15);
-		connect(1, 0);
-		connect(3, 2);
-		connect(5, 4);
-		connect(7, 6);
-		connect(8, 1, 3);
-		connect(9, 3, 5);
-		connect(10, 5, 7);
-		connect(12, 9);
-		connect(11, 8, 12);
-		connect(13, 9, 10);
-		connect(14, 13);
-		connect(15, 11);
-		connect(16, 15, 14);
-
-		Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(16)), finalParallelismNodeMap);
-
-		assertSameShuffleStage(nodeShuffleStageMap, 0, 1, 8, 3, 2, 9, 5, 4, 10, 11, 14, 16);
-		assertSameShuffleStage(nodeShuffleStageMap, 6);
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testWithFinalParallelism() {
-		/**
-		 *
-		 *    0, Source    2, Source
-		 *       |            |
-		 *    1, Calc      3, Calc  6, Source
-		 *             \     /     /
-		 *               4, Union
-		 *                 |
-		 *               5, Calc
-		 */
-		createNodeList(7);
-		ExecNode<?, ?> scan0 = updateTableSource(0);
-		scan0.getResource().setMaxParallelism(10);
-		ExecNode<?, ?> scan1 = updateTableSource(2);
-		finalParallelismNodeMap.put(scan1, 11);
-		updateUnion(4);
-		updateCalc(5);
-		updateTableSource(6);
-		connect(1, 0);
-		connect(3, 2);
-		connect(4, 1, 3, 6);
-		connect(5, 4);
-
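-		// Source 0's max parallelism (10) conflicts with source 2's fixed parallelism (11),
-		// so the union cannot merge both inputs into a single shuffle stage.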
-		Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(5)), finalParallelismNodeMap);
-
-		assertSameShuffleStage(nodeShuffleStageMap, 0, 1);
-		assertSameShuffleStage(nodeShuffleStageMap, 2, 3, 6, 5);
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testWithFinalParallelism1() {
-		/**
-		 *
-		 *    0, Source    2, Source
-		 *       |            |
-		 *    1, Calc      3, Calc
-		 *             \     /
-		 *               4, Union
-		 *                 |
-		 *               5, Calc
-		 */
-		createNodeList(7);
-		ExecNode<?, ?> scan0 = updateTableSource(0);
-		ExecNode<?, ?> scan1 = updateTableSource(2);
-		finalParallelismNodeMap.put(scan0, 10);
-		finalParallelismNodeMap.put(scan1, 11);
-		updateUnion(4);
-		ExecNode<?, ?> calc = updateCalc(5);
-		finalParallelismNodeMap.put(calc, 12);
-		connect(1, 0);
-		connect(3, 2);
-		connect(4, 1, 3);
-		connect(5, 4);
-
-		Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(5)), finalParallelismNodeMap);
-
-		assertSameShuffleStage(nodeShuffleStageMap, 0, 1);
-		assertSameShuffleStage(nodeShuffleStageMap, 2, 3);
-		assertSameShuffleStage(nodeShuffleStageMap, 5);
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testWithFinalParallelism2() {
-		/**
-		 *
-		 *    0, Source    2, Source
-		 *       |            |
-		 *       |         3, Exchange
-		 *       |            |
-		 *    1, Calc      4, Calc
-		 *             \     /
-		 *               5, Union
-		 *                 |
-		 *               6, Calc
-		 */
-		createNodeList(7);
-		ExecNode<?, ?> scan0 = updateTableSource(0);
-		ExecNode<?, ?> scan1 = updateTableSource(2);
-		finalParallelismNodeMap.put(scan0, 10);
-		finalParallelismNodeMap.put(scan1, 11);
-		updateExchange(3);
-		ExecNode<?, ?> calc = updateCalc(4);
-		finalParallelismNodeMap.put(calc, 1);
-		updateUnion(5);
-		connect(1, 0);
-		connect(3, 2);
-		connect(4, 3);
-		connect(5, 1, 4);
-		connect(6, 5);
-
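-		// The exchange at node 3 cuts source 2 off from calc 4, and calc 4's fixed
-		// parallelism (1) keeps it out of the union's shuffle stage as well.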
-		Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(6)), finalParallelismNodeMap);
-
-		assertSameShuffleStage(nodeShuffleStageMap, 0, 1, 6);
-		assertSameShuffleStage(nodeShuffleStageMap, 2);
-		assertSameShuffleStage(nodeShuffleStageMap, 4);
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testWithFinalParallelism3() {
-		/**
-		 *
-		 *    0, Source    2, Source
-		 *       |            |
-		 *    1, Calc      3, Calc  6, Source   7, Source
-		 *             \     /     /             /
-		 *               4, Union
-		 *                 |
-		 *               5, Calc
-		 */
-		createNodeList(8);
-		ExecNode<?, ?> scan0 = updateTableSource(0);
-		ExecNode<?, ?> scan1 = updateTableSource(2);
-		finalParallelismNodeMap.put(scan0, 11);
-		scan1.getResource().setMaxParallelism(5);
-		ExecNode<?, ?> union4 = updateUnion(4);
-		finalParallelismNodeMap.put(union4, 5);
-		updateCalc(5);
-		updateTableSource(6);
-		updateTableSource(7);
-		connect(1, 0);
-		connect(3, 2);
-		connect(4, 1, 3, 6, 7);
-		connect(5, 4);
-
-		Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(5)), finalParallelismNodeMap);
-
-		assertSameShuffleStage(nodeShuffleStageMap, 0, 1);
-		assertSameShuffleStage(nodeShuffleStageMap, 2, 3, 6, 5, 7);
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testWithFinalParallelism4() {
-		/**
-		 *
-		 *    0, Source    2, Source
-		 *       |            |
-		 *    1, Calc      3, Calc
-		 *             \     /
-		 *               4, Union
-		 *                 |
-		 *               5, Calc
-		 */
-		createNodeList(6);
-		ExecNode<?, ?> scan0 = updateTableSource(0);
-		ExecNode<?, ?> scan1 = updateTableSource(2);
-		finalParallelismNodeMap.put(scan0, 11);
-		finalParallelismNodeMap.put(scan1, 5);
-		ExecNode<?, ?> union4 = updateUnion(4);
-		finalParallelismNodeMap.put(union4, 3);
-		updateCalc(5);
-		connect(1, 0);
-		connect(3, 2);
-		connect(4, 1, 3);
-		connect(5, 4);
-
-		Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(5)), finalParallelismNodeMap);
-
-		assertSameShuffleStage(nodeShuffleStageMap, 0, 1);
-		assertSameShuffleStage(nodeShuffleStageMap, 2, 3);
-		assertSameShuffleStage(nodeShuffleStageMap, 5);
-	}
-
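-	/**
-	 * Asserts that all of the given nodes are mapped to one and the same
-	 * {@link ShuffleStage}, and that the stage contains exactly these nodes.
-	 */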
-	private void assertSameShuffleStage(Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap, int... nodeIndexes) {
-		Set<ExecNode<?, ?>> nodeSet = new HashSet<>();
-		for (int index : nodeIndexes) {
-			nodeSet.add(nodeList.get(index));
-		}
-		for (int index : nodeIndexes) {
-			assertNotNull("shuffleStage should not be null. node index: " + index, nodeShuffleStageMap.get(nodeList.get(index)));
-			assertEquals("node index: " + index, nodeSet, nodeShuffleStageMap.get(nodeList.get(index)).getExecNodeSet());
-		}
-	}
-
-	@Parameterized.Parameters(name = "isBatchMode = {0}")
-	public static Collection<Object[]> runMode() {
-		return Arrays.asList(
-				new Object[] { false },
-				new Object[] { true });
-	}
-}
-
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculatorTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculatorTest.java
deleted file mode 100644
index 97cba22..0000000
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculatorTest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource.parallelism;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecCalc;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc;
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Test for {@link ShuffleStageParallelismCalculator}.
- */
-public class ShuffleStageParallelismCalculatorTest {
-
-	private Configuration tableConf;
-	private BatchExecTableSourceScan batchTableSourceScan = mock(BatchExecTableSourceScan.class);
-	private StreamExecTableSourceScan streamTableSourceScan = mock(StreamExecTableSourceScan.class);
-	private int envParallelism = 5;
-
-	@Before
-	public void setUp() {
-		tableConf = new Configuration();
-		tableConf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 50);
-	}
-
-	@Test
-	public void testOnlyBatchSource() {
-		ShuffleStage shuffleStage0 = mock(ShuffleStage.class);
-		when(shuffleStage0.getMaxParallelism()).thenReturn(40);
-		when(shuffleStage0.getExecNodeSet()).thenReturn(getNodeSet(Arrays.asList(batchTableSourceScan)));
-		ShuffleStageParallelismCalculator.calculate(tableConf, envParallelism, Arrays.asList(shuffleStage0));
-		verify(shuffleStage0).setParallelism(40, false);
-	}
-
-	@Test
-	public void testOnlyStreamSource() {
-		ShuffleStage shuffleStage0 = mock(ShuffleStage.class);
-		when(shuffleStage0.getMaxParallelism()).thenReturn(20);
-		when(shuffleStage0.getExecNodeSet()).thenReturn(getNodeSet(Arrays.asList(streamTableSourceScan)));
-		ShuffleStageParallelismCalculator.calculate(tableConf, envParallelism, Arrays.asList(shuffleStage0));
-		verify(shuffleStage0).setParallelism(20, false);
-	}
-
-	@Test
-	public void testBatchSourceAndCalc() {
-		ShuffleStage shuffleStage0 = mock(ShuffleStage.class);
-		when(shuffleStage0.getMaxParallelism()).thenReturn(20);
-		BatchExecCalc calc = mock(BatchExecCalc.class);
-		when(shuffleStage0.getExecNodeSet()).thenReturn(getNodeSet(Arrays.asList(batchTableSourceScan, calc)));
-		ShuffleStageParallelismCalculator.calculate(tableConf, envParallelism, Arrays.asList(shuffleStage0));
-		verify(shuffleStage0).setParallelism(20, false);
-	}
-
-	@Test
-	public void testStreamSourceAndCalc() {
-		tableConf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SOURCE_PARALLELISM, 60);
-		ShuffleStage shuffleStage0 = mock(ShuffleStage.class);
-		when(shuffleStage0.getMaxParallelism()).thenReturn(60);
-		StreamExecCalc calc = mock(StreamExecCalc.class);
-		when(shuffleStage0.getExecNodeSet()).thenReturn(getNodeSet(Arrays.asList(streamTableSourceScan, calc)));
-		ShuffleStageParallelismCalculator.calculate(tableConf, envParallelism, Arrays.asList(shuffleStage0));
-		verify(shuffleStage0).setParallelism(60, false);
-	}
-
-	@Test
-	public void testNoSource() {
-		ShuffleStage shuffleStage0 = mock(ShuffleStage.class);
-		BatchExecCalc calc = mock(BatchExecCalc.class);
-		when(shuffleStage0.getMaxParallelism()).thenReturn(Integer.MAX_VALUE);
-		when(shuffleStage0.getExecNodeSet()).thenReturn(getNodeSet(Arrays.asList(calc)));
-		ShuffleStageParallelismCalculator.calculate(tableConf, envParallelism, Arrays.asList(shuffleStage0));
-		verify(shuffleStage0).setParallelism(50, false);
-	}
-
-	@Test
-	public void testEnvParallelism() {
-		tableConf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, -1);
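-		// With the table default parallelism unset (-1), the env parallelism (5) is used,
-		// capped here by the shuffle stage's max parallelism of 4.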
-		ShuffleStage shuffleStage0 = mock(ShuffleStage.class);
-		when(shuffleStage0.getMaxParallelism()).thenReturn(4);
-		BatchExecCalc calc = mock(BatchExecCalc.class);
-		when(shuffleStage0.getExecNodeSet()).thenReturn(getNodeSet(Arrays.asList(batchTableSourceScan, calc)));
-		ShuffleStageParallelismCalculator.calculate(tableConf, envParallelism, Arrays.asList(shuffleStage0));
-		verify(shuffleStage0).setParallelism(4, false);
-	}
-
-	private Set<ExecNode<?, ?>> getNodeSet(List<ExecNode<?, ?>> nodeList) {
-		return new HashSet<>(nodeList);
-	}
-}
-
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testGetStatsFromCatalog.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testGetStatsFromCatalog.out
index bc53a0a..223aeee 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/explain/testGetStatsFromCatalog.out
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testGetStatsFromCatalog.out
@@ -15,18 +15,18 @@ HashJoin(joinType=[InnerJoin], where=[=(s3, s30)], select=[b1, l2, s3, d4, dd5,
  : Data Source
 	content : collect elements with CollectionInputFormat
 
+	 : Operator
+		content : SourceConversion(table:Buffer(default_catalog, default_database, T1, source: [TestTableSource(b1, l2, s3, d4, dd5)]), fields:(b1, l2, s3, d4, dd5))
+		ship_strategy : FORWARD
+
  : Data Source
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, T1, source: [TestTableSource(b1, l2, s3, d4, dd5)]), fields:(b1, l2, s3, d4, dd5))
+		content : SourceConversion(table:Buffer(default_catalog, default_database, T2, source: [TestTableSource(b1, l2, s3, d4, dd5)]), fields:(b1, l2, s3, d4, dd5))
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : SourceConversion(table:Buffer(default_catalog, default_database, T2, source: [TestTableSource(b1, l2, s3, d4, dd5)]), fields:(b1, l2, s3, d4, dd5))
-			ship_strategy : FORWARD
-
-			 : Operator
-				content : HashJoin(where: (s3 = s30), buildLeft)
-				ship_strategy : BROADCAST
+			content : HashJoin(where: (s3 = s30), buildLeft)
+			ship_strategy : BROADCAST
 
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/resource/ExecNodeResourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/resource/ExecNodeResourceTest.xml
deleted file mode 100644
index c692020..0000000
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/resource/ExecNodeResourceTest.xml
+++ /dev/null
@@ -1,153 +0,0 @@
-<?xml version="1.0" ?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to you under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-<Root>
-  <TestCase name="testConfigSourceParallelism[isBatchMode=false]">
-    <Resource name="sql">
-      <![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[sum_a, c], resource=[{parallelism=1}])
-+- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], resource=[{parallelism=1, maxParallelism=1}])
-   +- Exchange(distribution=[single], resource=[{parallelism=-1}])
-      +- GroupAggregate(groupBy=[c], select=[c, SUM(a) AS sum_a], resource=[{parallelism=18}])
-         +- Exchange(distribution=[hash[c]], resource=[{parallelism=-1}])
-            +- Calc(select=[c, a], resource=[{parallelism=100}])
-               +- TableSourceScan(table=[[default_catalog, default_database, table3, source: [MockTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=100}])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testConfigSourceParallelism[isBatchMode=true]">
-    <Resource name="sql">
-      <![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[sum_a, c], resource=[{parallelism=1}])
-+- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[true], resource=[{parallelism=1, maxParallelism=1}])
-   +- Exchange(distribution=[single], resource=[{parallelism=-1}])
-      +- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[false], resource=[{parallelism=18}])
-         +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS sum_a], resource=[{parallelism=18}])
-            +- Exchange(distribution=[hash[c]], resource=[{parallelism=-1}])
-               +- LocalHashAggregate(groupBy=[c], select=[c, Partial_SUM(a) AS sum$0], resource=[{parallelism=100}])
-                  +- Calc(select=[c, a], resource=[{parallelism=100}])
-                     +- TableSourceScan(table=[[default_catalog, default_database, table3, source: [MockTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=100}])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testSortLimit[isBatchMode=false]">
-    <Resource name="sql">
-      <![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[sum_a, c], resource=[{parallelism=1}])
-+- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], resource=[{parallelism=1, maxParallelism=1}])
-   +- Exchange(distribution=[single], resource=[{parallelism=-1}])
-      +- GroupAggregate(groupBy=[c], select=[c, SUM(a) AS sum_a], resource=[{parallelism=18}])
-         +- Exchange(distribution=[hash[c]], resource=[{parallelism=-1}])
-            +- Calc(select=[c, a], resource=[{parallelism=18}])
-               +- TableSourceScan(table=[[default_catalog, default_database, table3, source: [MockTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testSourcePartitionMaxNum[isBatchMode=true]">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM table3]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-TableSourceScan(table=[[default_catalog, default_database, table3, source: [MockTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testSortLimit[isBatchMode=true]">
-    <Resource name="sql">
-      <![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[sum_a, c], resource=[{parallelism=1}])
-+- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[true], resource=[{parallelism=1, maxParallelism=1}])
-   +- Exchange(distribution=[single], resource=[{parallelism=-1}])
-      +- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[false], resource=[{parallelism=18}])
-         +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS sum_a], resource=[{parallelism=18}])
-            +- Exchange(distribution=[hash[c]], resource=[{parallelism=-1}])
-               +- LocalHashAggregate(groupBy=[c], select=[c, Partial_SUM(a) AS sum$0], resource=[{parallelism=18}])
-                  +- Calc(select=[c, a], resource=[{parallelism=18}])
-                     +- TableSourceScan(table=[[default_catalog, default_database, table3, source: [MockTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testUnionQuery[isBatchMode=true]">
-    <Resource name="sql">
-      <![CDATA[SELECT sum(a) as sum_a, g FROM (SELECT a, b, c FROM table3 UNION ALL SELECT a, b, c FROM table4), table5 WHERE b = e group by g]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[sum_a, g], resource=[{parallelism=18}])
-+- HashAggregate(isMerge=[true], groupBy=[g], select=[g, Final_SUM(sum$0) AS sum_a], resource=[{parallelism=18}])
-   +- Exchange(distribution=[hash[g]], resource=[{parallelism=-1}])
-      +- LocalHashAggregate(groupBy=[g], select=[g, Partial_SUM(a) AS sum$0], resource=[{parallelism=18}])
-         +- Calc(select=[g, a], resource=[{parallelism=18}])
-            +- HashJoin(joinType=[InnerJoin], where=[=(b, e)], select=[a, b, e, g], build=[left], resource=[{parallelism=18}])
-               :- Exchange(distribution=[hash[b]], resource=[{parallelism=-1}])
-               :  +- Union(all=[true], union=[a, b], resource=[{parallelism=-1}])
-               :     :- Calc(select=[a, b], resource=[{parallelism=18}])
-               :     :  +- TableSourceScan(table=[[default_catalog, default_database, table3, source: [MockTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
-               :     +- Calc(select=[a, b], resource=[{parallelism=18}])
-               :        +- TableSourceScan(table=[[default_catalog, default_database, table4, source: [MockTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
-               +- Exchange(distribution=[hash[e]], resource=[{parallelism=-1}])
-                  +- Calc(select=[e, g], resource=[{parallelism=18}])
-                     +- TableSourceScan(table=[[default_catalog, default_database, table5, source: [MockTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h], resource=[{parallelism=18}])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testSourcePartitionMaxNum[isBatchMode=false]">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM table3]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-TableSourceScan(table=[[default_catalog, default_database, table3, source: [MockTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testUnionQuery[isBatchMode=false]">
-    <Resource name="sql">
-      <![CDATA[SELECT sum(a) as sum_a, g FROM (SELECT a, b, c FROM table3 UNION ALL SELECT a, b, c FROM table4), table5 WHERE b = e group by g]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[sum_a, g], resource=[{parallelism=18}])
-+- GroupAggregate(groupBy=[g], select=[g, SUM(a) AS sum_a], resource=[{parallelism=18}])
-   +- Exchange(distribution=[hash[g]], resource=[{parallelism=-1}])
-      +- Calc(select=[g, a], resource=[{parallelism=18}])
-         +- Join(joinType=[InnerJoin], where=[=(b, e)], select=[a, b, e, g], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], resource=[{parallelism=18}])
-            :- Exchange(distribution=[hash[b]], resource=[{parallelism=-1}])
-            :  +- Calc(select=[a, b], resource=[{parallelism=18}])
-            :     +- Union(all=[true], union=[a, b, c], resource=[{parallelism=-1}])
-            :        :- TableSourceScan(table=[[default_catalog, default_database, table3, source: [MockTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
-            :        +- TableSourceScan(table=[[default_catalog, default_database, table4, source: [MockTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
-            +- Exchange(distribution=[hash[e]], resource=[{parallelism=-1}])
-               +- Calc(select=[e, g], resource=[{parallelism=18}])
-                  +- TableSourceScan(table=[[default_catalog, default_database, table5, source: [MockTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h], resource=[{parallelism=18}])
-]]>
-    </Resource>
-  </TestCase>
-</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
index ca4650c..4ae9b88 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
@@ -24,7 +24,7 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.planner.utils.TableTestBase
 import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
 
-import org.junit.Test
+import org.junit.{Before, Test}
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
@@ -40,6 +40,12 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
   val LONG = new BigIntType()
   val INT = new IntType()
 
+  @Before
+  def before(): Unit = {
+    util.tableEnv.getConfig.getConfiguration.setInteger(
+      ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4)
+  }
+
   @Test
   def testExplainWithTableSourceScan(): Unit = {
     util.verifyExplain("SELECT * FROM MyTable", extended)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index 59ae7d6..9952e34 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -24,7 +24,7 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.planner.utils.TableTestBase
 import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
 
-import org.junit.Test
+import org.junit.{Before, Test}
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
@@ -42,6 +42,12 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
   val LONG = new BigIntType()
   val INT = new IntType()
 
+  @Before
+  def before(): Unit = {
+    util.tableEnv.getConfig.getConfiguration.setInteger(
+      ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4)
+  }
+
   @Test
   def testExplainTableSourceScan(): Unit = {
     util.verifyExplain("SELECT * FROM MyTable", extended)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/nodes/resource/ExecNodeResourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/nodes/resource/ExecNodeResourceTest.scala
deleted file mode 100644
index 2716919..0000000
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/nodes/resource/ExecNodeResourceTest.scala
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.resource
-
-import org.apache.flink.api.common.io.OutputFormat
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
-import org.apache.flink.streaming.api.environment
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory
-import org.apache.flink.streaming.api.transformations.{SinkTransformation, SourceTransformation}
-import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.api.{TableConfig, TableSchema, Types}
-import org.apache.flink.table.dataformat.BaseRow
-import org.apache.flink.table.plan.stats.TableStats
-import org.apache.flink.table.planner.plan.stats.FlinkStatistic
-import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil, TestingTableEnvironment}
-import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter
-import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
-import org.apache.flink.table.sinks.{AppendStreamTableSink, StreamTableSink, TableSink}
-import org.apache.flink.table.sources.StreamTableSource
-
-import org.junit.Assert.assertEquals
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.{Before, Test}
-import org.mockito.Mockito.{mock, when}
-
-import java.util
-
-@RunWith(classOf[Parameterized])
-class ExecNodeResourceTest(isBatchMode: Boolean) extends TableTestBase {
-
-  private var testUtil: TableTestUtil = _
-
-  @Before
-  def before(): Unit = {
-    testUtil = if(isBatchMode) batchTestUtil() else streamTestUtil()
-    val table3Stats = new TableStats(5000000)
-    val table3Source = new MockTableSource(isBatchMode,
-      new TableSchema(Array("a", "b", "c"),
-      Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING)))
-    testUtil.addTableSource(
-      "table3", table3Source, FlinkStatistic.builder().tableStats(table3Stats).build())
-    val table5Stats = new TableStats(8000000)
-    val table5Source = new MockTableSource(isBatchMode,
-      new TableSchema(Array("d", "e", "f", "g", "h"),
-      Array[TypeInformation[_]](Types.INT, Types.LONG, Types.INT, Types.STRING, Types.LONG)))
-    testUtil.addTableSource(
-      "table5", table5Source, FlinkStatistic.builder().tableStats(table5Stats).build())
-    ExecNodeResourceTest.setResourceConfig(testUtil.tableEnv.getConfig)
-  }
-
-  @Test
-  def testSourcePartitionMaxNum(): Unit = {
-    val sqlQuery = "SELECT * FROM table3"
-    testUtil.verifyResource(sqlQuery)
-  }
-
-  @Test
-  def testSortLimit(): Unit = {
-    val sqlQuery = "SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2"
-    testUtil.verifyResource(sqlQuery)
-  }
-
-  @Test
-  def testConfigSourceParallelism(): Unit = {
-    testUtil.tableEnv.getConfig.getConfiguration.setInteger(
-      ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SOURCE_PARALLELISM, 100)
-    val sqlQuery = "SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2"
-    testUtil.verifyResource(sqlQuery)
-  }
-
-  @Test
-  def testUnionQuery(): Unit = {
-    val statsOfTable4 = new TableStats(100L)
-    val table4Source = new MockTableSource(isBatchMode,
-      new TableSchema(Array("a", "b", "c"),
-        Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING)))
-    testUtil.addTableSource(
-      "table4", table4Source, FlinkStatistic.builder().tableStats(statsOfTable4).build())
-
-    val sqlQuery = "SELECT sum(a) as sum_a, g FROM " +
-        "(SELECT a, b, c FROM table3 UNION ALL SELECT a, b, c FROM table4), table5 " +
-        "WHERE b = e group by g"
-    testUtil.verifyResource(sqlQuery)
-  }
-
-  @Test
-  def testSinkSelfParallelism(): Unit = {
-    val sqlQuery = "SELECT * FROM table3"
-    val table = testUtil.tableEnv.sqlQuery(sqlQuery)
-    val tableSink = new MockTableSink(new TableSchema(Array("a", "b", "c"),
-      Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING)), -1)
-
-    testUtil.writeToSink(table, tableSink, "sink")
-    testUtil.tableEnv.asInstanceOf[TestingTableEnvironment].translate()
-    assertEquals(17, tableSink.getSinkTransformation.getParallelism)
-  }
-
-  @Test
-  def testSinkConfigParallelism(): Unit = {
-    testUtil.tableEnv.getConfig.getConfiguration.setInteger(
-      ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SINK_PARALLELISM,
-      25
-    )
-    val sqlQuery = "SELECT * FROM table3"
-    val table = testUtil.tableEnv.sqlQuery(sqlQuery)
-    val tableSink = new MockTableSink(new TableSchema(Array("a", "b", "c"),
-      Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING)), -1)
-
-    testUtil.writeToSink(table, tableSink, "sink")
-    testUtil.tableEnv.asInstanceOf[TestingTableEnvironment].translate()
-    assertEquals(25, tableSink.getSinkTransformation.getParallelism)
-  }
-
-  @Test
-  def testSinkConfigParallelismWhenMax1(): Unit = {
-    testUtil.tableEnv.getConfig.getConfiguration.setInteger(
-      ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SINK_PARALLELISM,
-      25
-    )
-    val sqlQuery = "SELECT * FROM table3"
-    val table = testUtil.tableEnv.sqlQuery(sqlQuery)
-    val tableSink = new MockTableSink(new TableSchema(Array("a", "b", "c"),
-      Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING)), 23)
-
-    testUtil.writeToSink(table, tableSink, "sink")
-    testUtil.tableEnv.asInstanceOf[TestingTableEnvironment].translate()
-    assertEquals(17, tableSink.getSinkTransformation.getParallelism)
-  }
-
-  @Test
-  def testSinkConfigParallelismWhenMax2(): Unit = {
-    testUtil.tableEnv.getConfig.getConfiguration.setInteger(
-      ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SINK_PARALLELISM,
-      25
-    )
-    val sqlQuery = "SELECT * FROM table3"
-    val table = testUtil.tableEnv.sqlQuery(sqlQuery)
-    val tableSink = new MockTableSink(new TableSchema(Array("a", "b", "c"),
-      Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING)), 25)
-
-    testUtil.writeToSink(table, tableSink, "sink")
-    testUtil.tableEnv.asInstanceOf[TestingTableEnvironment].translate()
-    assertEquals(25, tableSink.getSinkTransformation.getParallelism)
-  }
-}
-
-object ExecNodeResourceTest {
-
-  @Parameterized.Parameters(name = "isBatchMode={0}")
-  def parameters(): util.Collection[Array[Any]] = {
-    util.Arrays.asList(
-      Array(true),
-      Array(false)
-    )
-  }
-
-  def setResourceConfig(tableConfig: TableConfig): Unit = {
-    tableConfig.getConfiguration.setInteger(
-      ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
-      18)
-  }
-}
-
-/**
-  * Batch/Stream [[org.apache.flink.table.sources.TableSource]] for resource testing.
-  */
-class MockTableSource(override val isBounded: Boolean, schema: TableSchema)
-    extends StreamTableSource[BaseRow] {
-
-  override def getDataStream(
-      execEnv: environment.StreamExecutionEnvironment): DataStream[BaseRow] = {
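-    // Mock the underlying source transformation: maxParallelism -1 means "unset",
-    // and the operator factory reports a stream source only in the unbounded case.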
-    val transformation = mock(classOf[SourceTransformation[BaseRow]])
-    when(transformation.getMaxParallelism).thenReturn(-1)
-    val bs = mock(classOf[DataStream[BaseRow]])
-    when(bs.getTransformation).thenReturn(transformation)
-    when(transformation.getOutputType).thenReturn(getReturnType)
-    val factory = mock(classOf[StreamOperatorFactory[BaseRow]])
-    when(factory.isStreamSource).thenReturn(!isBounded)
-    when(transformation.getOperatorFactory).thenReturn(factory)
-    bs
-  }
-
-  override def getReturnType: TypeInformation[BaseRow] = {
-    val logicalTypes = schema.getFieldTypes.map(
-      TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType)
-    new BaseRowTypeInfo(logicalTypes, schema.getFieldNames)
-  }
-
-  override def getTableSchema: TableSchema = schema
-}
-
-/**
-  * Batch/Stream [[org.apache.flink.table.sinks.TableSink]] for resource testing.
-  */
-class MockTableSink(schema: TableSchema, maxParallelism: Int) extends StreamTableSink[BaseRow]
-  with AppendStreamTableSink[BaseRow] {
-
-  private var sinkTransformation: SinkTransformation[BaseRow] = _
-
-  override def emitDataStream(dataStream: DataStream[BaseRow]) = ???
-
-  override def consumeDataStream(dataStream: DataStream[BaseRow]): DataStreamSink[_] = {
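-    // Write through a mocked OutputFormat at a fixed parallelism of 17, so tests can
-    // check whether the configured sink parallelism is allowed to override it.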
-    val outputFormat = mock(classOf[OutputFormat[BaseRow]])
-    val ret = dataStream.writeUsingOutputFormat(outputFormat).name("collect")
-    ret.getTransformation.setParallelism(17)
-    if (maxParallelism > 0) {
-      ret.getTransformation.setMaxParallelism(maxParallelism)
-    }
-    sinkTransformation = ret.getTransformation
-    ret
-  }
-
-  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]):
-  TableSink[BaseRow] = {
-    this
-  }
-
-  override def getOutputType: TypeInformation[BaseRow] = {
-    val logicalTypes = schema.getFieldTypes.map(
-      TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType)
-    new BaseRowTypeInfo(logicalTypes, schema.getFieldNames)
-  }
-
-  override def getFieldNames: Array[String] = schema.getFieldNames
-
-  /**
-    * @deprecated Use the field types of { @link #getTableSchema()} instead.
-    */
-  override def getFieldTypes: Array[TypeInformation[_]] = schema.getFieldTypes
-
-  def getSinkTransformation: SinkTransformation[BaseRow] = sinkTransformation
-}
-