Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/12 02:07:37 UTC

[flink] branch master updated: [FLINK-13121][table-planner-blink] Set properties to StreamGraph in blink executor

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a9336f9  [FLINK-13121][table-planner-blink] Set properties to StreamGraph in blink executor
a9336f9 is described below

commit a9336f962c143864e1f326a9ba59df7da2839be0
Author: Xupingyong <xu...@163.com>
AuthorDate: Wed Jul 10 14:38:04 2019 +0800

    [FLINK-13121][table-planner-blink] Set properties to StreamGraph in blink executor
    
    This closes #9057
---
 .../apache/flink/table/executor/BatchExecutor.java | 127 ++++++++++----
 .../apache/flink/table/executor/ExecutorBase.java  |  11 ++
 .../flink/table/executor/StreamExecutor.java       |  20 +--
 ...deResourceConfig.java => NodeResourceUtil.java} |   9 +-
 .../ShuffleStageParallelismCalculator.java         |   6 +-
 .../codegen/NestedLoopJoinCodeGenerator.scala      |   4 +-
 .../nodes/physical/batch/BatchExecExchange.scala   |  94 +++-------
 .../batch/BatchExecHashAggregateBase.scala         |  15 +-
 .../nodes/physical/batch/BatchExecHashJoin.scala   |  16 +-
 .../batch/BatchExecHashWindowAggregateBase.scala   |  14 +-
 .../physical/batch/BatchExecNestedLoopJoin.scala   |  14 +-
 .../physical/batch/BatchExecOverAggregate.scala    |  13 +-
 .../plan/nodes/physical/batch/BatchExecSink.scala  |   4 +-
 .../plan/nodes/physical/batch/BatchExecSort.scala  |  14 +-
 .../physical/batch/BatchExecSortMergeJoin.scala    |  19 +-
 .../nodes/physical/stream/StreamExecSink.scala     |   4 +-
 .../plan/reuse/DeadlockBreakupProcessor.scala      |   8 +-
 .../apache/flink/table/planner/BatchPlanner.scala  |   4 +-
 .../apache/flink/table/planner/PlannerBase.scala   |   2 +
 .../apache/flink/table/planner/StreamPlanner.scala |   6 +-
 .../flink/table/catalog/PathResolutionTest.xml     | 195 ---------------------
 .../flink/table/executor/BatchExecutorTest.xml     |  59 +++++++
 .../table/plan/batch/sql/DagOptimizationTest.xml   |  14 +-
 .../table/plan/batch/sql/DeadlockBreakupTest.xml   |  26 +--
 .../table/plan/batch/sql/SetOperatorsTest.xml      |   2 +-
 .../table/plan/batch/sql/SubplanReuseTest.xml      |  40 ++---
 .../table/plan/batch/sql/join/LookupJoinTest.xml   |   2 +-
 .../plan/batch/sql/join/SingleRowJoinTest.xml      |   4 +-
 .../table/plan/batch/table/SetOperatorsTest.xml    |   2 +-
 .../flink/table/executor/BatchExecutorTest.scala   |  81 +++++++++
 .../flink/table/runtime/utils/BatchTestBase.scala  |   3 +
 .../apache/flink/table/util/TableTestBase.scala    |   5 +
 .../flink/table/api/ExecutionConfigOptions.java    |  12 +-
 33 files changed, 450 insertions(+), 399 deletions(-)

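In outline: instead of configuring a standalone StreamGraphGenerator, the blink BatchExecutor
now registers its transformations on the StreamExecutionEnvironment, asks the environment for
the StreamGraph, and sets the batch-specific properties (chaining, schedule mode, state
backend, blocking connections) on the graph itself. A minimal sketch of that pattern, assuming
the environment APIs used in the diff below (the class and method names of the sketch are
illustrative, not part of the commit):

    import org.apache.flink.api.dag.Transformation;
    import org.apache.flink.runtime.jobgraph.ScheduleMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.graph.StreamGraph;

    import java.util.List;

    class StreamGraphPropertiesSketch {
        static StreamGraph toBatchStreamGraph(
                StreamExecutionEnvironment env,
                List<Transformation<?>> transformations,
                String jobName) {
            // register the buffered transformations on the environment
            transformations.forEach(env::addOperator);
            // the environment consumes them and builds the graph
            StreamGraph graph = env.getStreamGraph(jobName);
            // batch-specific properties are set on the graph directly
            graph.setChaining(true);
            graph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
            graph.setStateBackend(null);
            return graph;
        }
    }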
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
index b1dfbce..8a90b80 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
@@ -21,17 +21,18 @@ package org.apache.flink.table.executor;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
-import org.apache.flink.table.api.TableException;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.table.api.ExecutionConfigOptions;
 import org.apache.flink.table.delegation.Executor;
-import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil;
 
 import java.util.List;
 
@@ -42,6 +43,8 @@ import java.util.List;
 @Internal
 public class BatchExecutor extends ExecutorBase {
 
+	private BatchExecEnvConfig batchExecEnvConfig = new BatchExecEnvConfig();
+
 	@VisibleForTesting
 	public BatchExecutor(StreamExecutionEnvironment executionEnvironment) {
 		super(executionEnvironment);
@@ -49,40 +52,100 @@ public class BatchExecutor extends ExecutorBase {
 
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
-		if (transformations.isEmpty()) {
-			throw new TableException("No table sinks have been created yet. " +
-				"A program needs at least one sink that consumes data. ");
-		}
 		StreamExecutionEnvironment execEnv = getExecutionEnvironment();
-		StreamGraph streamGraph = generateStreamGraph(execEnv, transformations, getNonEmptyJobName(jobName));
-
-		// TODO supports streamEnv.execute(streamGraph)
-		try {
-			return execEnv.execute(getNonEmptyJobName(jobName));
-		} finally {
-			transformations.clear();
-		}
+		StreamGraph streamGraph = generateStreamGraph(transformations, jobName);
+		return execEnv.execute(streamGraph);
 	}
 
-	public static StreamGraph generateStreamGraph(
-		StreamExecutionEnvironment execEnv,
-		List<Transformation<?>> transformations,
-		String jobName) throws Exception {
-		// TODO avoid cloning ExecutionConfig
-		ExecutionConfig executionConfig = InstantiationUtil.clone(execEnv.getConfig());
+	/**
+	 * Back up the previous streamEnv config and set the batch configs.
+	 */
+	private void backupAndUpdateStreamEnv(StreamExecutionEnvironment execEnv) {
+		batchExecEnvConfig.backup(execEnv);
+		ExecutionConfig executionConfig = execEnv.getConfig();
 		executionConfig.enableObjectReuse();
 		executionConfig.setLatencyTrackingInterval(-1);
+		execEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+		execEnv.setBufferTimeout(-1);
+		if (isShuffleModeAllBatch()) {
+			executionConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL);
+		}
+	}
 
-		return new StreamGraphGenerator(transformations, executionConfig, new CheckpointConfig())
-			.setChaining(execEnv.isChainingEnabled())
-			.setStateBackend(execEnv.getStateBackend())
-			.setDefaultBufferTimeout(-1)
-			.setTimeCharacteristic(TimeCharacteristic.ProcessingTime)
-			.setUserArtifacts(execEnv.getCachedFiles())
-			.setSlotSharingEnabled(false)
-			.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES)
-			.setJobName(jobName)
-			.generate();
+	/**
+	 * Translates the given transformations to a StreamGraph.
+	 */
+	public StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName) {
+		StreamExecutionEnvironment execEnv = getExecutionEnvironment();
+		backupAndUpdateStreamEnv(execEnv);
+		transformations.forEach(execEnv::addOperator);
+		StreamGraph streamGraph;
+		streamGraph = execEnv.getStreamGraph(getNonEmptyJobName(jobName));
+		// All transformations should have a managed memory size set.
+		ResourceSpec managedResourceSpec = NodeResourceUtil.fromManagedMem(0);
+		streamGraph.getStreamNodes().forEach(sn -> {
+			sn.setResources(sn.getMinResources().merge(managedResourceSpec), sn.getPreferredResources().merge(managedResourceSpec));
+		});
+		streamGraph.setChaining(true);
+		streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
+		streamGraph.setStateBackend(null);
+		streamGraph.getCheckpointConfig().setCheckpointInterval(Long.MAX_VALUE);
+		if (isShuffleModeAllBatch()) {
+			streamGraph.setBlockingConnectionsBetweenChains(true);
+		}
+		batchExecEnvConfig.restore(execEnv);
+		return streamGraph;
 	}
 
+	private boolean isShuffleModeAllBatch() {
+		String value = tableConfig.getConfiguration().getString(ExecutionConfigOptions.SQL_EXEC_SHUFFLE_MODE);
+		if (value.equalsIgnoreCase(ShuffleMode.BATCH.toString())) {
+			return true;
+		} else if (!value.equalsIgnoreCase(ShuffleMode.PIPELINED.toString())) {
+			throw new IllegalArgumentException(ExecutionConfigOptions.SQL_EXEC_SHUFFLE_MODE.key() +
+					" can only be set to " + ShuffleMode.BATCH.toString() + " or " + ShuffleMode.PIPELINED.toString());
+		}
+		return false;
+	}
+
+	/**
+	 * Batch configs that are set in {@link StreamExecutionEnvironment}. We should back up
+	 * these configs before changing them, and restore them afterwards.
+	 */
+	private static class BatchExecEnvConfig {
+
+		private boolean enableObjectReuse;
+		private long latencyTrackingInterval;
+		private long bufferTimeout;
+		private TimeCharacteristic timeCharacteristic;
+		private InputDependencyConstraint inputDependencyConstraint;
+
+		/**
+		 * Back up the previous streamEnv config.
+		 */
+		public void backup(StreamExecutionEnvironment execEnv) {
+			ExecutionConfig executionConfig = execEnv.getConfig();
+			enableObjectReuse = executionConfig.isObjectReuseEnabled();
+			latencyTrackingInterval = executionConfig.getLatencyTrackingInterval();
+			timeCharacteristic = execEnv.getStreamTimeCharacteristic();
+			bufferTimeout = execEnv.getBufferTimeout();
+			inputDependencyConstraint = executionConfig.getDefaultInputDependencyConstraint();
+		}
+
+		/**
+		 * Restore the previous streamEnv config after executing batch jobs.
+		 */
+		public void restore(StreamExecutionEnvironment execEnv) {
+			ExecutionConfig executionConfig = execEnv.getConfig();
+			if (enableObjectReuse) {
+				executionConfig.enableObjectReuse();
+			} else {
+				executionConfig.disableObjectReuse();
+			}
+			executionConfig.setLatencyTrackingInterval(latencyTrackingInterval);
+			execEnv.setStreamTimeCharacteristic(timeCharacteristic);
+			execEnv.setBufferTimeout(bufferTimeout);
+			executionConfig.setDefaultInputDependencyConstraint(inputDependencyConstraint);
+		}
+	}
 }
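
A hypothetical usage of the executor above (inside a method that may throw Exception;
the `transformations` variable is assumed to come from the planner):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    BatchExecutor executor = new BatchExecutor(env);
    executor.setTableConfig(new TableConfig()); // generateStreamGraph() reads the table config
    executor.apply(transformations);            // buffer the transformations on the executor
    JobExecutionResult result = executor.execute("batch-job");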
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/ExecutorBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/ExecutorBase.java
index fb6bbf1..4d67547 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/ExecutorBase.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/ExecutorBase.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.executor;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.delegation.Executor;
 import org.apache.flink.util.StringUtils;
 
@@ -37,11 +39,16 @@ public abstract class ExecutorBase implements Executor {
 
 	private final StreamExecutionEnvironment executionEnvironment;
 	protected List<Transformation<?>> transformations = new ArrayList<>();
+	protected TableConfig tableConfig;
 
 	public ExecutorBase(StreamExecutionEnvironment executionEnvironment) {
 		this.executionEnvironment = executionEnvironment;
 	}
 
+	public void setTableConfig(TableConfig tableConfig) {
+		this.tableConfig = tableConfig;
+	}
+
 	@Override
 	public void apply(List<Transformation<?>> transformations) {
 		this.transformations.addAll(transformations);
@@ -51,6 +58,10 @@ public abstract class ExecutorBase implements Executor {
 		return executionEnvironment;
 	}
 
+	public abstract StreamGraph generateStreamGraph(
+			List<Transformation<?>> transformations,
+			String jobName) throws Exception;
+
 	protected String getNonEmptyJobName(String jobName) {
 		if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
 			return DEFAULT_JOB_NAME;
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/StreamExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/StreamExecutor.java
index 3863e12..9df4ea3 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/StreamExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/StreamExecutor.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.table.delegation.Executor;
 
 import java.util.List;
@@ -40,19 +41,16 @@ public class StreamExecutor extends ExecutorBase {
 	}
 
 	@Override
-	public void apply(List<Transformation<?>> transformations) {
-		transformations.forEach(getExecutionEnvironment()::addOperator);
-	}
-
-	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
 		StreamExecutionEnvironment execEnv = getExecutionEnvironment();
-		try {
-			transformations.forEach(execEnv::addOperator);
-			return execEnv.execute(getNonEmptyJobName(jobName));
-		} finally {
-			transformations.clear();
-		}
+		return execEnv.execute(generateStreamGraph(transformations, jobName));
+	}
+
+	public StreamGraph generateStreamGraph(
+			List<Transformation<?>> transformations,
+			String jobName) throws Exception {
+		transformations.forEach(getExecutionEnvironment()::addOperator);
+		return getExecutionEnvironment().getStreamGraph(getNonEmptyJobName(jobName));
 	}
 
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResourceConfig.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResourceUtil.java
similarity index 89%
rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResourceConfig.java
rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResourceUtil.java
index 91e3c8c..1f3fc35 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResourceConfig.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResourceUtil.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.table.plan.nodes.resource;
 
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.ExecutionConfigOptions;
 
 /**
  * Deal with resource config for {@link org.apache.flink.table.plan.nodes.exec.ExecNode}.
  */
-public class NodeResourceConfig {
+public class NodeResourceUtil {
 
 	/**
 	 * How many Bytes per MB.
@@ -67,4 +68,10 @@ public class NodeResourceConfig {
 		}
 		return parallelism;
 	}
+
+	public static ResourceSpec fromManagedMem(int managedMem) {
+		ResourceSpec.Builder builder = ResourceSpec.newBuilder();
+		builder.setManagedMemoryInMB(managedMem);
+		return builder.build();
+	}
 }
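
Elsewhere in this commit (see BatchExecutor.generateStreamGraph above), the ResourceSpec built
by fromManagedMem() is merged into the resources of every StreamNode; a condensed view of that
call site:

    ResourceSpec managed = NodeResourceUtil.fromManagedMem(0);
    streamGraph.getStreamNodes().forEach(node ->
        node.setResources(
            node.getMinResources().merge(managed),
            node.getPreferredResources().merge(managed)));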
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculator.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculator.java
index 2501e49..31cb733 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculator.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculator.java
@@ -23,7 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
 import org.apache.flink.table.plan.nodes.physical.stream.StreamExecTableSourceScan;
-import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig;
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,7 +68,7 @@ public class ShuffleStageParallelismCalculator {
 		int maxParallelism = shuffleStage.getMaxParallelism();
 		for (ExecNode<?, ?> node : nodeSet) {
 			if (node instanceof BatchExecTableSourceScan || node instanceof StreamExecTableSourceScan) {
-				int result = NodeResourceConfig.getSourceParallelism(tableConf, envParallelism);
+				int result = NodeResourceUtil.getSourceParallelism(tableConf, envParallelism);
 				if (result > sourceParallelism) {
 					sourceParallelism = result;
 				}
@@ -78,7 +78,7 @@ public class ShuffleStageParallelismCalculator {
 		if (sourceParallelism > 0) {
 			shuffleStageParallelism = sourceParallelism;
 		} else {
-			shuffleStageParallelism = NodeResourceConfig.getOperatorDefaultParallelism(getTableConf(), envParallelism);
+			shuffleStageParallelism = NodeResourceUtil.getOperatorDefaultParallelism(getTableConf(), envParallelism);
 		}
 		if (shuffleStageParallelism > maxParallelism) {
 			shuffleStageParallelism = maxParallelism;
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/NestedLoopJoinCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/NestedLoopJoinCodeGenerator.scala
index e5ce158..c2000f7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/NestedLoopJoinCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/NestedLoopJoinCodeGenerator.scala
@@ -22,7 +22,7 @@ import org.apache.flink.table.api.ExecutionConfigOptions
 import org.apache.flink.table.codegen.CodeGenUtils.{BASE_ROW, BINARY_ROW, DEFAULT_INPUT1_TERM, DEFAULT_INPUT2_TERM, className, newName}
 import org.apache.flink.table.codegen.OperatorCodeGenerator.{INPUT_SELECTION, generateCollect}
 import org.apache.flink.table.dataformat.{BaseRow, JoinedRow}
-import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil
 import org.apache.flink.table.runtime.CodeGenOperatorFactory
 import org.apache.flink.table.runtime.join.FlinkJoinType
 import org.apache.flink.table.runtime.util.ResettableExternalBuffer
@@ -71,7 +71,7 @@ class NestedLoopJoinCodeGenerator(
     val isBinaryRow = newName("isBinaryRow")
 
     val externalBufferMemorySize = config.getConfiguration.getInteger(
-      ExecutionConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM) * NodeResourceConfig.SIZE_IN_MB
+      ExecutionConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM) * NodeResourceUtil.SIZE_IN_MB
 
     if (singleRowJoin) {
       ctx.addReusableMember(s"$BASE_ROW $buildRow = null;")
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExchange.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExchange.scala
index 015be56..f2170cb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExchange.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExchange.scala
@@ -20,15 +20,16 @@ package org.apache.flink.table.plan.nodes.physical.batch
 
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.io.network.DataExchangeMode
 import org.apache.flink.runtime.operators.DamBehavior
 import org.apache.flink.streaming.api.transformations.{PartitionTransformation, ShuffleMode}
 import org.apache.flink.streaming.runtime.partitioner.{BroadcastPartitioner, GlobalPartitioner, RebalancePartitioner}
+import org.apache.flink.table.api.ExecutionConfigOptions
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.{CodeGeneratorContext, HashCodeGenerator}
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.plan.nodes.common.CommonPhysicalExchange
 import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
+import org.apache.flink.table.plan.util.FlinkRelOptUtil
 import org.apache.flink.table.planner.BatchPlanner
 import org.apache.flink.table.runtime.BinaryHashPartitioner
 import org.apache.flink.table.types.logical.RowType
@@ -44,50 +45,8 @@ import scala.collection.JavaConversions._
 /**
   * This RelNode represents a change of partitioning of the input elements.
   *
-  * This does not create a physical transformation If its relDistribution' type is not range,
-  * it only affects how upstream operations are connected to downstream operations.
-  *
-  * But if the type is range, this relNode will create some physical transformation because it
-  * need calculate the data distribution. To calculate the data distribution, the received stream
-  * will split in two process stream. For the first process stream, it will go through the sample
-  * and statistics to calculate the data distribution in pipeline mode. For the second process
-  * stream will been bocked. After the first process stream has been calculated successfully,
-  * then the two process stream  will union together. Thus it can partitioner the record based
-  * the data distribution. Then The RelNode will create the following transformations.
-  *
-  * +---------------------------------------------------------------------------------------------+
-  * |                                                                                             |
-  * | +-----------------------------+                                                             |
-  * | |       Transformation        | ------------------------------------>                       |
-  * | +-----------------------------+                                     |                       |
-  * |                 |                                                   |                       |
-  * |                 |                                                   |                       |
-  * |                 |forward & PIPELINED                                |                       |
-  * |                \|/                                                  |                       |
-  * | +--------------------------------------------+                      |                       |
-  * | | OneInputTransformation[LocalSample, n]     |                      |                       |
-  * | +--------------------------------------------+                      |                       |
-  * |                      |                                              |forward & BATCH        |
-  * |                      |forward & PIPELINED                           |                       |
-  * |                     \|/                                             |                       |
-  * | +--------------------------------------------------+                |                       |
-  * | |OneInputTransformation[SampleAndHistogram, 1]     |                |                       |
-  * | +--------------------------------------------------+                |                       |
-  * |                        |                                            |                       |
-  * |                        |broadcast & PIPELINED                       |                       |
-  * |                        |                                            |                       |
-  * |                       \|/                                          \|/                      |
-  * | +---------------------------------------------------+------------------------------+        |
-  * | |               TwoInputTransformation[AssignRangeId, n]                           |        |
-  * | +----------------------------------------------------+-----------------------------+        |
-  * |                                       |                                                     |
-  * |                                       |custom & PIPELINED                                   |
-  * |                                      \|/                                                    |
-  * | +---------------------------------------------------+------------------------------+        |
-  * | |               OneInputTransformation[RemoveRangeId, n]                           |        |
-  * | +----------------------------------------------------+-----------------------------+        |
-  * |                                                                                             |
-  * +---------------------------------------------------------------------------------------------+
+  * This does not create a physical transformation if its relDistribution's type is not
+  * range, which is not supported now.
   */
 class BatchExecExchange(
     cluster: RelOptCluster,
@@ -103,9 +62,9 @@ class BatchExecExchange(
   // and different PartitionTransformation objects will be created which have same input.
   // cache input transformation to reuse
   private var reusedInput: Option[Transformation[BaseRow]] = None
-  // the required exchange mode for reusable ExchangeBatchExec
-  // if it's None, use value from getDataExchangeMode
-  private var requiredExchangeMode: Option[DataExchangeMode] = None
+  // the required shuffle mode for a reusable BatchExecExchange
+  // if it's None, use value from getShuffleMode
+  private var requiredShuffleMode: Option[ShuffleMode] = None
 
   override def copy(
       traitSet: RelTraitSet,
@@ -116,25 +75,36 @@ class BatchExecExchange(
 
   override def explainTerms(pw: RelWriter): RelWriter = {
     super.explainTerms(pw)
-      .itemIf("exchange_mode", requiredExchangeMode.orNull,
-        requiredExchangeMode.contains(DataExchangeMode.BATCH))
+      .itemIf("shuffle_mode", requiredShuffleMode.orNull,
+        requiredShuffleMode.contains(ShuffleMode.BATCH))
   }
 
   //~ ExecNode methods -----------------------------------------------------------
 
-  def setRequiredDataExchangeMode(exchangeMode: DataExchangeMode): Unit = {
-    require(exchangeMode != null)
-    requiredExchangeMode = Some(exchangeMode)
+  def setRequiredShuffleMode(shuffleMode: ShuffleMode): Unit = {
+    require(shuffleMode != null)
+    requiredShuffleMode = Some(shuffleMode)
   }
 
-  private[flink] def getDataExchangeMode(tableConf: Configuration): DataExchangeMode = {
-    requiredExchangeMode match {
-      case Some(mode) if mode eq DataExchangeMode.BATCH => mode
-      case _ => DataExchangeMode.PIPELINED
+  private[flink] def getShuffleMode(tableConf: Configuration): ShuffleMode = {
+    requiredShuffleMode match {
+      case Some(mode) if mode eq ShuffleMode.BATCH => mode
+      case _ =>
+        if (tableConf.getString(ExecutionConfigOptions.SQL_EXEC_SHUFFLE_MODE)
+            .equalsIgnoreCase(ShuffleMode.BATCH.toString)) {
+          ShuffleMode.BATCH
+        } else {
+          ShuffleMode.UNDEFINED
+        }
     }
   }
 
   override def getDamBehavior: DamBehavior = {
+    val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
+    val shuffleMode = getShuffleMode(tableConfig.getConfiguration)
+    if (shuffleMode eq ShuffleMode.BATCH) {
+      return DamBehavior.FULL_DAM
+    }
     distribution.getType match {
       case RelDistribution.Type.RANGE_DISTRIBUTED => DamBehavior.FULL_DAM
       case _ => DamBehavior.PIPELINED
@@ -164,14 +134,8 @@ class BatchExecExchange(
     val inputType = input.getOutputType.asInstanceOf[BaseRowTypeInfo]
     val outputRowType = BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType))
 
-    val shuffleMode = requiredExchangeMode match {
-      case None => ShuffleMode.PIPELINED
-      case Some(mode) =>
-        mode match {
-          case DataExchangeMode.BATCH => ShuffleMode.BATCH
-          case DataExchangeMode.PIPELINED => ShuffleMode.PIPELINED
-        }
-    }
+    val conf = planner.getTableConfig
+    val shuffleMode = getShuffleMode(conf.getConfiguration)
 
     relDistribution.getType match {
       case RelDistribution.Type.ANY =>
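
The shuffle-mode resolution above reads as: an explicitly required BATCH mode always wins;
otherwise the SQL_EXEC_SHUFFLE_MODE table option decides between BATCH and UNDEFINED. A Java
rendering of the same logic (a sketch only; the surrounding code is Scala, and the class and
method names here are illustrative):

    import java.util.Optional;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.transformations.ShuffleMode;
    import org.apache.flink.table.api.ExecutionConfigOptions;

    class ShuffleModeSketch {
        static ShuffleMode resolve(Optional<ShuffleMode> required, Configuration tableConf) {
            if (required.isPresent() && required.get() == ShuffleMode.BATCH) {
                return ShuffleMode.BATCH; // a required BATCH mode always wins
            }
            String configured = tableConf.getString(ExecutionConfigOptions.SQL_EXEC_SHUFFLE_MODE);
            return configured.equalsIgnoreCase(ShuffleMode.BATCH.toString())
                ? ShuffleMode.BATCH
                : ShuffleMode.UNDEFINED; // let the runtime decide per edge
        }
    }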
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
index 7734ada..e395f63 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
@@ -28,7 +28,7 @@ import org.apache.flink.table.functions.UserDefinedFunction
 import org.apache.flink.table.plan.cost.FlinkCost._
 import org.apache.flink.table.plan.cost.FlinkCostFactory
 import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
-import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil
 import org.apache.flink.table.plan.util.AggregateUtil.transformToBatchAggregateInfoList
 import org.apache.flink.table.plan.util.FlinkRelMdUtil
 import org.apache.flink.table.planner.BatchPlanner
@@ -131,22 +131,27 @@ abstract class BatchExecHashAggregateBase(
     val aggInfos = transformToBatchAggregateInfoList(
       aggCallToAggFunction.map(_._1), aggInputRowType)
 
+    var managedMemoryInMB = 0
     val generatedOperator = if (grouping.isEmpty) {
       AggWithoutKeysCodeGenerator.genWithoutKeys(
         ctx, relBuilder, aggInfos, inputType, outputType, isMerge, isFinal, "NoGrouping")
     } else {
-      val reservedManagedMem = config.getConfiguration.getInteger(
-        ExecutionConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM) * NodeResourceConfig.SIZE_IN_MB
+      managedMemoryInMB = config.getConfiguration.getInteger(
+        ExecutionConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM)
+      val managedMemory = managedMemoryInMB * NodeResourceUtil.SIZE_IN_MB
       new HashAggCodeGenerator(
         ctx, relBuilder, aggInfos, inputType, outputType, grouping, auxGrouping, isMerge, isFinal
-      ).genWithKeys(reservedManagedMem)
+      ).genWithKeys(managedMemory)
     }
     val operator = new CodeGenOperatorFactory[BaseRow](generatedOperator)
-    new OneInputTransformation(
+    val ret = new OneInputTransformation(
       input,
       getOperatorName,
       operator,
       BaseRowTypeInfo.of(outputType),
       getResource.getParallelism)
+    val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
+    ret.setResources(resource, resource)
+    ret
   }
 }
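
This file establishes the pattern repeated by the batch exec nodes below: read the operator's
managed-memory budget in MB from the table config, pass the byte value to the generated
operator, and declare the same budget on the transformation as a ResourceSpec. Schematically
(a Java sketch; `conf` and `transformation` stand in for the surrounding Scala context):

    int managedMemoryInMB = conf.getInteger(ExecutionConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM);
    int managedMemoryInBytes = managedMemoryInMB * NodeResourceUtil.SIZE_IN_MB; // bytes, for the operator
    ResourceSpec spec = NodeResourceUtil.fromManagedMem(managedMemoryInMB);     // MB, for scheduling
    transformation.setResources(spec, spec);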
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashJoin.scala
index 0e64fcf..c250c20 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashJoin.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef}
 import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
 import org.apache.flink.table.plan.nodes.exec.ExecNode
-import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil
 import org.apache.flink.table.plan.nodes.{ExpressionFormat, FlinkConventions}
 import org.apache.flink.table.plan.util.{FlinkRelMdUtil, JoinUtil}
 import org.apache.flink.table.planner.BatchPlanner
@@ -201,8 +201,9 @@ class BatchExecHashJoin(
     val rType = rInput.getOutputType.asInstanceOf[BaseRowTypeInfo].toRowType
 
     val keyType = RowType.of(leftKeys.map(lType.getChildren().get(_)): _*)
-    val managedMemorySize = config.getConfiguration.getInteger(
-      ExecutionConfigOptions.SQL_RESOURCE_HASH_JOIN_TABLE_MEM) * NodeResourceConfig.SIZE_IN_MB
+    val managedMemoryInMB = config.getConfiguration.getInteger(
+      ExecutionConfigOptions.SQL_RESOURCE_HASH_JOIN_TABLE_MEM)
+    val managedMemory = managedMemoryInMB * NodeResourceUtil.SIZE_IN_MB
     val condFunc = JoinUtil.generateConditionFunction(
       config, cluster.getRexBuilder, getJoinInfo, lType, rType)
 
@@ -234,7 +235,7 @@ class BatchExecHashJoin(
         pType,
         buildKeys,
         probeKeys,
-        managedMemorySize,
+        managedMemory,
         0,
         0,
         buildRowSize,
@@ -243,7 +244,7 @@ class BatchExecHashJoin(
         condFunc)
     } else {
       SimpleOperatorFactory.of(HashJoinOperator.newHashJoinOperator(
-        managedMemorySize,
+        managedMemory,
         0,
         0,
         hashJoinType,
@@ -260,13 +261,16 @@ class BatchExecHashJoin(
       ))
     }
 
-    new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
+    val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
       build,
       probe,
       getOperatorName,
       operator,
       BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType)),
       getResource.getParallelism)
+    val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
+    ret.setResources(resource, resource)
+    ret
   }
 
   private def getOperatorName: String = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
index 341fe0b..e9d49ad 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
@@ -30,7 +30,7 @@ import org.apache.flink.table.functions.UserDefinedFunction
 import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
 import org.apache.flink.table.plan.logical.LogicalWindow
 import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
-import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil
 import org.apache.flink.table.plan.util.AggregateUtil.transformToBatchAggregateInfoList
 import org.apache.flink.table.plan.util.FlinkRelMdUtil
 import org.apache.flink.table.planner.BatchPlanner
@@ -135,21 +135,25 @@ abstract class BatchExecHashWindowAggregateBase(
 
     val (windowSize: Long, slideSize: Long) = WindowCodeGenerator.getWindowDef(window)
 
-    val reservedManagedMem = config.getConfiguration.getInteger(
-      ExecutionConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM) * NodeResourceConfig.SIZE_IN_MB
+    val managedMemoryInMB = config.getConfiguration.getInteger(
+      ExecutionConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM)
+    val managedMemory = managedMemoryInMB * NodeResourceUtil.SIZE_IN_MB
 
     val generatedOperator = new HashWindowCodeGenerator(
       ctx, relBuilder, window, inputTimeFieldIndex,
       inputTimeIsDate, namedProperties,
       aggInfos, inputRowType, grouping, auxGrouping, enableAssignPane, isMerge, isFinal).gen(
-      inputType, outputType, groupBufferLimitSize, reservedManagedMem, 0,
+      inputType, outputType, groupBufferLimitSize, managedMemory, 0,
       windowSize, slideSize)
     val operator = new CodeGenOperatorFactory[BaseRow](generatedOperator)
-    new OneInputTransformation(
+    val ret = new OneInputTransformation(
       input,
       getOperatorName,
       operator,
       BaseRowTypeInfo.of(outputType),
       getResource.getParallelism)
+    val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
+    ret.setResources(resource, resource)
+    ret
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
index 6da6623..229bebc 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
@@ -21,12 +21,14 @@ package org.apache.flink.table.plan.nodes.physical.batch
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.runtime.operators.DamBehavior
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation
+import org.apache.flink.table.api.ExecutionConfigOptions
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.{CodeGeneratorContext, NestedLoopJoinCodeGenerator}
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
 import org.apache.flink.table.plan.nodes.ExpressionFormat
 import org.apache.flink.table.plan.nodes.exec.ExecNode
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil
 import org.apache.flink.table.planner.BatchPlanner
 import org.apache.flink.table.typeutils.{BaseRowTypeInfo, BinaryRowSerializer}
 
@@ -150,13 +152,23 @@ class BatchExecNestedLoopJoin(
       condition
     ).gen()
 
-    new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
+    val externalBufferMemoryInMb: Int = if (singleRowJoin) {
+      0
+    } else {
+      planner.getTableConfig.getConfiguration.getInteger(
+        ExecutionConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM)
+    }
+    val resourceSpec = NodeResourceUtil.fromManagedMem(externalBufferMemoryInMb)
+
+    val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
       lInput,
       rInput,
       getOperatorName,
       op,
       BaseRowTypeInfo.of(outputType),
       getResource.getParallelism)
+    ret.setResources(resourceSpec, resourceSpec)
+    ret
   }
 
   private def getOperatorName: String = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
index f4dee1c..16c0c4d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
@@ -35,7 +35,7 @@ import org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, FlinkRelDistri
 import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
 import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
 import org.apache.flink.table.plan.nodes.physical.batch.OverWindowMode.OverWindowMode
-import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil
 import org.apache.flink.table.plan.rules.physical.batch.BatchExecJoinRuleBase
 import org.apache.flink.table.plan.util.AggregateUtil.transformToBatchAggregateInfoList
 import org.apache.flink.table.plan.util.OverAggregateUtil.getLongBoundary
@@ -378,6 +378,7 @@ class BatchExecOverAggregate(
       collation.map(_._1),
       collation.map(_._2))
 
+    var managedMemoryInMB: Int = 0
     val operator = if (!needBufferData) {
       //operator needn't cache data
       val aggHandlers = modeToGroupToAggCallToAggFunction.map { case (_, _, aggCallToAggFunction) =>
@@ -408,15 +409,19 @@ class BatchExecOverAggregate(
       new NonBufferOverWindowOperator(aggHandlers, genComparator, resetAccumulators)
     } else {
       val windowFrames = createOverWindowFrames(config)
+      managedMemoryInMB = config.getConfiguration.getInteger(
+        ExecutionConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM)
       new BufferDataOverWindowOperator(
-        config.getConfiguration.getInteger(
-          ExecutionConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM) * NodeResourceConfig.SIZE_IN_MB,
+        managedMemoryInMB * NodeResourceUtil.SIZE_IN_MB,
         windowFrames,
         genComparator,
         inputType.getChildren.forall(t => BinaryRow.isInFixedLengthPart(t)))
     }
-    new OneInputTransformation(
+    val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
+    val ret = new OneInputTransformation(
       input, "OverAggregate", operator, BaseRowTypeInfo.of(outputType), getResource.getParallelism)
+    ret.setResources(resource, resource)
+    ret
   }
 
   def createOverWindowFrames(config: TableConfig): Array[OverWindowFrame] = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
index 19aa60e..9ce1d4c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
@@ -28,7 +28,7 @@ import org.apache.flink.table.codegen.{CodeGenUtils, CodeGeneratorContext}
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.plan.nodes.calcite.Sink
 import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
-import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil
 import org.apache.flink.table.planner.BatchPlanner
 import org.apache.flink.table.sinks.{DataStreamTableSink, RetractStreamTableSink, StreamTableSink, TableSink, UpsertStreamTableSink}
 import org.apache.flink.table.types.{ClassLogicalTypeConverter, DataType}
@@ -98,7 +98,7 @@ class BatchExecSink[T](
         }
         val sinkTransformation = dsSink.getTransformation
 
-        val configSinkParallelism = NodeResourceConfig.getSinkParallelism(
+        val configSinkParallelism = NodeResourceUtil.getSinkParallelism(
           planner.getTableConfig.getConfiguration)
 
         val maxSinkParallelism = sinkTransformation.getMaxParallelism
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala
index 00c1fb6..7662afc 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.codegen.sort.SortCodeGenerator
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
 import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
-import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil
 import org.apache.flink.table.plan.util.{FlinkRelMdUtil, RelExplainUtil, SortUtil}
 import org.apache.flink.table.planner.BatchPlanner
 import org.apache.flink.table.runtime.sort.SortOperator
@@ -116,19 +116,23 @@ class BatchExecSort(
     val keyTypes = keys.map(inputType.getTypeAt)
     val codeGen = new SortCodeGenerator(conf, keys, keyTypes, orders, nullsIsLast)
 
-    val reservedMemorySize = conf.getConfiguration.getInteger(
-      ExecutionConfigOptions.SQL_RESOURCE_SORT_BUFFER_MEM) * NodeResourceConfig.SIZE_IN_MB
+    val managedMemoryInMB = conf.getConfiguration.getInteger(
+      ExecutionConfigOptions.SQL_RESOURCE_SORT_BUFFER_MEM)
+    val managedMemory = managedMemoryInMB * NodeResourceUtil.SIZE_IN_MB
 
     val operator = new SortOperator(
-      reservedMemorySize,
+      managedMemory,
       codeGen.generateNormalizedKeyComputer("BatchExecSortComputer"),
       codeGen.generateRecordComparator("BatchExecSortComparator"))
 
-    new OneInputTransformation(
+    val ret = new OneInputTransformation(
       input,
       s"Sort(${RelExplainUtil.collationToString(sortCollation, getRowType)})",
       operator.asInstanceOf[OneInputStreamOperator[BaseRow, BaseRow]],
       BaseRowTypeInfo.of(outputType),
       getResource.getParallelism)
+    val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
+    ret.setResources(resource, resource)
+    ret
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
index 1d9ba48..bb04dd4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
@@ -30,7 +30,7 @@ import org.apache.flink.table.plan.`trait`.FlinkRelDistributionTraitDef
 import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
 import org.apache.flink.table.plan.nodes.ExpressionFormat
 import org.apache.flink.table.plan.nodes.exec.ExecNode
-import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil
 import org.apache.flink.table.plan.util.{FlinkRelMdUtil, FlinkRelOptUtil, JoinUtil, SortUtil}
 import org.apache.flink.table.planner.BatchPlanner
 import org.apache.flink.table.runtime.join.{FlinkJoinType, SortMergeJoinOperator}
@@ -219,11 +219,13 @@ class BatchExecSortMergeJoin(
       leftType,
       rightType)
 
-    val externalBufferMemory = config.getConfiguration.getInteger(
-      ExecutionConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM) * NodeResourceConfig.SIZE_IN_MB
+    val externalBufferMemoryInMB = config.getConfiguration.getInteger(
+      ExecutionConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM)
+    val externalBufferMemory = externalBufferMemoryInMB * NodeResourceUtil.SIZE_IN_MB
 
-    val sortMemory = config.getConfiguration.getInteger(
-      ExecutionConfigOptions.SQL_RESOURCE_SORT_BUFFER_MEM) * NodeResourceConfig.SIZE_IN_MB
+    val sortMemoryInMB = config.getConfiguration.getInteger(
+      ExecutionConfigOptions.SQL_RESOURCE_SORT_BUFFER_MEM)
+    val sortMemory = sortMemoryInMB * NodeResourceUtil.SIZE_IN_MB
 
     def newSortGen(originalKeys: Array[Int], t: RowType): SortCodeGenerator = {
       val originalOrders = originalKeys.map(_ => true)
@@ -256,13 +258,18 @@ class BatchExecSortMergeJoin(
       newSortGen(leftAllKey.indices.toArray, keyType).generateRecordComparator("KeyComparator"),
       filterNulls)
 
-    new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
+    val externalBufferNum = if (flinkJoinType == FlinkJoinType.FULL) 2 else 1
+    val managedMemoryInMB = externalBufferMemoryInMB * externalBufferNum + sortMemoryInMB * 2
+    val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
       leftInput,
       rightInput,
       getOperatorName,
       operator,
       BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType)),
       getResource.getParallelism)
+    val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
+    ret.setResources(resource, resource)
+    ret
   }
 
   private def estimateOutputSize(relNode: RelNode): Double = {
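
For example, a FULL outer join needs two external buffers, so it declares
2 * externalBufferMemoryInMB + 2 * sortMemoryInMB of managed memory; every other join type
needs one buffer, giving externalBufferMemoryInMB + 2 * sortMemoryInMB.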
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
index 768b028..8a01d69 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.plan.`trait`.{AccMode, AccModeTraitDef}
 import org.apache.flink.table.plan.nodes.calcite.Sink
 import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
-import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil
 import org.apache.flink.table.plan.util.UpdatingPlanChecker
 import org.apache.flink.table.planner.StreamPlanner
 import org.apache.flink.table.sinks._
@@ -124,7 +124,7 @@ class StreamExecSink[T](
             "implemented and return the sink transformation DataStreamSink. " +
             s"However, ${sink.getClass.getCanonicalName} doesn't implement this method.")
         }
-        val configSinkParallelism = NodeResourceConfig.getSinkParallelism(
+        val configSinkParallelism = NodeResourceUtil.getSinkParallelism(
           planner.getTableConfig.getConfiguration)
 
         val maxSinkParallelism = dsSink.getTransformation.getMaxParallelism
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/reuse/DeadlockBreakupProcessor.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/reuse/DeadlockBreakupProcessor.scala
index 2106d0b..1b6109e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/reuse/DeadlockBreakupProcessor.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/reuse/DeadlockBreakupProcessor.scala
@@ -18,9 +18,9 @@
 
 package org.apache.flink.table.plan.reuse
 
-import org.apache.flink.runtime.io.network.DataExchangeMode
 import org.apache.flink.runtime.operators.DamBehavior
 import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.transformations.ShuffleMode
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
 import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode, ExecNodeVisitorImpl}
@@ -70,7 +70,7 @@ import scala.collection.mutable
   *                     |
   *                 HashJoin
   *     (build side)/      \(probe side)
-  *    (broadcast)Exchange Exchange(exchange_mode=[BATCH]) add BATCH Exchange to breakup deadlock
+  *    (broadcast)Exchange Exchange(shuffle_mode=[BATCH]) add BATCH Exchange to break up deadlock
   *                |        |
   *             Calc(b>10) Calc(b<20)
   *                 \      /
@@ -165,7 +165,7 @@ class DeadlockBreakupProcessor extends DAGProcessor {
         probeNode match {
           case e: BatchExecExchange =>
             // TODO create a cloned BatchExecExchange for PIPELINE output
-            e.setRequiredDataExchangeMode(DataExchangeMode.BATCH)
+            e.setRequiredShuffleMode(ShuffleMode.BATCH)
           case _ =>
             val probeRel = probeNode.asInstanceOf[RelNode]
             val traitSet = probeRel.getTraitSet.replace(distribution)
@@ -174,7 +174,7 @@ class DeadlockBreakupProcessor extends DAGProcessor {
               traitSet,
               probeRel,
               distribution)
-            e.setRequiredDataExchangeMode(DataExchangeMode.BATCH)
+            e.setRequiredShuffleMode(ShuffleMode.BATCH)
             // replace join node's input
             join.replaceInputNode(probeSideIndex, e)
         }
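
A BATCH shuffle breaks the deadlock because a blocking exchange fully materializes the probe
side before the join reads it, so the hash join no longer waits on two pipelined inputs that
transitively depend on the same upstream node.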
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/BatchPlanner.scala
index dfb1ddf..9b4e4f2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/BatchPlanner.scala
@@ -89,7 +89,9 @@ class BatchPlanner(
     val optimizedRelNodes = optimize(sinkRelNodes)
     val execNodes = translateToExecNodePlan(optimizedRelNodes)
     val transformations = translateToPlan(execNodes)
-    val streamGraph = BatchExecutor.generateStreamGraph(getExecEnv, transformations, "")
+    val batchExecutor = new BatchExecutor(getExecEnv)
+    batchExecutor.setTableConfig(getTableConfig)
+    val streamGraph = batchExecutor.generateStreamGraph(transformations, "")
     val executionPlan = PlanUtil.explainStreamGraph(streamGraph)
 
     val sb = new StringBuilder
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
index 3ca99ac..ef17933 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
@@ -70,6 +70,8 @@ abstract class PlannerBase(
     catalogManager: CatalogManager)
   extends Planner {
 
+  executor.asInstanceOf[ExecutorBase].setTableConfig(config)
+
   private val plannerContext: PlannerContext =
     new PlannerContext(
       config,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 52651e6..3007d92 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.dag.Transformation
 import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
 import org.apache.flink.table.delegation.Executor
-import org.apache.flink.table.executor.BatchExecutor
+import org.apache.flink.table.executor.StreamExecutor
 import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation}
 import org.apache.flink.table.plan.`trait`._
 import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
@@ -87,7 +87,9 @@ class StreamPlanner(
     val optimizedRelNodes = optimize(sinkRelNodes)
     val execNodes = translateToExecNodePlan(optimizedRelNodes)
     val transformations = translateToPlan(execNodes)
-    val streamGraph = BatchExecutor.generateStreamGraph(getExecEnv, transformations, "")
+    val streamExecutor = new StreamExecutor(getExecEnv)
+    streamExecutor.setTableConfig(getTableConfig)
+    val streamGraph = streamExecutor.generateStreamGraph(transformations, "")
     val executionPlan = PlanUtil.explainStreamGraph(streamGraph)
 
     val sb = new StringBuilder
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/catalog/PathResolutionTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/catalog/PathResolutionTest.xml
deleted file mode 100644
index 4de35be..0000000
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/catalog/PathResolutionTest.xml
+++ /dev/null
@@ -1,195 +0,0 @@
-<?xml version="1.0" ?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to you under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-<Root>
-  <TestCase name="testStreamSqlPathResolution[0: simpleInDefaultPath=[sqlPath: tab1, tableApiPath: [tab1], expectedPath: [builtin, default, tab1]]]">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM tab1]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[builtin, default, tab1, source: [()]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-TableSourceScan(table=[[builtin, default, tab1, source: [()]]], fields=[])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testStreamSqlPathResolution[10: spaceInNames=[sqlPath: `default db`.`tab 1`, tableApiPath: [default db, tab 1], expectedPath: [builtin, default db, tab 1]]]">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM `default db`.`tab 1`]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[builtin, default db, tab 1, source: [()]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-TableSourceScan(table=[[builtin, default db, tab 1, source: [()]]], fields=[])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testStreamSqlPathResolution[1: simpleInChangedDefaultCatalog=[defaultCatalog: cat1, sqlPath: tab1, tableApiPath: [tab1], expectedPath: [cat1, db1, tab1]]]">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM tab1]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[cat1, db1, tab1, source: [()]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-TableSourceScan(table=[[cat1, db1, tab1, source: [()]]], fields=[])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testStreamSqlPathResolution[2: simpleInChangedDefaultPath=[defaultCatalog: cat1, defaultDatabase: db2, sqlPath: tab1, tableApiPath: [tab1], expectedPath: [cat1, db2, tab1]]]">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM tab1]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[cat1, db2, tab1, source: [()]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-TableSourceScan(table=[[cat1, db2, tab1, source: [()]]], fields=[])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testStreamSqlPathResolution[3: qualifiedWithDatabase=[defaultCatalog: builtin, defaultDatabase: default, sqlPath: db1.tab1, tableApiPath: [db1, tab1], expectedPath: [builtin, db1, tab1]]]">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM db1.tab1]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[builtin, db1, tab1, source: [()]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-TableSourceScan(table=[[builtin, db1, tab1, source: [()]]], fields=[])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testStreamSqlPathResolution[4: fullyQualifiedName=[defaultCatalog: builtin, defaultDatabase: default, sqlPath: cat1.db1.tab1, tableApiPath: [cat1, db1, tab1], expectedPath: [cat1, db1, tab1]]]">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM cat1.db1.tab1]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[cat1, db1, tab1, source: [()]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-TableSourceScan(table=[[cat1, db1, tab1, source: [()]]], fields=[])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testStreamSqlPathResolution[5: externalCatalogTopLevelTable=[sqlPath: extCat1.tab1, tableApiPath: [extCat1, tab1], expectedPath: [extCat1, tab1]]]">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM extCat1.tab1]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[extCat1, tab1, source: [()]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-TableSourceScan(table=[[extCat1, tab1, source: [()]]], fields=[])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testStreamSqlPathResolution[6: externalCatalogMultiLevelNesting=[sqlPath: extCat1.extCat2.extCat3.tab1, tableApiPath: [extCat1, extCat2, extCat3, tab1], expectedPath: [extCat1, extCat2, extCat3, tab1]]]">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM extCat1.extCat2.extCat3.tab1]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[extCat1, extCat2, extCat3, tab1, source: [()]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-TableSourceScan(table=[[extCat1, extCat2, extCat3, tab1, source: [()]]], fields=[])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testStreamSqlPathResolution[7: dotInUnqualifiedTableName=[sqlPath: `tab.1`, tableApiPath: [tab.1], expectedPath: [builtin, default, tab.1]]]">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM `tab.1`]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[builtin, default, tab.1, source: [()]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-TableSourceScan(table=[[builtin, default, tab.1, source: [()]]], fields=[])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testStreamSqlPathResolution[8: dotInDatabaseName=[sqlPath: `default.db`.tab1, tableApiPath: [default.db, tab1], expectedPath: [builtin, default.db, tab1]]]">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM `default.db`.tab1]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[builtin, default.db, tab1, source: [()]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-TableSourceScan(table=[[builtin, default.db, tab1, source: [()]]], fields=[])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testStreamSqlPathResolution[9: dotInDefaultDatabaseName=[defaultCatalog: builtin, defaultDatabase: default.db, sqlPath: tab1, tableApiPath: [tab1], expectedPath: [builtin, default.db, tab1]]]">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM tab1]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[builtin, default.db, tab1, source: [()]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-TableSourceScan(table=[[builtin, default.db, tab1, source: [()]]], fields=[])
-]]>
-    </Resource>
-  </TestCase>
-</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/executor/BatchExecutorTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/executor/BatchExecutorTest.xml
new file mode 100644
index 0000000..52a0f64
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/executor/BatchExecutorTest.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testRestoreConfigWhenBatchShuffleMode">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+	 : Operator
+		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]), fields:(a, b, c))
+		ship_strategy : FORWARD
+
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRestoreConfig">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+	 : Operator
+		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]), fields:(a, b, c))
+		ship_strategy : FORWARD
+
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/DagOptimizationTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/DagOptimizationTest.xml
index 5b83079..acd9bb6 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/DagOptimizationTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/DagOptimizationTest.xml
@@ -285,7 +285,7 @@ Calc(select=[a AS a1, b, c AS c2], reuse_id=[3])
    :- Exchange(distribution=[hash[a]])
    :  +- Calc(select=[a, b], where=[<=(a, 10)])
    :     +- Reused(reference_id=[1])
-   +- Exchange(distribution=[hash[a2]], exchange_mode=[BATCH])
+   +- Exchange(distribution=[hash[a2]], shuffle_mode=[BATCH])
       +- Calc(select=[a AS a2, c], where=[>=(b, 5)])
          +- Reused(reference_id=[2])
 
@@ -298,7 +298,7 @@ Sink(name=[sink2], fields=[a1, b, c1])
       :- Exchange(distribution=[hash[a3]])
       :  +- Calc(select=[a AS a3, c AS c1], where=[<(b, 5)])
       :     +- Reused(reference_id=[2])
-      +- Exchange(distribution=[hash[a1]], exchange_mode=[BATCH])
+      +- Exchange(distribution=[hash[a1]])
          +- Calc(select=[a1, b])
             +- Reused(reference_id=[3])
 ]]>
@@ -737,12 +737,12 @@ LogicalSink(name=[appendSink], fields=[a1, b, c1])
 Sink(name=[appendSink], fields=[a1, b, c1])
 +- Calc(select=[a1, b, c1])
    +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], select=[a3, c1, a1, b], build=[right])
-      :- Exchange(distribution=[hash[a3]], exchange_mode=[BATCH])
+      :- Exchange(distribution=[hash[a3]], shuffle_mode=[BATCH])
       :  +- Calc(select=[a AS a3, c AS c1], where=[AND(>=(a, 0), <(b, 5))])
       :     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], reuse_id=[1])
       +- Calc(select=[a AS a1, b])
          +- HashJoin(joinType=[InnerJoin], where=[=(a, a2)], select=[a, b, a2], build=[right])
-            :- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+            :- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
             :  +- Calc(select=[a, b], where=[<=(a, 10)])
             :     +- Reused(reference_id=[1])
             +- Exchange(distribution=[hash[a2]])
@@ -822,17 +822,17 @@ LogicalSink(name=[sink], fields=[a, b, c])
 Sink(name=[sink], fields=[a, b, c])
 +- Calc(select=[a AS a1, b1, c1])
    +- HashJoin(joinType=[InnerJoin], where=[=(a, a3)], select=[a, a3, b1, c1], build=[right])
-      :- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+      :- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
       :  +- Calc(select=[a], where=[<=(a, 10)])
       :     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], reuse_id=[1])
       +- Calc(select=[a3, b AS b1, c1])
          +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], select=[a3, c1, a1, b], build=[right])
-            :- Exchange(distribution=[hash[a3]], exchange_mode=[BATCH])
+            :- Exchange(distribution=[hash[a3]], shuffle_mode=[BATCH])
             :  +- Calc(select=[a AS a3, c AS c1], where=[AND(>=(a, 0), <(b, 5))])
             :     +- Reused(reference_id=[1])
             +- Calc(select=[a AS a1, b])
                +- HashJoin(joinType=[InnerJoin], where=[=(a, a2)], select=[a, b, a2], build=[right])
-                  :- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+                  :- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
                   :  +- Calc(select=[a, b], where=[<=(a, 10)])
                   :     +- Reused(reference_id=[1])
                   +- Exchange(distribution=[hash[a2]])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/DeadlockBreakupTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/DeadlockBreakupTest.xml
index 1c4a26d..1b04b86 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/DeadlockBreakupTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/DeadlockBreakupTest.xml
@@ -35,7 +35,7 @@ HashAggregate(isMerge=[false], groupBy=[a, b, c], select=[a, b, c])
 +- HashJoin(joinType=[LeftSemiJoin], where=[AND(OR(=(a, a0), AND(IS NULL(a), IS NULL(a0))), OR(=(b, b0), AND(IS NULL(b), IS NULL(b0))), OR(=(c, c0), AND(IS NULL(c), IS NULL(c0))))], select=[a, b, c], build=[left])
    :- Exchange(distribution=[hash[a, b, c]])
    :  +- BoundedStreamScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
-   +- Exchange(distribution=[hash[a, b, c]], exchange_mode=[BATCH])
+   +- Exchange(distribution=[hash[a, b, c]], shuffle_mode=[BATCH])
       +- BoundedStreamScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -57,7 +57,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5])
       <![CDATA[
 Calc(select=[a, b, c, a1, b0, c0])
 +- HashJoin(joinType=[InnerJoin], where=[=(a0, b0)], select=[a, b, c, a0, a1, b0, c0], build=[right])
-   :- Exchange(distribution=[hash[a0]], exchange_mode=[BATCH])
+   :- Exchange(distribution=[hash[a0]], shuffle_mode=[BATCH])
    :  +- Calc(select=[a, b, c, CAST(a) AS a0])
    :     +- BoundedStreamScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
    +- Exchange(distribution=[hash[b]])
@@ -81,7 +81,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5])
     <Resource name="planAfter">
       <![CDATA[
 HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, a0, b0, c0], build=[right])
-:- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+:- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
 :  +- Calc(select=[a, b, c], where=[>(b, 10)])
 :     +- BoundedStreamScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
 +- Exchange(distribution=[hash[a]])
@@ -114,7 +114,7 @@ LogicalProject(c=[$0], a=[$1], b=[$2], c0=[$3], a0=[$4], b0=[$5])
       <![CDATA[
 Calc(select=[c, a, b, c0, a1, b0])
 +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a0, b0)], select=[c, a, b, a0, c0, a1, b0], build=[right])
-   :- Exchange(distribution=[any], exchange_mode=[BATCH])
+   :- Exchange(distribution=[any], shuffle_mode=[BATCH])
    :  +- Calc(select=[c, a, b, CAST(a) AS a0])
    :     +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS a, Final_SUM(sum$1) AS b], reuse_id=[1])
    :        +- Exchange(distribution=[hash[c]])
@@ -161,7 +161,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5], a1=[$6], b1=[$
     <Resource name="planAfter">
       <![CDATA[
 HashJoin(joinType=[InnerJoin], where=[=(c, c1)], select=[a, b, c, a0, b0, c0, a1, b1, c1, a00, b00, c00], build=[right])
-:- Exchange(distribution=[hash[c]], exchange_mode=[BATCH])
+:- Exchange(distribution=[hash[c]])
 :  +- HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, a0, b0, c0], build=[left])
 :     :- Calc(select=[a, b, c], where=[>(b, 10)])
 :     :  +- SortAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS b, Final_MAX(max$1) AS c], reuse_id=[1])
@@ -170,13 +170,13 @@ HashJoin(joinType=[InnerJoin], where=[=(c, c1)], select=[a, b, c, a0, b0, c0, a1
 :     :           +- LocalSortAggregate(groupBy=[a], select=[a, Partial_SUM(b) AS sum$0, Partial_MAX(c) AS max$1])
 :     :              +- Sort(orderBy=[a ASC])
 :     :                 +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-:     +- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+:     +- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
 :        +- Reused(reference_id=[1])
 +- Exchange(distribution=[hash[c]])
    +- HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, a0, b0, c0], build=[left])
       :- Calc(select=[a, b, c], where=[<(b, 5)])
       :  +- Reused(reference_id=[1])
-      +- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+      +- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
          +- Reused(reference_id=[1])
 ]]>
     </Resource>
@@ -211,7 +211,7 @@ LogicalProject(a=[$0])
       <![CDATA[
 Calc(select=[a, b])
 +- HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, a0, b], build=[right])
-   :- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+   :- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
    :  +- Calc(select=[a], where=[=(b, 5:BIGINT)])
    :     +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], reuse_id=[1])
    +- HashJoin(joinType=[LeftSemiJoin], where=[=(a, a0)], select=[a, b], build=[left])
@@ -221,7 +221,7 @@ Calc(select=[a, b])
       :        +- Exchange(distribution=[single])
       :           +- Limit(offset=[0], fetch=[10], global=[false])
       :              +- Reused(reference_id=[1])
-      +- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+      +- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
          +- Calc(select=[a], where=[>(b, 5)])
             +- Reused(reference_id=[1])
 ]]>
@@ -251,7 +251,7 @@ LogicalProject(a=[$0])
       <![CDATA[
 Calc(select=[a])
 +- HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, a0], isBroadcast=[true], build=[right])
-   :- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+   :- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
    :  +- Calc(select=[a], reuse_id=[1])
    :     +- Limit(offset=[0], fetch=[10], global=[true])
    :        +- Exchange(distribution=[single])
@@ -292,7 +292,7 @@ Calc(select=[a])
    :        +- Exchange(distribution=[single])
    :           +- Limit(offset=[0], fetch=[10], global=[false])
    :              +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-   +- Exchange(distribution=[any], exchange_mode=[BATCH])
+   +- Exchange(distribution=[any], shuffle_mode=[BATCH])
       +- Reused(reference_id=[1])
 ]]>
     </Resource>
@@ -324,7 +324,7 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3], a0=[$4], b0=[$5], d0=[$6], e0=[$7
     <Resource name="planAfter">
       <![CDATA[
 HashJoin(joinType=[InnerJoin], where=[=(b, e0)], select=[a, b, d, e, a0, b0, d0, e0], build=[right])
-:- Exchange(distribution=[hash[b]], exchange_mode=[BATCH])
+:- Exchange(distribution=[hash[b]], shuffle_mode=[BATCH])
 :  +- HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d, e], build=[left])
 :     :- Exchange(distribution=[hash[a]])
 :     :  +- Calc(select=[a, b], where=[<(a, 10)])
@@ -367,7 +367,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5])
     <Resource name="planAfter">
       <![CDATA[
 HashJoin(joinType=[InnerJoin], where=[=(c, c0)], select=[a, b, c, a0, b0, c0], build=[right])
-:- Exchange(distribution=[hash[c]], exchange_mode=[BATCH])
+:- Exchange(distribution=[hash[c]], shuffle_mode=[BATCH])
 :  +- Calc(select=[w0$o0 AS a, b, c])
 :     +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[MAX($2) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[b, c, $2, w0$o0])
 :        +- Calc(select=[b, c, CASE(>(w0$o0, 0:BIGINT), w0$o1, null:INTEGER) AS $2], reuse_id=[1])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml
index c514ad8..5a80315 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml
@@ -227,7 +227,7 @@ LogicalMinus(all=[false])
       <![CDATA[
 HashAggregate(isMerge=[false], groupBy=[a, b, c], select=[a, b, c])
 +- HashJoin(joinType=[LeftAntiJoin], where=[AND(OR(=(a, a0), AND(IS NULL(a), IS NULL(a0))), OR(=(b, b0), AND(IS NULL(b), IS NULL(b0))), OR(=(c, c0), AND(IS NULL(c), IS NULL(c0))))], select=[a, b, c], build=[left])
-   :- Exchange(distribution=[hash[a, b, c]], exchange_mode=[BATCH], reuse_id=[1])
+   :- Exchange(distribution=[hash[a, b, c]], shuffle_mode=[BATCH], reuse_id=[1])
    :  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
    +- Reused(reference_id=[1])
 ]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
index 141cbdf..9629fb3 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
@@ -40,7 +40,7 @@ LogicalProject(a=[$0])
       <![CDATA[
 Calc(select=[a])
 +- HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, a0], isBroadcast=[true], build=[right])
-   :- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+   :- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
    :  +- Calc(select=[a], reuse_id=[1])
    :     +- Limit(offset=[0], fetch=[10], global=[true])
    :        +- Exchange(distribution=[single])
@@ -81,7 +81,7 @@ Calc(select=[a])
    :        +- Exchange(distribution=[single])
    :           +- Limit(offset=[0], fetch=[10], global=[false])
    :              +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-   +- Exchange(distribution=[any], exchange_mode=[BATCH])
+   +- Exchange(distribution=[any], shuffle_mode=[BATCH])
       +- Reused(reference_id=[1])
 ]]>
     </Resource>
@@ -157,7 +157,7 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3], a0=[$4], b0=[$5], d0=[$6], e0=[$7
     <Resource name="planAfter">
       <![CDATA[
 HashJoin(joinType=[InnerJoin], where=[=(b, e0)], select=[a, b, d, e, a0, b0, d0, e0], build=[right])
-:- Exchange(distribution=[hash[b]], exchange_mode=[BATCH])
+:- Exchange(distribution=[hash[b]], shuffle_mode=[BATCH])
 :  +- HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d, e], build=[left])
 :     :- Exchange(distribution=[hash[a]])
 :     :  +- Calc(select=[a, b], where=[<(a, 10)])
@@ -257,10 +257,10 @@ Calc(select=[c, e, avg_b, sum_b, sum_b0 AS psum, sum_b1 AS nsum, avg_b0 AS avg_b
    :     :                          +- Exchange(distribution=[hash[d]])
    :     :                             +- Calc(select=[d, e], where=[>(e, 10)])
    :     :                                +- TableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
-   :     +- Exchange(distribution=[hash[c, e, $f5]], exchange_mode=[BATCH])
+   :     +- Exchange(distribution=[hash[c, e, $f5]], shuffle_mode=[BATCH])
    :        +- Calc(select=[sum_b, /(CAST(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT)), w0$o0) AS avg_b, c, e, +(w1$o0, 1) AS $f5])
    :           +- Reused(reference_id=[1])
-   +- Exchange(distribution=[hash[c, e, $f5]], exchange_mode=[BATCH])
+   +- Exchange(distribution=[hash[c, e, $f5]], shuffle_mode=[BATCH])
       +- Calc(select=[sum_b, c, e, -(w0$o0, 1) AS $f5])
          +- OverAggregate(partitionBy=[c, e], orderBy=[c ASC, e ASC], window#0=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[c, e, sum_b, w0$o0])
             +- Reused(reference_id=[2])
@@ -291,7 +291,7 @@ LogicalProject(c=[$0], a=[$1], b=[$2], c0=[$3], a0=[$4], b0=[$5])
       <![CDATA[
 Calc(select=[c, a, b, c0, a1, b0])
 +- HashJoin(joinType=[InnerJoin], where=[=(a0, b0)], select=[c, a, b, a0, c0, a1, b0], build=[right])
-   :- Exchange(distribution=[hash[a0]], exchange_mode=[BATCH])
+   :- Exchange(distribution=[hash[a0]], shuffle_mode=[BATCH])
    :  +- Calc(select=[c, a, b, CAST(a) AS a0])
    :     +- SortAggregate(isMerge=[false], groupBy=[c], select=[c, MyFirst(a) AS a, MyLast(b) AS b], reuse_id=[1])
    :        +- Sort(orderBy=[c ASC])
@@ -492,7 +492,7 @@ LogicalProject(a=[$0], b=[$1], a0=[$2])
       <![CDATA[
 Calc(select=[a, b, a1 AS a0])
 +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a0, b0)], select=[a, b, a0, a1, b0], build=[right])
-   :- Exchange(distribution=[any], exchange_mode=[BATCH])
+   :- Exchange(distribution=[any], shuffle_mode=[BATCH])
    :  +- Calc(select=[a, b, CAST(a) AS a0])
    :     +- Limit(offset=[0], fetch=[10], global=[true], reuse_id=[1])
    :        +- Exchange(distribution=[single])
@@ -603,7 +603,7 @@ LogicalProject(c=[$0], a=[$1], b=[$2], c0=[$3], a0=[$4], b0=[$5])
       <![CDATA[
 Calc(select=[c, a, b, c0, a1, b0])
 +- HashJoin(joinType=[InnerJoin], where=[=(a0, b0)], select=[c, a, b, a0, c0, a1, b0], build=[right])
-   :- Exchange(distribution=[hash[a0]], exchange_mode=[BATCH])
+   :- Exchange(distribution=[hash[a0]], shuffle_mode=[BATCH])
    :  +- Calc(select=[c, a, b, CAST(a) AS a0])
    :     +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS a, Final_SUM(sum$1) AS b], reuse_id=[1])
    :        +- Exchange(distribution=[hash[c]])
@@ -642,7 +642,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], a0=[$6], b0=[$7],
     <Resource name="planAfter">
       <![CDATA[
 HashJoin(joinType=[InnerJoin], where=[=(a, d0)], select=[a, b, c, d, e, f, a0, b0, c0, d0, e0, f0], build=[right])
-:- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+:- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
 :  +- HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], build=[left], reuse_id=[1])
 :     :- Exchange(distribution=[hash[a]])
 :     :  +- Calc(select=[a, b, c], where=[LIKE(c, _UTF-16LE'He%')])
@@ -682,7 +682,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], a0=[$6], b0=[$7],
       <![CDATA[
 Calc(select=[a, b, c, d, e, f, a1, b0, c0, d0, e0, f0])
 +- HashJoin(joinType=[InnerJoin], where=[=(a0, b0)], select=[a, b, c, d, e, f, a0, a1, b0, c0, d0, e0, f0], build=[right])
-   :- Exchange(distribution=[hash[a0]], exchange_mode=[BATCH])
+   :- Exchange(distribution=[hash[a0]], shuffle_mode=[BATCH])
    :  +- Calc(select=[a, b, c, d, e, f, CAST(a) AS a0])
    :     +- NestedLoopJoin(joinType=[InnerJoin], where=[OR(=(ABS(a), ABS(d)), =(c, f))], select=[a, b, c, d, e, f], build=[left], reuse_id=[1])
    :        :- Exchange(distribution=[broadcast])
@@ -724,7 +724,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], a0=[$6], b0=[$7],
       <![CDATA[
 Calc(select=[a, b, c, d, e, f, a1, b0, c0, d0, e0, f0])
 +- HashJoin(joinType=[InnerJoin], where=[=(a0, b0)], select=[a, b, c, d, e, f, a0, a1, b0, c0, d0, e0, f0], build=[right])
-   :- Exchange(distribution=[hash[a0]], exchange_mode=[BATCH])
+   :- Exchange(distribution=[hash[a0]], shuffle_mode=[BATCH])
    :  +- Calc(select=[a, b, c, d, e, f, CAST(a) AS a0])
    :     +- NestedLoopJoin(joinType=[InnerJoin], where=[OR(=(random_udf(a), random_udf(d)), =(c, f))], select=[a, b, c, d, e, f], build=[left], reuse_id=[1])
    :        :- Exchange(distribution=[broadcast])
@@ -761,7 +761,7 @@ LogicalProject(a=[$0], b=[$1], a0=[$2])
       <![CDATA[
 Calc(select=[a, b, a1 AS a0])
 +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a0, b0)], select=[a, b, a0, a1, b0], build=[right])
-   :- Exchange(distribution=[any], exchange_mode=[BATCH])
+   :- Exchange(distribution=[any], shuffle_mode=[BATCH])
    :  +- Calc(select=[a, b, CAST(a) AS a0])
    :     +- Limit(offset=[0], fetch=[10], global=[true], reuse_id=[1])
    :        +- Exchange(distribution=[single])
@@ -806,7 +806,7 @@ NestedLoopJoin(joinType=[InnerJoin], where=[=(a, d0)], select=[a, b, c, d, e, f,
 :     :  +- Calc(select=[a, b, c], where=[LIKE(c, _UTF-16LE'He%')])
 :     :     +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 :     +- TableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
-+- Exchange(distribution=[any], exchange_mode=[BATCH])
++- Exchange(distribution=[any], shuffle_mode=[BATCH])
    +- Reused(reference_id=[1])
 ]]>
     </Resource>
@@ -832,7 +832,7 @@ LogicalProject(a=[$0], b=[$1], EXPR$2=[$2], a0=[$3], b0=[$4], EXPR$20=[$5])
     <Resource name="planAfter">
       <![CDATA[
 HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, $2, a0, b0, $20], build=[right])
-:- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+:- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
 :  +- Calc(select=[a, b, w0$o0 AS $2], where=[<(b, 100)])
 :     +- OverAggregate(partitionBy=[c], orderBy=[c DESC], window#0=[MyFirst(c) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0], reuse_id=[1])
 :        +- Sort(orderBy=[c DESC])
@@ -867,7 +867,7 @@ LogicalProject(c=[$0], a=[$1], b=[$2], c0=[$3], a0=[$4], b0=[$5])
     <Resource name="planAfter">
       <![CDATA[
 HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[c, a, b, c0, a0, b0], build=[right])
-:- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+:- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
 :  +- Calc(select=[c, a, b], where=[>(a, 1)])
 :     +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS a, Final_SUM(sum$1) AS b], reuse_id=[1])
 :        +- Exchange(distribution=[hash[c]])
@@ -903,7 +903,7 @@ LogicalProject(c=[$0], a=[$1], b=[$2], c0=[$3], a0=[$4], b0=[$5])
       <![CDATA[
 Calc(select=[c, a, b, c0, a1, b0])
 +- HashJoin(joinType=[InnerJoin], where=[=(a0, b0)], select=[c, a, b, a0, c0, a1, b0], build=[right])
-   :- Exchange(distribution=[hash[a0]], exchange_mode=[BATCH])
+   :- Exchange(distribution=[hash[a0]], shuffle_mode=[BATCH])
    :  +- Calc(select=[c, a, b, CAST(a) AS a0])
    :     +- SortAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS a, Final_SUM(sum$1) AS b], reuse_id=[1])
    :        +- Sort(orderBy=[c ASC])
@@ -938,7 +938,7 @@ LogicalProject(a=[$0], b=[$1], EXPR$2=[$2], a0=[$3], b0=[$4], EXPR$20=[$5])
     <Resource name="planAfter">
       <![CDATA[
 HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, $2, a0, b0, $20], build=[right])
-:- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+:- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
 :  +- Calc(select=[a, b, w0$o0 AS $2], where=[<(b, 100)])
 :     +- OverAggregate(orderBy=[c DESC], window#0=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0], reuse_id=[1])
 :        +- Sort(orderBy=[c DESC])
@@ -984,7 +984,7 @@ NestedLoopJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[c, a, b, c0, a0,
 :                 +- Exchange(distribution=[hash[c]])
 :                    +- LocalHashAggregate(groupBy=[c], select=[c, Partial_SUM(a) AS sum$0, Partial_SUM(b) AS sum$1])
 :                       +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Exchange(distribution=[any], exchange_mode=[BATCH])
++- Exchange(distribution=[any], shuffle_mode=[BATCH])
    +- Calc(select=[c, a, b], where=[<(b, 10)])
       +- Reused(reference_id=[1])
 ]]>
@@ -1022,7 +1022,7 @@ LogicalProject(a=[$0], c=[$1], c0=[$3])
       <![CDATA[
 Calc(select=[a, c, c0])
 +- HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, c, a0, c0], build=[right])
-   :- Exchange(distribution=[hash[a]], exchange_mode=[BATCH], reuse_id=[1])
+   :- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH], reuse_id=[1])
    :  +- Union(all=[true], union=[a, c])
    :     :- Calc(select=[a, c], where=[>(b, 10)])
    :     :  +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
@@ -1095,7 +1095,7 @@ NestedLoopJoin(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, rand
 :  +- Sort(orderBy=[random ASC])
 :     +- Exchange(distribution=[hash[random]])
 :        +- NestedLoopJoin(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, random0)], select=[random], build=[right])
-:           :- Exchange(distribution=[any], exchange_mode=[BATCH])
+:           :- Exchange(distribution=[any], shuffle_mode=[BATCH])
 :           :  +- Calc(select=[random], reuse_id=[1])
 :           :     +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1], global=[true])
 :           :        +- Exchange(distribution=[single])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
index 8eb5180..202533c 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
@@ -327,7 +327,7 @@ Calc(select=[EXPR$0, EXPR$1, EXPR$2])
    +- Exchange(distribution=[hash[b, b0]])
       +- LocalHashAggregate(groupBy=[b, b0], select=[b, b0, Partial_COUNT(a) AS count$0, Partial_COUNT(id) AS count$1, Partial_SUM(a0) AS sum$2])
          +- HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[b, a, id, a0, b0], build=[right])
-            :- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+            :- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
             :  +- LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], async=[false], on=[a=id], where=[>(age, 10)], select=[b, a, id], reuse_id=[1])
             :     +- Calc(select=[b, a])
             :        +- HashAggregate(isMerge=[true], groupBy=[a, b], select=[a, b])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SingleRowJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SingleRowJoinTest.xml
index 4cc5310..a528c3e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SingleRowJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SingleRowJoinTest.xml
@@ -35,7 +35,7 @@ LogicalProject(a1=[$0], a2=[$1])
       <![CDATA[
 Calc(select=[a1, a2])
 +- NestedLoopJoin(joinType=[InnerJoin], where=[<(a1, cnt)], select=[a1, a2, cnt], build=[right], singleRowJoin=[true])
-   :- Exchange(distribution=[any], exchange_mode=[BATCH])
+   :- Exchange(distribution=[any], shuffle_mode=[BATCH])
    :  +- TableSourceScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]], fields=[a1, a2], reuse_id=[1])
    +- Exchange(distribution=[broadcast])
       +- HashAggregate(isMerge=[true], select=[Final_COUNT(count$0) AS cnt])
@@ -147,7 +147,7 @@ LogicalProject(a1=[$0], a_sum=[$2])
     <Resource name="planAfter">
       <![CDATA[
 NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a1, a_sum], build=[right], singleRowJoin=[true])
-:- Exchange(distribution=[any], exchange_mode=[BATCH])
+:- Exchange(distribution=[any], shuffle_mode=[BATCH])
 :  +- Calc(select=[a1])
 :     +- TableSourceScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]], fields=[a1, a2], reuse_id=[1])
 +- Exchange(distribution=[broadcast])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/SetOperatorsTest.xml
index ee628f4..0d88364 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/SetOperatorsTest.xml
@@ -87,7 +87,7 @@ LogicalProject(a1=[AS($0, _UTF-16LE'a1')])
     <Resource name="planAfter">
       <![CDATA[
 HashJoin(joinType=[LeftSemiJoin], where=[=(c, a1)], select=[a, b, c], build=[right], tryDistinctBuildRow=[true])
-:- Exchange(distribution=[hash[c]], exchange_mode=[BATCH])
+:- Exchange(distribution=[hash[c]], shuffle_mode=[BATCH])
 :  +- TableSourceScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], reuse_id=[1])
 +- Exchange(distribution=[hash[a1]])
    +- LocalHashAggregate(groupBy=[a1], select=[a1])
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/executor/BatchExecutorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/executor/BatchExecutorTest.scala
new file mode 100644
index 0000000..cca8e1a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/executor/BatchExecutorTest.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.executor
+
+import org.apache.flink.api.common.InputDependencyConstraint
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.transformations.ShuffleMode
+import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.util.{BatchTableTestUtil, TableTestBase}
+
+import org.junit.Assert.{assertEquals, assertFalse}
+import org.junit.{Before, Test}
+
+/**
+  * Tests that the streamEnv config is saved and restored when running batch jobs.
+  */
+class BatchExecutorTest extends TableTestBase {
+
+  private var util: BatchTableTestUtil = _
+
+  @Before
+  def setUp(): Unit = {
+    util = batchTestUtil()
+    util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addDataStream[(Int, Long, String)]("MyTable1", 'a, 'b, 'c)
+    util.addDataStream[(Int, Long, String)]("MyTable2", 'd, 'e, 'f)
+  }
+
+  @Test
+  def testRestoreConfig(): Unit = {
+    util.getStreamEnv.setBufferTimeout(11)
+    util.getStreamEnv.getConfig.disableObjectReuse()
+    util.getStreamEnv.getConfig.setLatencyTrackingInterval(100)
+    util.getStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    util.getStreamEnv.getConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ANY)
+    util.verifyExplain("SELECT * FROM MyTable")
+    assertEquals(11, util.getStreamEnv.getBufferTimeout)
+    assertFalse(util.getStreamEnv.getConfig.isObjectReuseEnabled)
+    assertEquals(100, util.getStreamEnv.getConfig.getLatencyTrackingInterval)
+    assertEquals(TimeCharacteristic.EventTime, util.getStreamEnv.getStreamTimeCharacteristic)
+    assertEquals(InputDependencyConstraint.ANY,
+      util.getStreamEnv.getConfig.getDefaultInputDependencyConstraint)
+  }
+
+  @Test
+  def testRestoreConfigWhenBatchShuffleMode(): Unit = {
+    util.getTableEnv.getConfig.getConfiguration.setString(
+      ExecutionConfigOptions.SQL_EXEC_SHUFFLE_MODE,
+      ShuffleMode.BATCH.toString)
+    util.getStreamEnv.setBufferTimeout(11)
+    util.getStreamEnv.getConfig.disableObjectReuse()
+    util.getStreamEnv.getConfig.setLatencyTrackingInterval(100)
+    util.getStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    util.getStreamEnv.getConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ANY)
+    util.verifyExplain("SELECT * FROM MyTable")
+    assertEquals(11, util.getStreamEnv.getBufferTimeout)
+    assertFalse(util.getStreamEnv.getConfig.isObjectReuseEnabled)
+    assertEquals(100, util.getStreamEnv.getConfig.getLatencyTrackingInterval)
+    assertEquals(TimeCharacteristic.EventTime, util.getStreamEnv.getStreamTimeCharacteristic)
+    assertEquals(InputDependencyConstraint.ANY,
+      util.getStreamEnv.getConfig.getDefaultInputDependencyConstraint)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
index fd2e489..3e8d304 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.transformations.ShuffleMode
 import org.apache.flink.table.api.internal.{TableEnvironmentImpl, TableImpl}
 import org.apache.flink.table.api.{EnvironmentSettings, ExecutionConfigOptions, SqlParserException, Table, TableConfig, TableEnvironment}
 import org.apache.flink.table.dataformat.{BaseRow, BinaryRow, BinaryRowWriter}
@@ -72,6 +73,8 @@ class BatchTestBase extends BatchAbstractTestBase {
     conf.getConfiguration.setInteger(ExecutionConfigOptions.SQL_RESOURCE_HASH_JOIN_TABLE_MEM, 2)
     conf.getConfiguration.setInteger(ExecutionConfigOptions.SQL_RESOURCE_SORT_BUFFER_MEM, 1)
     conf.getConfiguration.setInteger(ExecutionConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM, 1)
+    conf.getConfiguration.setString(ExecutionConfigOptions.SQL_EXEC_SHUFFLE_MODE,
+      ShuffleMode.PIPELINED.toString)
   }
 
   /**
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index fb9dfd5..26a5268 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -24,6 +24,7 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.{LocalStreamEnvironment, StreamExecutionEnvironment}
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
+import org.apache.flink.streaming.api.transformations.ShuffleMode
 import org.apache.flink.streaming.api.{TimeCharacteristic, environment}
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.internal.{TableEnvironmentImpl, TableImpl}
@@ -483,12 +484,16 @@ abstract class TableTestUtil(
   protected val testingTableEnv: TestingTableEnvironment =
     TestingTableEnvironment.create(setting, catalogManager)
   val tableEnv: TableEnvironment = testingTableEnv
+  tableEnv.getConfig.getConfiguration.setString(
+    ExecutionConfigOptions.SQL_EXEC_SHUFFLE_MODE, ShuffleMode.PIPELINED.toString)
 
   private val env: StreamExecutionEnvironment = getPlanner.getExecEnv
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
   override def getTableEnv: TableEnvironment = tableEnv
 
+  def getStreamEnv: StreamExecutionEnvironment = env
+
   /**
     * Create a [[TestTableSource]] with the given schema, table stats and unique keys,
     * and registers this TableSource under given name into the TableEnvironment's catalog.
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/ExecutionConfigOptions.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/ExecutionConfigOptions.java
index 6c8cce3..283625c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/ExecutionConfigOptions.java
@@ -107,12 +107,12 @@ public class ExecutionConfigOptions {
 
 	public static final ConfigOption<Integer> SQL_RESOURCE_HASH_AGG_TABLE_MEM =
 			key("sql.resource.hash-agg.table.memory.mb")
-					.defaultValue(256)
+					.defaultValue(128)
 					.withDescription("Sets the table memory size of hashAgg operator.");
 
 	public static final ConfigOption<Integer> SQL_RESOURCE_HASH_JOIN_TABLE_MEM =
 			key("sql.resource.hash-join.table.memory.mb")
-					.defaultValue(512)
+					.defaultValue(128)
 					.withDescription("Sets the HashTable reserved memory for hashJoin operator. It defines the lower limit.");
 
 	public static final ConfigOption<Integer> SQL_RESOURCE_SORT_BUFFER_MEM =
@@ -195,4 +195,12 @@ public class ExecutionConfigOptions {
 							"means a kind of disabled operator. Its default value is empty that means no operators are disabled. " +
 							"If the configure's value is \"NestedLoopJoin, ShuffleHashJoin\", NestedLoopJoin and ShuffleHashJoin " +
 							"are disabled. If configure's value is \"HashJoin\", ShuffleHashJoin and BroadcastHashJoin are disabled.");
+
+	public static final ConfigOption<String> SQL_EXEC_SHUFFLE_MODE =
+			key("sql.exec.shuffle-mode")
+					.defaultValue("batch")
+					.withDescription("Sets the execution shuffle mode. Only batch or pipeline can be set.\n" +
+							"batch: the job will run stage by stage.\n" +
+							"pipeline: the job will run in streaming mode, but this may cause a resource deadlock in which a " +
+							"receiver waits for resources to start while the sender holds resources, waiting to send data to that receiver.");
 }
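
For reference, a minimal usage sketch of toggling the new option from user code, mirroring what BatchExecutorTest and BatchTestBase above do; it is not part of this patch, and `tEnv` is assumed to be any blink-planner TableEnvironment created elsewhere:

    import org.apache.flink.streaming.api.transformations.ShuffleMode
    import org.apache.flink.table.api.ExecutionConfigOptions

    // Select stage-by-stage (blocking) shuffles for the batch job; "batch"
    // is already the default value of sql.exec.shuffle-mode. Setting
    // ShuffleMode.PIPELINED.toString instead selects pipelined shuffles,
    // at the risk of the resource deadlock described in the option's docs.
    tEnv.getConfig.getConfiguration.setString(
      ExecutionConfigOptions.SQL_EXEC_SHUFFLE_MODE,
      ShuffleMode.BATCH.toString)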