You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/12 02:07:37 UTC
[flink] branch master updated: [FLINK-13121][table-planner-blink]
Set properties to StreamGraph in blink executor
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a9336f9 [FLINK-13121][table-planner-blink] Set properties to StreamGraph in blink executor
a9336f9 is described below
commit a9336f962c143864e1f326a9ba59df7da2839be0
Author: Xupingyong <xu...@163.com>
AuthorDate: Wed Jul 10 14:38:04 2019 +0800
[FLINK-13121][table-planner-blink] Set properties to StreamGraph in blink executor
This closes #9057
---
.../apache/flink/table/executor/BatchExecutor.java | 127 ++++++++++----
.../apache/flink/table/executor/ExecutorBase.java | 11 ++
.../flink/table/executor/StreamExecutor.java | 20 +--
...deResourceConfig.java => NodeResourceUtil.java} | 9 +-
.../ShuffleStageParallelismCalculator.java | 6 +-
.../codegen/NestedLoopJoinCodeGenerator.scala | 4 +-
.../nodes/physical/batch/BatchExecExchange.scala | 94 +++-------
.../batch/BatchExecHashAggregateBase.scala | 15 +-
.../nodes/physical/batch/BatchExecHashJoin.scala | 16 +-
.../batch/BatchExecHashWindowAggregateBase.scala | 14 +-
.../physical/batch/BatchExecNestedLoopJoin.scala | 14 +-
.../physical/batch/BatchExecOverAggregate.scala | 13 +-
.../plan/nodes/physical/batch/BatchExecSink.scala | 4 +-
.../plan/nodes/physical/batch/BatchExecSort.scala | 14 +-
.../physical/batch/BatchExecSortMergeJoin.scala | 19 +-
.../nodes/physical/stream/StreamExecSink.scala | 4 +-
.../plan/reuse/DeadlockBreakupProcessor.scala | 8 +-
.../apache/flink/table/planner/BatchPlanner.scala | 4 +-
.../apache/flink/table/planner/PlannerBase.scala | 2 +
.../apache/flink/table/planner/StreamPlanner.scala | 6 +-
.../flink/table/catalog/PathResolutionTest.xml | 195 ---------------------
.../flink/table/executor/BatchExecutorTest.xml | 59 +++++++
.../table/plan/batch/sql/DagOptimizationTest.xml | 14 +-
.../table/plan/batch/sql/DeadlockBreakupTest.xml | 26 +--
.../table/plan/batch/sql/SetOperatorsTest.xml | 2 +-
.../table/plan/batch/sql/SubplanReuseTest.xml | 40 ++---
.../table/plan/batch/sql/join/LookupJoinTest.xml | 2 +-
.../plan/batch/sql/join/SingleRowJoinTest.xml | 4 +-
.../table/plan/batch/table/SetOperatorsTest.xml | 2 +-
.../flink/table/executor/BatchExecutorTest.scala | 81 +++++++++
.../flink/table/runtime/utils/BatchTestBase.scala | 3 +
.../apache/flink/table/util/TableTestBase.scala | 5 +
.../flink/table/api/ExecutionConfigOptions.java | 12 +-
33 files changed, 450 insertions(+), 399 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
index b1dfbce..8a90b80 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
@@ -21,17 +21,18 @@ package org.apache.flink.table.executor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
-import org.apache.flink.table.api.TableException;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.table.api.ExecutionConfigOptions;
import org.apache.flink.table.delegation.Executor;
-import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil;
import java.util.List;
@@ -42,6 +43,8 @@ import java.util.List;
@Internal
public class BatchExecutor extends ExecutorBase {
+ private BatchExecEnvConfig batchExecEnvConfig = new BatchExecEnvConfig();
+
@VisibleForTesting
public BatchExecutor(StreamExecutionEnvironment executionEnvironment) {
super(executionEnvironment);
@@ -49,40 +52,100 @@ public class BatchExecutor extends ExecutorBase {
@Override
public JobExecutionResult execute(String jobName) throws Exception {
- if (transformations.isEmpty()) {
- throw new TableException("No table sinks have been created yet. " +
- "A program needs at least one sink that consumes data. ");
- }
StreamExecutionEnvironment execEnv = getExecutionEnvironment();
- StreamGraph streamGraph = generateStreamGraph(execEnv, transformations, getNonEmptyJobName(jobName));
-
- // TODO supports streamEnv.execute(streamGraph)
- try {
- return execEnv.execute(getNonEmptyJobName(jobName));
- } finally {
- transformations.clear();
- }
+ StreamGraph streamGraph = generateStreamGraph(transformations, jobName);
+ return execEnv.execute(streamGraph);
}
- public static StreamGraph generateStreamGraph(
- StreamExecutionEnvironment execEnv,
- List<Transformation<?>> transformations,
- String jobName) throws Exception {
- // TODO avoid cloning ExecutionConfig
- ExecutionConfig executionConfig = InstantiationUtil.clone(execEnv.getConfig());
+ /**
+ * Backs up the previous streamEnv config and then applies the batch configs.
+ */
+ private void backupAndUpdateStreamEnv(StreamExecutionEnvironment execEnv) {
+ batchExecEnvConfig.backup(execEnv);
+ ExecutionConfig executionConfig = execEnv.getConfig();
executionConfig.enableObjectReuse();
executionConfig.setLatencyTrackingInterval(-1);
+ execEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+ execEnv.setBufferTimeout(-1);
+ if (isShuffleModeAllBatch()) {
+ executionConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL);
+ }
+ }
- return new StreamGraphGenerator(transformations, executionConfig, new CheckpointConfig())
- .setChaining(execEnv.isChainingEnabled())
- .setStateBackend(execEnv.getStateBackend())
- .setDefaultBufferTimeout(-1)
- .setTimeCharacteristic(TimeCharacteristic.ProcessingTime)
- .setUserArtifacts(execEnv.getCachedFiles())
- .setSlotSharingEnabled(false)
- .setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES)
- .setJobName(jobName)
- .generate();
+ /**
+ * Translates transformationList to streamGraph.
+ */
+ public StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName) {
+ StreamExecutionEnvironment execEnv = getExecutionEnvironment();
+ backupAndUpdateStreamEnv(execEnv);
+ transformations.forEach(execEnv::addOperator);
+ StreamGraph streamGraph;
+ streamGraph = execEnv.getStreamGraph(getNonEmptyJobName(jobName));
+ // All transformations should set managed memory size.
+ ResourceSpec managedResourceSpec = NodeResourceUtil.fromManagedMem(0);
+ streamGraph.getStreamNodes().forEach(sn -> {
+ sn.setResources(sn.getMinResources().merge(managedResourceSpec), sn.getPreferredResources().merge(managedResourceSpec));
+ });
+ streamGraph.setChaining(true);
+ streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
+ streamGraph.setStateBackend(null);
+ streamGraph.getCheckpointConfig().setCheckpointInterval(Long.MAX_VALUE);
+ if (isShuffleModeAllBatch()) {
+ streamGraph.setBlockingConnectionsBetweenChains(true);
+ }
+ batchExecEnvConfig.restore(execEnv);
+ return streamGraph;
}
+ private boolean isShuffleModeAllBatch() {
+ String value = tableConfig.getConfiguration().getString(ExecutionConfigOptions.SQL_EXEC_SHUFFLE_MODE);
+ if (value.equalsIgnoreCase(ShuffleMode.BATCH.toString())) {
+ return true;
+ } else if (!value.equalsIgnoreCase(ShuffleMode.PIPELINED.toString())) {
+ throw new IllegalArgumentException(ExecutionConfigOptions.SQL_EXEC_SHUFFLE_MODE.key() +
+ " can only be set to " + ShuffleMode.BATCH.toString() + " or " + ShuffleMode.PIPELINED.toString());
+ }
+ return false;
+ }
+
+ /**
+ * Batch configs that are set in {@link StreamExecutionEnvironment}. We back up and override
+ * these configs before generating the StreamGraph, and restore them afterwards.
+ */
+ private static class BatchExecEnvConfig {
+
+ private boolean enableObjectReuse;
+ private long latencyTrackingInterval;
+ private long bufferTimeout;
+ private TimeCharacteristic timeCharacteristic;
+ private InputDependencyConstraint inputDependencyConstraint;
+
+ /**
+ * Backs up the previous streamEnv config.
+ */
+ public void backup(StreamExecutionEnvironment execEnv) {
+ ExecutionConfig executionConfig = execEnv.getConfig();
+ enableObjectReuse = executionConfig.isObjectReuseEnabled();
+ latencyTrackingInterval = executionConfig.getLatencyTrackingInterval();
+ timeCharacteristic = execEnv.getStreamTimeCharacteristic();
+ bufferTimeout = execEnv.getBufferTimeout();
+ inputDependencyConstraint = executionConfig.getDefaultInputDependencyConstraint();
+ }
+
+ /**
+ * Restores the previous streamEnv config once the batch StreamGraph has been generated.
+ */
+ public void restore(StreamExecutionEnvironment execEnv) {
+ ExecutionConfig executionConfig = execEnv.getConfig();
+ if (enableObjectReuse) {
+ executionConfig.enableObjectReuse();
+ } else {
+ executionConfig.disableObjectReuse();
+ }
+ executionConfig.setLatencyTrackingInterval(latencyTrackingInterval);
+ execEnv.setStreamTimeCharacteristic(timeCharacteristic);
+ execEnv.setBufferTimeout(bufferTimeout);
+ executionConfig.setDefaultInputDependencyConstraint(inputDependencyConstraint);
+ }
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/ExecutorBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/ExecutorBase.java
index fb6bbf1..4d67547 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/ExecutorBase.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/ExecutorBase.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.executor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.util.StringUtils;
@@ -37,11 +39,16 @@ public abstract class ExecutorBase implements Executor {
private final StreamExecutionEnvironment executionEnvironment;
protected List<Transformation<?>> transformations = new ArrayList<>();
+ protected TableConfig tableConfig;
public ExecutorBase(StreamExecutionEnvironment executionEnvironment) {
this.executionEnvironment = executionEnvironment;
}
+ public void setTableConfig(TableConfig tableConfig) {
+ this.tableConfig = tableConfig;
+ }
+
@Override
public void apply(List<Transformation<?>> transformations) {
this.transformations.addAll(transformations);
@@ -51,6 +58,10 @@ public abstract class ExecutorBase implements Executor {
return executionEnvironment;
}
+ public abstract StreamGraph generateStreamGraph(
+ List<Transformation<?>> transformations,
+ String jobName) throws Exception;
+
protected String getNonEmptyJobName(String jobName) {
if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
return DEFAULT_JOB_NAME;
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/StreamExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/StreamExecutor.java
index 3863e12..9df4ea3 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/StreamExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/StreamExecutor.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.delegation.Executor;
import java.util.List;
@@ -40,19 +41,16 @@ public class StreamExecutor extends ExecutorBase {
}
@Override
- public void apply(List<Transformation<?>> transformations) {
- transformations.forEach(getExecutionEnvironment()::addOperator);
- }
-
- @Override
public JobExecutionResult execute(String jobName) throws Exception {
StreamExecutionEnvironment execEnv = getExecutionEnvironment();
- try {
- transformations.forEach(execEnv::addOperator);
- return execEnv.execute(getNonEmptyJobName(jobName));
- } finally {
- transformations.clear();
- }
+ return execEnv.execute(generateStreamGraph(transformations, jobName));
+ }
+
+ public StreamGraph generateStreamGraph(
+ List<Transformation<?>> transformations,
+ String jobName) throws Exception {
+ transformations.forEach(getExecutionEnvironment()::addOperator);
+ return getExecutionEnvironment().getStreamGraph(getNonEmptyJobName(jobName));
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResourceConfig.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResourceUtil.java
similarity index 89%
rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResourceConfig.java
rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResourceUtil.java
index 91e3c8c..1f3fc35 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResourceConfig.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResourceUtil.java
@@ -18,13 +18,14 @@
package org.apache.flink.table.plan.nodes.resource;
+import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.ExecutionConfigOptions;
/**
* Deal with resource config for {@link org.apache.flink.table.plan.nodes.exec.ExecNode}.
*/
-public class NodeResourceConfig {
+public class NodeResourceUtil {
/**
* How many Bytes per MB.
@@ -67,4 +68,10 @@ public class NodeResourceConfig {
}
return parallelism;
}
+
+ public static ResourceSpec fromManagedMem(int managedMem) {
+ ResourceSpec.Builder builder = ResourceSpec.newBuilder();
+ builder.setManagedMemoryInMB(managedMem);
+ return builder.build();
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculator.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculator.java
index 2501e49..31cb733 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculator.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculator.java
@@ -23,7 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.plan.nodes.exec.ExecNode;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
import org.apache.flink.table.plan.nodes.physical.stream.StreamExecTableSourceScan;
-import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig;
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +68,7 @@ public class ShuffleStageParallelismCalculator {
int maxParallelism = shuffleStage.getMaxParallelism();
for (ExecNode<?, ?> node : nodeSet) {
if (node instanceof BatchExecTableSourceScan || node instanceof StreamExecTableSourceScan) {
- int result = NodeResourceConfig.getSourceParallelism(tableConf, envParallelism);
+ int result = NodeResourceUtil.getSourceParallelism(tableConf, envParallelism);
if (result > sourceParallelism) {
sourceParallelism = result;
}
@@ -78,7 +78,7 @@ public class ShuffleStageParallelismCalculator {
if (sourceParallelism > 0) {
shuffleStageParallelism = sourceParallelism;
} else {
- shuffleStageParallelism = NodeResourceConfig.getOperatorDefaultParallelism(getTableConf(), envParallelism);
+ shuffleStageParallelism = NodeResourceUtil.getOperatorDefaultParallelism(getTableConf(), envParallelism);
}
if (shuffleStageParallelism > maxParallelism) {
shuffleStageParallelism = maxParallelism;
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/NestedLoopJoinCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/NestedLoopJoinCodeGenerator.scala
index e5ce158..c2000f7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/NestedLoopJoinCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/NestedLoopJoinCodeGenerator.scala
@@ -22,7 +22,7 @@ import org.apache.flink.table.api.ExecutionConfigOptions
import org.apache.flink.table.codegen.CodeGenUtils.{BASE_ROW, BINARY_ROW, DEFAULT_INPUT1_TERM, DEFAULT_INPUT2_TERM, className, newName}
import org.apache.flink.table.codegen.OperatorCodeGenerator.{INPUT_SELECTION, generateCollect}
import org.apache.flink.table.dataformat.{BaseRow, JoinedRow}
-import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil
import org.apache.flink.table.runtime.CodeGenOperatorFactory
import org.apache.flink.table.runtime.join.FlinkJoinType
import org.apache.flink.table.runtime.util.ResettableExternalBuffer
@@ -71,7 +71,7 @@ class NestedLoopJoinCodeGenerator(
val isBinaryRow = newName("isBinaryRow")
val externalBufferMemorySize = config.getConfiguration.getInteger(
- ExecutionConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM) * NodeResourceConfig.SIZE_IN_MB
+ ExecutionConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM) * NodeResourceUtil.SIZE_IN_MB
if (singleRowJoin) {
ctx.addReusableMember(s"$BASE_ROW $buildRow = null;")
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExchange.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExchange.scala
index 015be56..f2170cb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExchange.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExchange.scala
@@ -20,15 +20,16 @@ package org.apache.flink.table.plan.nodes.physical.batch
import org.apache.flink.api.dag.Transformation
import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.io.network.DataExchangeMode
import org.apache.flink.runtime.operators.DamBehavior
import org.apache.flink.streaming.api.transformations.{PartitionTransformation, ShuffleMode}
import org.apache.flink.streaming.runtime.partitioner.{BroadcastPartitioner, GlobalPartitioner, RebalancePartitioner}
+import org.apache.flink.table.api.ExecutionConfigOptions
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.{CodeGeneratorContext, HashCodeGenerator}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.nodes.common.CommonPhysicalExchange
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
+import org.apache.flink.table.plan.util.FlinkRelOptUtil
import org.apache.flink.table.planner.BatchPlanner
import org.apache.flink.table.runtime.BinaryHashPartitioner
import org.apache.flink.table.types.logical.RowType
@@ -44,50 +45,8 @@ import scala.collection.JavaConversions._
/**
* This RelNode represents a change of partitioning of the input elements.
*
- * This does not create a physical transformation If its relDistribution' type is not range,
- * it only affects how upstream operations are connected to downstream operations.
- *
- * But if the type is range, this relNode will create some physical transformation because it
- * need calculate the data distribution. To calculate the data distribution, the received stream
- * will split in two process stream. For the first process stream, it will go through the sample
- * and statistics to calculate the data distribution in pipeline mode. For the second process
- * stream will been bocked. After the first process stream has been calculated successfully,
- * then the two process stream will union together. Thus it can partitioner the record based
- * the data distribution. Then The RelNode will create the following transformations.
- *
- * +---------------------------------------------------------------------------------------------+
- * | |
- * | +-----------------------------+ |
- * | | Transformation | ------------------------------------> |
- * | +-----------------------------+ | |
- * | | | |
- * | | | |
- * | |forward & PIPELINED | |
- * | \|/ | |
- * | +--------------------------------------------+ | |
- * | | OneInputTransformation[LocalSample, n] | | |
- * | +--------------------------------------------+ | |
- * | | |forward & BATCH |
- * | |forward & PIPELINED | |
- * | \|/ | |
- * | +--------------------------------------------------+ | |
- * | |OneInputTransformation[SampleAndHistogram, 1] | | |
- * | +--------------------------------------------------+ | |
- * | | | |
- * | |broadcast & PIPELINED | |
- * | | | |
- * | \|/ \|/ |
- * | +---------------------------------------------------+------------------------------+ |
- * | | TwoInputTransformation[AssignRangeId, n] | |
- * | +----------------------------------------------------+-----------------------------+ |
- * | | |
- * | |custom & PIPELINED |
- * | \|/ |
- * | +---------------------------------------------------+------------------------------+ |
- * | | OneInputTransformation[RemoveRangeId, n] | |
- * | +----------------------------------------------------+-----------------------------+ |
- * | |
- * +---------------------------------------------------------------------------------------------+
+ * This does not create a physical transformation if its relDistribution's type is not range;
+ * range distribution is not supported now.
*/
class BatchExecExchange(
cluster: RelOptCluster,
@@ -103,9 +62,9 @@ class BatchExecExchange(
// and different PartitionTransformation objects will be created which have same input.
// cache input transformation to reuse
private var reusedInput: Option[Transformation[BaseRow]] = None
- // the required exchange mode for reusable ExchangeBatchExec
- // if it's None, use value from getDataExchangeMode
- private var requiredExchangeMode: Option[DataExchangeMode] = None
+ // the required shuffle mode for reusable BatchExecExchange
+ // if it's None, use value from getShuffleMode
+ private var requiredShuffleMode: Option[ShuffleMode] = None
override def copy(
traitSet: RelTraitSet,
@@ -116,25 +75,36 @@ class BatchExecExchange(
override def explainTerms(pw: RelWriter): RelWriter = {
super.explainTerms(pw)
- .itemIf("exchange_mode", requiredExchangeMode.orNull,
- requiredExchangeMode.contains(DataExchangeMode.BATCH))
+ .itemIf("shuffle_mode", requiredShuffleMode.orNull,
+ requiredShuffleMode.contains(ShuffleMode.BATCH))
}
//~ ExecNode methods -----------------------------------------------------------
- def setRequiredDataExchangeMode(exchangeMode: DataExchangeMode): Unit = {
- require(exchangeMode != null)
- requiredExchangeMode = Some(exchangeMode)
+ def setRequiredShuffleMode(shuffleMode: ShuffleMode): Unit = {
+ require(shuffleMode != null)
+ requiredShuffleMode = Some(shuffleMode)
}
- private[flink] def getDataExchangeMode(tableConf: Configuration): DataExchangeMode = {
- requiredExchangeMode match {
- case Some(mode) if mode eq DataExchangeMode.BATCH => mode
- case _ => DataExchangeMode.PIPELINED
+ private[flink] def getShuffleMode(tableConf: Configuration): ShuffleMode = {
+ requiredShuffleMode match {
+ case Some(mode) if mode eq ShuffleMode.BATCH => mode
+ case _ =>
+ if (tableConf.getString(ExecutionConfigOptions.SQL_EXEC_SHUFFLE_MODE)
+ .equalsIgnoreCase(ShuffleMode.BATCH.toString)) {
+ ShuffleMode.BATCH
+ } else {
+ ShuffleMode.UNDEFINED
+ }
}
}
override def getDamBehavior: DamBehavior = {
+ val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
+ val shuffleMode = getShuffleMode(tableConfig.getConfiguration)
+ if (shuffleMode eq ShuffleMode.BATCH) {
+ return DamBehavior.FULL_DAM
+ }
distribution.getType match {
case RelDistribution.Type.RANGE_DISTRIBUTED => DamBehavior.FULL_DAM
case _ => DamBehavior.PIPELINED
@@ -164,14 +134,8 @@ class BatchExecExchange(
val inputType = input.getOutputType.asInstanceOf[BaseRowTypeInfo]
val outputRowType = BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType))
- val shuffleMode = requiredExchangeMode match {
- case None => ShuffleMode.PIPELINED
- case Some(mode) =>
- mode match {
- case DataExchangeMode.BATCH => ShuffleMode.BATCH
- case DataExchangeMode.PIPELINED => ShuffleMode.PIPELINED
- }
- }
+ val conf = planner.getTableConfig
+ val shuffleMode = getShuffleMode(conf.getConfiguration)
relDistribution.getType match {
case RelDistribution.Type.ANY =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
index 7734ada..e395f63 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
@@ -28,7 +28,7 @@ import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.plan.cost.FlinkCost._
import org.apache.flink.table.plan.cost.FlinkCostFactory
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
-import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil
import org.apache.flink.table.plan.util.AggregateUtil.transformToBatchAggregateInfoList
import org.apache.flink.table.plan.util.FlinkRelMdUtil
import org.apache.flink.table.planner.BatchPlanner
@@ -131,22 +131,27 @@ abstract class BatchExecHashAggregateBase(
val aggInfos = transformToBatchAggregateInfoList(
aggCallToAggFunction.map(_._1), aggInputRowType)
+ var managedMemoryInMB = 0
val generatedOperator = if (grouping.isEmpty) {
AggWithoutKeysCodeGenerator.genWithoutKeys(
ctx, relBuilder, aggInfos, inputType, outputType, isMerge, isFinal, "NoGrouping")
} else {
- val reservedManagedMem = config.getConfiguration.getInteger(
- ExecutionConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM) * NodeResourceConfig.SIZE_IN_MB
+ managedMemoryInMB = config.getConfiguration.getInteger(
+ ExecutionConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM)
+ val managedMemory = managedMemoryInMB * NodeResourceUtil.SIZE_IN_MB
new HashAggCodeGenerator(
ctx, relBuilder, aggInfos, inputType, outputType, grouping, auxGrouping, isMerge, isFinal
- ).genWithKeys(reservedManagedMem)
+ ).genWithKeys(managedMemory)
}
val operator = new CodeGenOperatorFactory[BaseRow](generatedOperator)
- new OneInputTransformation(
+ val ret = new OneInputTransformation(
input,
getOperatorName,
operator,
BaseRowTypeInfo.of(outputType),
getResource.getParallelism)
+ val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
+ ret.setResources(resource, resource)
+ ret
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashJoin.scala
index 0e64fcf..c250c20 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashJoin.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef}
import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.plan.nodes.exec.ExecNode
-import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil
import org.apache.flink.table.plan.nodes.{ExpressionFormat, FlinkConventions}
import org.apache.flink.table.plan.util.{FlinkRelMdUtil, JoinUtil}
import org.apache.flink.table.planner.BatchPlanner
@@ -201,8 +201,9 @@ class BatchExecHashJoin(
val rType = rInput.getOutputType.asInstanceOf[BaseRowTypeInfo].toRowType
val keyType = RowType.of(leftKeys.map(lType.getChildren().get(_)): _*)
- val managedMemorySize = config.getConfiguration.getInteger(
- ExecutionConfigOptions.SQL_RESOURCE_HASH_JOIN_TABLE_MEM) * NodeResourceConfig.SIZE_IN_MB
+ val managedMemoryInMB = config.getConfiguration.getInteger(
+ ExecutionConfigOptions.SQL_RESOURCE_HASH_JOIN_TABLE_MEM)
+ val managedMemory = managedMemoryInMB * NodeResourceUtil.SIZE_IN_MB
val condFunc = JoinUtil.generateConditionFunction(
config, cluster.getRexBuilder, getJoinInfo, lType, rType)
@@ -234,7 +235,7 @@ class BatchExecHashJoin(
pType,
buildKeys,
probeKeys,
- managedMemorySize,
+ managedMemory,
0,
0,
buildRowSize,
@@ -243,7 +244,7 @@ class BatchExecHashJoin(
condFunc)
} else {
SimpleOperatorFactory.of(HashJoinOperator.newHashJoinOperator(
- managedMemorySize,
+ managedMemory,
0,
0,
hashJoinType,
@@ -260,13 +261,16 @@ class BatchExecHashJoin(
))
}
- new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
+ val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
build,
probe,
getOperatorName,
operator,
BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType)),
getResource.getParallelism)
+ val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
+ ret.setResources(resource, resource)
+ ret
}
private def getOperatorName: String = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
index 341fe0b..e9d49ad 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
@@ -30,7 +30,7 @@ import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.plan.logical.LogicalWindow
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
-import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil
import org.apache.flink.table.plan.util.AggregateUtil.transformToBatchAggregateInfoList
import org.apache.flink.table.plan.util.FlinkRelMdUtil
import org.apache.flink.table.planner.BatchPlanner
@@ -135,21 +135,25 @@ abstract class BatchExecHashWindowAggregateBase(
val (windowSize: Long, slideSize: Long) = WindowCodeGenerator.getWindowDef(window)
- val reservedManagedMem = config.getConfiguration.getInteger(
- ExecutionConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM) * NodeResourceConfig.SIZE_IN_MB
+ val managedMemoryInMB = config.getConfiguration.getInteger(
+ ExecutionConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM)
+ val managedMemory = managedMemoryInMB * NodeResourceUtil.SIZE_IN_MB
val generatedOperator = new HashWindowCodeGenerator(
ctx, relBuilder, window, inputTimeFieldIndex,
inputTimeIsDate, namedProperties,
aggInfos, inputRowType, grouping, auxGrouping, enableAssignPane, isMerge, isFinal).gen(
- inputType, outputType, groupBufferLimitSize, reservedManagedMem, 0,
+ inputType, outputType, groupBufferLimitSize, managedMemory, 0,
windowSize, slideSize)
val operator = new CodeGenOperatorFactory[BaseRow](generatedOperator)
- new OneInputTransformation(
+ val ret = new OneInputTransformation(
input,
getOperatorName,
operator,
BaseRowTypeInfo.of(outputType),
getResource.getParallelism)
+ val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
+ ret.setResources(resource, resource)
+ ret
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
index 6da6623..229bebc 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
@@ -21,12 +21,14 @@ package org.apache.flink.table.plan.nodes.physical.batch
import org.apache.flink.api.dag.Transformation
import org.apache.flink.runtime.operators.DamBehavior
import org.apache.flink.streaming.api.transformations.TwoInputTransformation
+import org.apache.flink.table.api.ExecutionConfigOptions
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.{CodeGeneratorContext, NestedLoopJoinCodeGenerator}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.plan.nodes.ExpressionFormat
import org.apache.flink.table.plan.nodes.exec.ExecNode
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil
import org.apache.flink.table.planner.BatchPlanner
import org.apache.flink.table.typeutils.{BaseRowTypeInfo, BinaryRowSerializer}
@@ -150,13 +152,23 @@ class BatchExecNestedLoopJoin(
condition
).gen()
- new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
+ val externalBufferMemoryInMb: Int = if (singleRowJoin) {
+ 0
+ } else {
+ planner.getTableConfig.getConfiguration.getInteger(
+ ExecutionConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM)
+ }
+ val resourceSpec = NodeResourceUtil.fromManagedMem(externalBufferMemoryInMb)
+
+ val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
lInput,
rInput,
getOperatorName,
op,
BaseRowTypeInfo.of(outputType),
getResource.getParallelism)
+ ret.setResources(resourceSpec, resourceSpec)
+ ret
}
private def getOperatorName: String = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
index f4dee1c..16c0c4d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
@@ -35,7 +35,7 @@ import org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, FlinkRelDistri
import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.flink.table.plan.nodes.physical.batch.OverWindowMode.OverWindowMode
-import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil
import org.apache.flink.table.plan.rules.physical.batch.BatchExecJoinRuleBase
import org.apache.flink.table.plan.util.AggregateUtil.transformToBatchAggregateInfoList
import org.apache.flink.table.plan.util.OverAggregateUtil.getLongBoundary
@@ -378,6 +378,7 @@ class BatchExecOverAggregate(
collation.map(_._1),
collation.map(_._2))
+ var managedMemoryInMB: Int = 0
val operator = if (!needBufferData) {
//operator needn't cache data
val aggHandlers = modeToGroupToAggCallToAggFunction.map { case (_, _, aggCallToAggFunction) =>
@@ -408,15 +409,19 @@ class BatchExecOverAggregate(
new NonBufferOverWindowOperator(aggHandlers, genComparator, resetAccumulators)
} else {
val windowFrames = createOverWindowFrames(config)
+ managedMemoryInMB = config.getConfiguration.getInteger(
+ ExecutionConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM)
new BufferDataOverWindowOperator(
- config.getConfiguration.getInteger(
- ExecutionConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM) * NodeResourceConfig.SIZE_IN_MB,
+ managedMemoryInMB * NodeResourceUtil.SIZE_IN_MB,
windowFrames,
genComparator,
inputType.getChildren.forall(t => BinaryRow.isInFixedLengthPart(t)))
}
- new OneInputTransformation(
+ val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
+ val ret = new OneInputTransformation(
input, "OverAggregate", operator, BaseRowTypeInfo.of(outputType), getResource.getParallelism)
+ ret.setResources(resource, resource)
+ ret
}
def createOverWindowFrames(config: TableConfig): Array[OverWindowFrame] = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
index 19aa60e..9ce1d4c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
@@ -28,7 +28,7 @@ import org.apache.flink.table.codegen.{CodeGenUtils, CodeGeneratorContext}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.nodes.calcite.Sink
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
-import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil
import org.apache.flink.table.planner.BatchPlanner
import org.apache.flink.table.sinks.{DataStreamTableSink, RetractStreamTableSink, StreamTableSink, TableSink, UpsertStreamTableSink}
import org.apache.flink.table.types.{ClassLogicalTypeConverter, DataType}
@@ -98,7 +98,7 @@ class BatchExecSink[T](
}
val sinkTransformation = dsSink.getTransformation
- val configSinkParallelism = NodeResourceConfig.getSinkParallelism(
+ val configSinkParallelism = NodeResourceUtil.getSinkParallelism(
planner.getTableConfig.getConfiguration)
val maxSinkParallelism = sinkTransformation.getMaxParallelism
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala
index 00c1fb6..7662afc 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.codegen.sort.SortCodeGenerator
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
-import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil
import org.apache.flink.table.plan.util.{FlinkRelMdUtil, RelExplainUtil, SortUtil}
import org.apache.flink.table.planner.BatchPlanner
import org.apache.flink.table.runtime.sort.SortOperator
@@ -116,19 +116,23 @@ class BatchExecSort(
val keyTypes = keys.map(inputType.getTypeAt)
val codeGen = new SortCodeGenerator(conf, keys, keyTypes, orders, nullsIsLast)
- val reservedMemorySize = conf.getConfiguration.getInteger(
- ExecutionConfigOptions.SQL_RESOURCE_SORT_BUFFER_MEM) * NodeResourceConfig.SIZE_IN_MB
+ val managedMemoryInMB = conf.getConfiguration.getInteger(
+ ExecutionConfigOptions.SQL_RESOURCE_SORT_BUFFER_MEM)
+ val managedMemory = managedMemoryInMB * NodeResourceUtil.SIZE_IN_MB
val operator = new SortOperator(
- reservedMemorySize,
+ managedMemory,
codeGen.generateNormalizedKeyComputer("BatchExecSortComputer"),
codeGen.generateRecordComparator("BatchExecSortComparator"))
- new OneInputTransformation(
+ val ret = new OneInputTransformation(
input,
s"Sort(${RelExplainUtil.collationToString(sortCollation, getRowType)})",
operator.asInstanceOf[OneInputStreamOperator[BaseRow, BaseRow]],
BaseRowTypeInfo.of(outputType),
getResource.getParallelism)
+ val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
+ ret.setResources(resource, resource)
+ ret
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
index 1d9ba48..bb04dd4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
@@ -30,7 +30,7 @@ import org.apache.flink.table.plan.`trait`.FlinkRelDistributionTraitDef
import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.plan.nodes.ExpressionFormat
import org.apache.flink.table.plan.nodes.exec.ExecNode
-import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil
import org.apache.flink.table.plan.util.{FlinkRelMdUtil, FlinkRelOptUtil, JoinUtil, SortUtil}
import org.apache.flink.table.planner.BatchPlanner
import org.apache.flink.table.runtime.join.{FlinkJoinType, SortMergeJoinOperator}
@@ -219,11 +219,13 @@ class BatchExecSortMergeJoin(
leftType,
rightType)
- val externalBufferMemory = config.getConfiguration.getInteger(
- ExecutionConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM) * NodeResourceConfig.SIZE_IN_MB
+ val externalBufferMemoryInMB = config.getConfiguration.getInteger(
+ ExecutionConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM)
+ val externalBufferMemory = externalBufferMemoryInMB * NodeResourceUtil.SIZE_IN_MB
- val sortMemory = config.getConfiguration.getInteger(
- ExecutionConfigOptions.SQL_RESOURCE_SORT_BUFFER_MEM) * NodeResourceConfig.SIZE_IN_MB
+ val sortMemoryInMB = config.getConfiguration.getInteger(
+ ExecutionConfigOptions.SQL_RESOURCE_SORT_BUFFER_MEM)
+ val sortMemory = sortMemoryInMB * NodeResourceUtil.SIZE_IN_MB
def newSortGen(originalKeys: Array[Int], t: RowType): SortCodeGenerator = {
val originalOrders = originalKeys.map(_ => true)
@@ -256,13 +258,18 @@ class BatchExecSortMergeJoin(
newSortGen(leftAllKey.indices.toArray, keyType).generateRecordComparator("KeyComparator"),
filterNulls)
- new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
+ val externalBufferNum = if (flinkJoinType == FlinkJoinType.FULL) 2 else 1
+ val managedMemoryInMB = externalBufferMemoryInMB * externalBufferNum + sortMemoryInMB * 2
+ val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
leftInput,
rightInput,
getOperatorName,
operator,
BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType)),
getResource.getParallelism)
+ val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
+ ret.setResources(resource, resource)
+ ret
}
private def estimateOutputSize(relNode: RelNode): Double = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
index 768b028..8a01d69 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.`trait`.{AccMode, AccModeTraitDef}
import org.apache.flink.table.plan.nodes.calcite.Sink
import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
-import org.apache.flink.table.plan.nodes.resource.NodeResourceConfig
+import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil
import org.apache.flink.table.plan.util.UpdatingPlanChecker
import org.apache.flink.table.planner.StreamPlanner
import org.apache.flink.table.sinks._
@@ -124,7 +124,7 @@ class StreamExecSink[T](
"implemented and return the sink transformation DataStreamSink. " +
s"However, ${sink.getClass.getCanonicalName} doesn't implement this method.")
}
- val configSinkParallelism = NodeResourceConfig.getSinkParallelism(
+ val configSinkParallelism = NodeResourceUtil.getSinkParallelism(
planner.getTableConfig.getConfiguration)
val maxSinkParallelism = dsSink.getTransformation.getMaxParallelism
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/reuse/DeadlockBreakupProcessor.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/reuse/DeadlockBreakupProcessor.scala
index 2106d0b..1b6109e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/reuse/DeadlockBreakupProcessor.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/reuse/DeadlockBreakupProcessor.scala
@@ -18,9 +18,9 @@
package org.apache.flink.table.plan.reuse
-import org.apache.flink.runtime.io.network.DataExchangeMode
import org.apache.flink.runtime.operators.DamBehavior
import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.transformations.ShuffleMode
import org.apache.flink.table.api.TableException
import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode, ExecNodeVisitorImpl}
@@ -70,7 +70,7 @@ import scala.collection.mutable
* |
* HashJoin
* (build side)/ \(probe side)
- * (broadcast)Exchange Exchange(exchange_mode=[BATCH]) add BATCH Exchange to breakup deadlock
+ * (broadcast)Exchange Exchange(shuffle_mode=[BATCH]) add BATCH Exchange to breakup deadlock
* | |
* Calc(b>10) Calc(b<20)
* \ /
@@ -165,7 +165,7 @@ class DeadlockBreakupProcessor extends DAGProcessor {
probeNode match {
case e: BatchExecExchange =>
// TODO create a cloned BatchExecExchange for PIPELINE output
- e.setRequiredDataExchangeMode(DataExchangeMode.BATCH)
+ e.setRequiredShuffleMode(ShuffleMode.BATCH)
case _ =>
val probeRel = probeNode.asInstanceOf[RelNode]
val traitSet = probeRel.getTraitSet.replace(distribution)
@@ -174,7 +174,7 @@ class DeadlockBreakupProcessor extends DAGProcessor {
traitSet,
probeRel,
distribution)
- e.setRequiredDataExchangeMode(DataExchangeMode.BATCH)
+ e.setRequiredShuffleMode(ShuffleMode.BATCH)
// replace join node's input
join.replaceInputNode(probeSideIndex, e)
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/BatchPlanner.scala
index dfb1ddf..9b4e4f2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/BatchPlanner.scala
@@ -89,7 +89,9 @@ class BatchPlanner(
val optimizedRelNodes = optimize(sinkRelNodes)
val execNodes = translateToExecNodePlan(optimizedRelNodes)
val transformations = translateToPlan(execNodes)
- val streamGraph = BatchExecutor.generateStreamGraph(getExecEnv, transformations, "")
+ val batchExecutor = new BatchExecutor(getExecEnv)
+ batchExecutor.setTableConfig(getTableConfig)
+ val streamGraph = batchExecutor.generateStreamGraph(transformations, "")
val executionPlan = PlanUtil.explainStreamGraph(streamGraph)
val sb = new StringBuilder
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
index 3ca99ac..ef17933 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
@@ -70,6 +70,8 @@ abstract class PlannerBase(
catalogManager: CatalogManager)
extends Planner {
+ executor.asInstanceOf[ExecutorBase].setTableConfig(config)
+
private val plannerContext: PlannerContext =
new PlannerContext(
config,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 52651e6..3007d92 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.dag.Transformation
import org.apache.flink.table.api.{TableConfig, TableException}
import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
import org.apache.flink.table.delegation.Executor
-import org.apache.flink.table.executor.BatchExecutor
+import org.apache.flink.table.executor.StreamExecutor
import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation}
import org.apache.flink.table.plan.`trait`._
import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
@@ -87,7 +87,9 @@ class StreamPlanner(
val optimizedRelNodes = optimize(sinkRelNodes)
val execNodes = translateToExecNodePlan(optimizedRelNodes)
val transformations = translateToPlan(execNodes)
- val streamGraph = BatchExecutor.generateStreamGraph(getExecEnv, transformations, "")
+ val streamExecutor = new StreamExecutor(getExecEnv)
+ streamExecutor.setTableConfig(getTableConfig)
+ val streamGraph = streamExecutor.generateStreamGraph(transformations, "")
val executionPlan = PlanUtil.explainStreamGraph(streamGraph)
val sb = new StringBuilder
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/catalog/PathResolutionTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/catalog/PathResolutionTest.xml
deleted file mode 100644
index 4de35be..0000000
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/catalog/PathResolutionTest.xml
+++ /dev/null
@@ -1,195 +0,0 @@
-<?xml version="1.0" ?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to you under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-<Root>
- <TestCase name="testStreamSqlPathResolution[0: simpleInDefaultPath=[sqlPath: tab1, tableApiPath: [tab1], expectedPath: [builtin, default, tab1]]]">
- <Resource name="sql">
- <![CDATA[SELECT * FROM tab1]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[builtin, default, tab1, source: [()]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-TableSourceScan(table=[[builtin, default, tab1, source: [()]]], fields=[])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testStreamSqlPathResolution[10: spaceInNames=[sqlPath: `default db`.`tab 1`, tableApiPath: [default db, tab 1], expectedPath: [builtin, default db, tab 1]]]">
- <Resource name="sql">
- <![CDATA[SELECT * FROM `default db`.`tab 1`]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[builtin, default db, tab 1, source: [()]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-TableSourceScan(table=[[builtin, default db, tab 1, source: [()]]], fields=[])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testStreamSqlPathResolution[1: simpleInChangedDefaultCatalog=[defaultCatalog: cat1, sqlPath: tab1, tableApiPath: [tab1], expectedPath: [cat1, db1, tab1]]]">
- <Resource name="sql">
- <![CDATA[SELECT * FROM tab1]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[cat1, db1, tab1, source: [()]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-TableSourceScan(table=[[cat1, db1, tab1, source: [()]]], fields=[])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testStreamSqlPathResolution[2: simpleInChangedDefaultPath=[defaultCatalog: cat1, defaultDatabase: db2, sqlPath: tab1, tableApiPath: [tab1], expectedPath: [cat1, db2, tab1]]]">
- <Resource name="sql">
- <![CDATA[SELECT * FROM tab1]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[cat1, db2, tab1, source: [()]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-TableSourceScan(table=[[cat1, db2, tab1, source: [()]]], fields=[])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testStreamSqlPathResolution[3: qualifiedWithDatabase=[defaultCatalog: builtin, defaultDatabase: default, sqlPath: db1.tab1, tableApiPath: [db1, tab1], expectedPath: [builtin, db1, tab1]]]">
- <Resource name="sql">
- <![CDATA[SELECT * FROM db1.tab1]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[builtin, db1, tab1, source: [()]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-TableSourceScan(table=[[builtin, db1, tab1, source: [()]]], fields=[])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testStreamSqlPathResolution[4: fullyQualifiedName=[defaultCatalog: builtin, defaultDatabase: default, sqlPath: cat1.db1.tab1, tableApiPath: [cat1, db1, tab1], expectedPath: [cat1, db1, tab1]]]">
- <Resource name="sql">
- <![CDATA[SELECT * FROM cat1.db1.tab1]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[cat1, db1, tab1, source: [()]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-TableSourceScan(table=[[cat1, db1, tab1, source: [()]]], fields=[])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testStreamSqlPathResolution[5: externalCatalogTopLevelTable=[sqlPath: extCat1.tab1, tableApiPath: [extCat1, tab1], expectedPath: [extCat1, tab1]]]">
- <Resource name="sql">
- <![CDATA[SELECT * FROM extCat1.tab1]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[extCat1, tab1, source: [()]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-TableSourceScan(table=[[extCat1, tab1, source: [()]]], fields=[])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testStreamSqlPathResolution[6: externalCatalogMultiLevelNesting=[sqlPath: extCat1.extCat2.extCat3.tab1, tableApiPath: [extCat1, extCat2, extCat3, tab1], expectedPath: [extCat1, extCat2, extCat3, tab1]]]">
- <Resource name="sql">
- <![CDATA[SELECT * FROM extCat1.extCat2.extCat3.tab1]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[extCat1, extCat2, extCat3, tab1, source: [()]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-TableSourceScan(table=[[extCat1, extCat2, extCat3, tab1, source: [()]]], fields=[])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testStreamSqlPathResolution[7: dotInUnqualifiedTableName=[sqlPath: `tab.1`, tableApiPath: [tab.1], expectedPath: [builtin, default, tab.1]]]">
- <Resource name="sql">
- <![CDATA[SELECT * FROM `tab.1`]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[builtin, default, tab.1, source: [()]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-TableSourceScan(table=[[builtin, default, tab.1, source: [()]]], fields=[])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testStreamSqlPathResolution[8: dotInDatabaseName=[sqlPath: `default.db`.tab1, tableApiPath: [default.db, tab1], expectedPath: [builtin, default.db, tab1]]]">
- <Resource name="sql">
- <![CDATA[SELECT * FROM `default.db`.tab1]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[builtin, default.db, tab1, source: [()]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-TableSourceScan(table=[[builtin, default.db, tab1, source: [()]]], fields=[])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testStreamSqlPathResolution[9: dotInDefaultDatabaseName=[defaultCatalog: builtin, defaultDatabase: default.db, sqlPath: tab1, tableApiPath: [tab1], expectedPath: [builtin, default.db, tab1]]]">
- <Resource name="sql">
- <![CDATA[SELECT * FROM tab1]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject
-+- LogicalTableScan(table=[[builtin, default.db, tab1, source: [()]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-TableSourceScan(table=[[builtin, default.db, tab1, source: [()]]], fields=[])
-]]>
- </Resource>
- </TestCase>
-</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/executor/BatchExecutorTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/executor/BatchExecutorTest.xml
new file mode 100644
index 0000000..52a0f64
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/executor/BatchExecutorTest.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testRestoreConfigWhenBatchShuffleMode">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+
+ : Operator
+ content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]), fields:(a, b, c))
+ ship_strategy : FORWARD
+
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRestoreConfig">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+
+ : Operator
+ content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]), fields:(a, b, c))
+ ship_strategy : FORWARD
+
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/DagOptimizationTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/DagOptimizationTest.xml
index 5b83079..acd9bb6 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/DagOptimizationTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/DagOptimizationTest.xml
@@ -285,7 +285,7 @@ Calc(select=[a AS a1, b, c AS c2], reuse_id=[3])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a, b], where=[<=(a, 10)])
: +- Reused(reference_id=[1])
- +- Exchange(distribution=[hash[a2]], exchange_mode=[BATCH])
+ +- Exchange(distribution=[hash[a2]], shuffle_mode=[BATCH])
+- Calc(select=[a AS a2, c], where=[>=(b, 5)])
+- Reused(reference_id=[2])
@@ -298,7 +298,7 @@ Sink(name=[sink2], fields=[a1, b, c1])
:- Exchange(distribution=[hash[a3]])
: +- Calc(select=[a AS a3, c AS c1], where=[<(b, 5)])
: +- Reused(reference_id=[2])
- +- Exchange(distribution=[hash[a1]], exchange_mode=[BATCH])
+ +- Exchange(distribution=[hash[a1]])
+- Calc(select=[a1, b])
+- Reused(reference_id=[3])
]]>
@@ -737,12 +737,12 @@ LogicalSink(name=[appendSink], fields=[a1, b, c1])
Sink(name=[appendSink], fields=[a1, b, c1])
+- Calc(select=[a1, b, c1])
+- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], select=[a3, c1, a1, b], build=[right])
- :- Exchange(distribution=[hash[a3]], exchange_mode=[BATCH])
+ :- Exchange(distribution=[hash[a3]], shuffle_mode=[BATCH])
: +- Calc(select=[a AS a3, c AS c1], where=[AND(>=(a, 0), <(b, 5))])
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], reuse_id=[1])
+- Calc(select=[a AS a1, b])
+- HashJoin(joinType=[InnerJoin], where=[=(a, a2)], select=[a, b, a2], build=[right])
- :- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+ :- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
: +- Calc(select=[a, b], where=[<=(a, 10)])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[a2]])
@@ -822,17 +822,17 @@ LogicalSink(name=[sink], fields=[a, b, c])
Sink(name=[sink], fields=[a, b, c])
+- Calc(select=[a AS a1, b1, c1])
+- HashJoin(joinType=[InnerJoin], where=[=(a, a3)], select=[a, a3, b1, c1], build=[right])
- :- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+ :- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
: +- Calc(select=[a], where=[<=(a, 10)])
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], reuse_id=[1])
+- Calc(select=[a3, b AS b1, c1])
+- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], select=[a3, c1, a1, b], build=[right])
- :- Exchange(distribution=[hash[a3]], exchange_mode=[BATCH])
+ :- Exchange(distribution=[hash[a3]], shuffle_mode=[BATCH])
: +- Calc(select=[a AS a3, c AS c1], where=[AND(>=(a, 0), <(b, 5))])
: +- Reused(reference_id=[1])
+- Calc(select=[a AS a1, b])
+- HashJoin(joinType=[InnerJoin], where=[=(a, a2)], select=[a, b, a2], build=[right])
- :- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+ :- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
: +- Calc(select=[a, b], where=[<=(a, 10)])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[a2]])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/DeadlockBreakupTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/DeadlockBreakupTest.xml
index 1c4a26d..1b04b86 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/DeadlockBreakupTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/DeadlockBreakupTest.xml
@@ -35,7 +35,7 @@ HashAggregate(isMerge=[false], groupBy=[a, b, c], select=[a, b, c])
+- HashJoin(joinType=[LeftSemiJoin], where=[AND(OR(=(a, a0), AND(IS NULL(a), IS NULL(a0))), OR(=(b, b0), AND(IS NULL(b), IS NULL(b0))), OR(=(c, c0), AND(IS NULL(c), IS NULL(c0))))], select=[a, b, c], build=[left])
:- Exchange(distribution=[hash[a, b, c]])
: +- BoundedStreamScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
- +- Exchange(distribution=[hash[a, b, c]], exchange_mode=[BATCH])
+ +- Exchange(distribution=[hash[a, b, c]], shuffle_mode=[BATCH])
+- BoundedStreamScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
]]>
</Resource>
@@ -57,7 +57,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5])
<![CDATA[
Calc(select=[a, b, c, a1, b0, c0])
+- HashJoin(joinType=[InnerJoin], where=[=(a0, b0)], select=[a, b, c, a0, a1, b0, c0], build=[right])
- :- Exchange(distribution=[hash[a0]], exchange_mode=[BATCH])
+ :- Exchange(distribution=[hash[a0]], shuffle_mode=[BATCH])
: +- Calc(select=[a, b, c, CAST(a) AS a0])
: +- BoundedStreamScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
+- Exchange(distribution=[hash[b]])
@@ -81,7 +81,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5])
<Resource name="planAfter">
<![CDATA[
HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, a0, b0, c0], build=[right])
-:- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+:- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
: +- Calc(select=[a, b, c], where=[>(b, 10)])
: +- BoundedStreamScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
+- Exchange(distribution=[hash[a]])
@@ -114,7 +114,7 @@ LogicalProject(c=[$0], a=[$1], b=[$2], c0=[$3], a0=[$4], b0=[$5])
<![CDATA[
Calc(select=[c, a, b, c0, a1, b0])
+- NestedLoopJoin(joinType=[InnerJoin], where=[=(a0, b0)], select=[c, a, b, a0, c0, a1, b0], build=[right])
- :- Exchange(distribution=[any], exchange_mode=[BATCH])
+ :- Exchange(distribution=[any], shuffle_mode=[BATCH])
: +- Calc(select=[c, a, b, CAST(a) AS a0])
: +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS a, Final_SUM(sum$1) AS b], reuse_id=[1])
: +- Exchange(distribution=[hash[c]])
@@ -161,7 +161,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5], a1=[$6], b1=[$
<Resource name="planAfter">
<![CDATA[
HashJoin(joinType=[InnerJoin], where=[=(c, c1)], select=[a, b, c, a0, b0, c0, a1, b1, c1, a00, b00, c00], build=[right])
-:- Exchange(distribution=[hash[c]], exchange_mode=[BATCH])
+:- Exchange(distribution=[hash[c]])
: +- HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, a0, b0, c0], build=[left])
: :- Calc(select=[a, b, c], where=[>(b, 10)])
: : +- SortAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS b, Final_MAX(max$1) AS c], reuse_id=[1])
@@ -170,13 +170,13 @@ HashJoin(joinType=[InnerJoin], where=[=(c, c1)], select=[a, b, c, a0, b0, c0, a1
: : +- LocalSortAggregate(groupBy=[a], select=[a, Partial_SUM(b) AS sum$0, Partial_MAX(c) AS max$1])
: : +- Sort(orderBy=[a ASC])
: : +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-: +- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+: +- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[c]])
+- HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, a0, b0, c0], build=[left])
:- Calc(select=[a, b, c], where=[<(b, 5)])
: +- Reused(reference_id=[1])
- +- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+ +- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
+- Reused(reference_id=[1])
]]>
</Resource>
@@ -211,7 +211,7 @@ LogicalProject(a=[$0])
<![CDATA[
Calc(select=[a, b])
+- HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, a0, b], build=[right])
- :- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+ :- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
: +- Calc(select=[a], where=[=(b, 5:BIGINT)])
: +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], reuse_id=[1])
+- HashJoin(joinType=[LeftSemiJoin], where=[=(a, a0)], select=[a, b], build=[left])
@@ -221,7 +221,7 @@ Calc(select=[a, b])
: +- Exchange(distribution=[single])
: +- Limit(offset=[0], fetch=[10], global=[false])
: +- Reused(reference_id=[1])
- +- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+ +- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
+- Calc(select=[a], where=[>(b, 5)])
+- Reused(reference_id=[1])
]]>
@@ -251,7 +251,7 @@ LogicalProject(a=[$0])
<![CDATA[
Calc(select=[a])
+- HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, a0], isBroadcast=[true], build=[right])
- :- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+ :- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
: +- Calc(select=[a], reuse_id=[1])
: +- Limit(offset=[0], fetch=[10], global=[true])
: +- Exchange(distribution=[single])
@@ -292,7 +292,7 @@ Calc(select=[a])
: +- Exchange(distribution=[single])
: +- Limit(offset=[0], fetch=[10], global=[false])
: +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
- +- Exchange(distribution=[any], exchange_mode=[BATCH])
+ +- Exchange(distribution=[any], shuffle_mode=[BATCH])
+- Reused(reference_id=[1])
]]>
</Resource>
@@ -324,7 +324,7 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3], a0=[$4], b0=[$5], d0=[$6], e0=[$7
<Resource name="planAfter">
<![CDATA[
HashJoin(joinType=[InnerJoin], where=[=(b, e0)], select=[a, b, d, e, a0, b0, d0, e0], build=[right])
-:- Exchange(distribution=[hash[b]], exchange_mode=[BATCH])
+:- Exchange(distribution=[hash[b]], shuffle_mode=[BATCH])
: +- HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d, e], build=[left])
: :- Exchange(distribution=[hash[a]])
: : +- Calc(select=[a, b], where=[<(a, 10)])
@@ -367,7 +367,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5])
<Resource name="planAfter">
<![CDATA[
HashJoin(joinType=[InnerJoin], where=[=(c, c0)], select=[a, b, c, a0, b0, c0], build=[right])
-:- Exchange(distribution=[hash[c]], exchange_mode=[BATCH])
+:- Exchange(distribution=[hash[c]], shuffle_mode=[BATCH])
: +- Calc(select=[w0$o0 AS a, b, c])
: +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[MAX($2) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[b, c, $2, w0$o0])
: +- Calc(select=[b, c, CASE(>(w0$o0, 0:BIGINT), w0$o1, null:INTEGER) AS $2], reuse_id=[1])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml
index c514ad8..5a80315 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml
@@ -227,7 +227,7 @@ LogicalMinus(all=[false])
<![CDATA[
HashAggregate(isMerge=[false], groupBy=[a, b, c], select=[a, b, c])
+- HashJoin(joinType=[LeftAntiJoin], where=[AND(OR(=(a, a0), AND(IS NULL(a), IS NULL(a0))), OR(=(b, b0), AND(IS NULL(b), IS NULL(b0))), OR(=(c, c0), AND(IS NULL(c), IS NULL(c0))))], select=[a, b, c], build=[left])
- :- Exchange(distribution=[hash[a, b, c]], exchange_mode=[BATCH], reuse_id=[1])
+ :- Exchange(distribution=[hash[a, b, c]], shuffle_mode=[BATCH], reuse_id=[1])
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Reused(reference_id=[1])
]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
index 141cbdf..9629fb3 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
@@ -40,7 +40,7 @@ LogicalProject(a=[$0])
<![CDATA[
Calc(select=[a])
+- HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, a0], isBroadcast=[true], build=[right])
- :- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+ :- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
: +- Calc(select=[a], reuse_id=[1])
: +- Limit(offset=[0], fetch=[10], global=[true])
: +- Exchange(distribution=[single])
@@ -81,7 +81,7 @@ Calc(select=[a])
: +- Exchange(distribution=[single])
: +- Limit(offset=[0], fetch=[10], global=[false])
: +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
- +- Exchange(distribution=[any], exchange_mode=[BATCH])
+ +- Exchange(distribution=[any], shuffle_mode=[BATCH])
+- Reused(reference_id=[1])
]]>
</Resource>
@@ -157,7 +157,7 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3], a0=[$4], b0=[$5], d0=[$6], e0=[$7
<Resource name="planAfter">
<![CDATA[
HashJoin(joinType=[InnerJoin], where=[=(b, e0)], select=[a, b, d, e, a0, b0, d0, e0], build=[right])
-:- Exchange(distribution=[hash[b]], exchange_mode=[BATCH])
+:- Exchange(distribution=[hash[b]], shuffle_mode=[BATCH])
: +- HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d, e], build=[left])
: :- Exchange(distribution=[hash[a]])
: : +- Calc(select=[a, b], where=[<(a, 10)])
@@ -257,10 +257,10 @@ Calc(select=[c, e, avg_b, sum_b, sum_b0 AS psum, sum_b1 AS nsum, avg_b0 AS avg_b
: : +- Exchange(distribution=[hash[d]])
: : +- Calc(select=[d, e], where=[>(e, 10)])
: : +- TableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
- : +- Exchange(distribution=[hash[c, e, $f5]], exchange_mode=[BATCH])
+ : +- Exchange(distribution=[hash[c, e, $f5]], shuffle_mode=[BATCH])
: +- Calc(select=[sum_b, /(CAST(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT)), w0$o0) AS avg_b, c, e, +(w1$o0, 1) AS $f5])
: +- Reused(reference_id=[1])
- +- Exchange(distribution=[hash[c, e, $f5]], exchange_mode=[BATCH])
+ +- Exchange(distribution=[hash[c, e, $f5]], shuffle_mode=[BATCH])
+- Calc(select=[sum_b, c, e, -(w0$o0, 1) AS $f5])
+- OverAggregate(partitionBy=[c, e], orderBy=[c ASC, e ASC], window#0=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[c, e, sum_b, w0$o0])
+- Reused(reference_id=[2])
@@ -291,7 +291,7 @@ LogicalProject(c=[$0], a=[$1], b=[$2], c0=[$3], a0=[$4], b0=[$5])
<![CDATA[
Calc(select=[c, a, b, c0, a1, b0])
+- HashJoin(joinType=[InnerJoin], where=[=(a0, b0)], select=[c, a, b, a0, c0, a1, b0], build=[right])
- :- Exchange(distribution=[hash[a0]], exchange_mode=[BATCH])
+ :- Exchange(distribution=[hash[a0]], shuffle_mode=[BATCH])
: +- Calc(select=[c, a, b, CAST(a) AS a0])
: +- SortAggregate(isMerge=[false], groupBy=[c], select=[c, MyFirst(a) AS a, MyLast(b) AS b], reuse_id=[1])
: +- Sort(orderBy=[c ASC])
@@ -492,7 +492,7 @@ LogicalProject(a=[$0], b=[$1], a0=[$2])
<![CDATA[
Calc(select=[a, b, a1 AS a0])
+- NestedLoopJoin(joinType=[InnerJoin], where=[=(a0, b0)], select=[a, b, a0, a1, b0], build=[right])
- :- Exchange(distribution=[any], exchange_mode=[BATCH])
+ :- Exchange(distribution=[any], shuffle_mode=[BATCH])
: +- Calc(select=[a, b, CAST(a) AS a0])
: +- Limit(offset=[0], fetch=[10], global=[true], reuse_id=[1])
: +- Exchange(distribution=[single])
@@ -603,7 +603,7 @@ LogicalProject(c=[$0], a=[$1], b=[$2], c0=[$3], a0=[$4], b0=[$5])
<![CDATA[
Calc(select=[c, a, b, c0, a1, b0])
+- HashJoin(joinType=[InnerJoin], where=[=(a0, b0)], select=[c, a, b, a0, c0, a1, b0], build=[right])
- :- Exchange(distribution=[hash[a0]], exchange_mode=[BATCH])
+ :- Exchange(distribution=[hash[a0]], shuffle_mode=[BATCH])
: +- Calc(select=[c, a, b, CAST(a) AS a0])
: +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS a, Final_SUM(sum$1) AS b], reuse_id=[1])
: +- Exchange(distribution=[hash[c]])
@@ -642,7 +642,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], a0=[$6], b0=[$7],
<Resource name="planAfter">
<![CDATA[
HashJoin(joinType=[InnerJoin], where=[=(a, d0)], select=[a, b, c, d, e, f, a0, b0, c0, d0, e0, f0], build=[right])
-:- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+:- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
: +- HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], build=[left], reuse_id=[1])
: :- Exchange(distribution=[hash[a]])
: : +- Calc(select=[a, b, c], where=[LIKE(c, _UTF-16LE'He%')])
@@ -682,7 +682,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], a0=[$6], b0=[$7],
<![CDATA[
Calc(select=[a, b, c, d, e, f, a1, b0, c0, d0, e0, f0])
+- HashJoin(joinType=[InnerJoin], where=[=(a0, b0)], select=[a, b, c, d, e, f, a0, a1, b0, c0, d0, e0, f0], build=[right])
- :- Exchange(distribution=[hash[a0]], exchange_mode=[BATCH])
+ :- Exchange(distribution=[hash[a0]], shuffle_mode=[BATCH])
: +- Calc(select=[a, b, c, d, e, f, CAST(a) AS a0])
: +- NestedLoopJoin(joinType=[InnerJoin], where=[OR(=(ABS(a), ABS(d)), =(c, f))], select=[a, b, c, d, e, f], build=[left], reuse_id=[1])
: :- Exchange(distribution=[broadcast])
@@ -724,7 +724,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], a0=[$6], b0=[$7],
<![CDATA[
Calc(select=[a, b, c, d, e, f, a1, b0, c0, d0, e0, f0])
+- HashJoin(joinType=[InnerJoin], where=[=(a0, b0)], select=[a, b, c, d, e, f, a0, a1, b0, c0, d0, e0, f0], build=[right])
- :- Exchange(distribution=[hash[a0]], exchange_mode=[BATCH])
+ :- Exchange(distribution=[hash[a0]], shuffle_mode=[BATCH])
: +- Calc(select=[a, b, c, d, e, f, CAST(a) AS a0])
: +- NestedLoopJoin(joinType=[InnerJoin], where=[OR(=(random_udf(a), random_udf(d)), =(c, f))], select=[a, b, c, d, e, f], build=[left], reuse_id=[1])
: :- Exchange(distribution=[broadcast])
@@ -761,7 +761,7 @@ LogicalProject(a=[$0], b=[$1], a0=[$2])
<![CDATA[
Calc(select=[a, b, a1 AS a0])
+- NestedLoopJoin(joinType=[InnerJoin], where=[=(a0, b0)], select=[a, b, a0, a1, b0], build=[right])
- :- Exchange(distribution=[any], exchange_mode=[BATCH])
+ :- Exchange(distribution=[any], shuffle_mode=[BATCH])
: +- Calc(select=[a, b, CAST(a) AS a0])
: +- Limit(offset=[0], fetch=[10], global=[true], reuse_id=[1])
: +- Exchange(distribution=[single])
@@ -806,7 +806,7 @@ NestedLoopJoin(joinType=[InnerJoin], where=[=(a, d0)], select=[a, b, c, d, e, f,
: : +- Calc(select=[a, b, c], where=[LIKE(c, _UTF-16LE'He%')])
: : +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: +- TableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
-+- Exchange(distribution=[any], exchange_mode=[BATCH])
++- Exchange(distribution=[any], shuffle_mode=[BATCH])
+- Reused(reference_id=[1])
]]>
</Resource>
@@ -832,7 +832,7 @@ LogicalProject(a=[$0], b=[$1], EXPR$2=[$2], a0=[$3], b0=[$4], EXPR$20=[$5])
<Resource name="planAfter">
<![CDATA[
HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, $2, a0, b0, $20], build=[right])
-:- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+:- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
: +- Calc(select=[a, b, w0$o0 AS $2], where=[<(b, 100)])
: +- OverAggregate(partitionBy=[c], orderBy=[c DESC], window#0=[MyFirst(c) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0], reuse_id=[1])
: +- Sort(orderBy=[c DESC])
@@ -867,7 +867,7 @@ LogicalProject(c=[$0], a=[$1], b=[$2], c0=[$3], a0=[$4], b0=[$5])
<Resource name="planAfter">
<![CDATA[
HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[c, a, b, c0, a0, b0], build=[right])
-:- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+:- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
: +- Calc(select=[c, a, b], where=[>(a, 1)])
: +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS a, Final_SUM(sum$1) AS b], reuse_id=[1])
: +- Exchange(distribution=[hash[c]])
@@ -903,7 +903,7 @@ LogicalProject(c=[$0], a=[$1], b=[$2], c0=[$3], a0=[$4], b0=[$5])
<![CDATA[
Calc(select=[c, a, b, c0, a1, b0])
+- HashJoin(joinType=[InnerJoin], where=[=(a0, b0)], select=[c, a, b, a0, c0, a1, b0], build=[right])
- :- Exchange(distribution=[hash[a0]], exchange_mode=[BATCH])
+ :- Exchange(distribution=[hash[a0]], shuffle_mode=[BATCH])
: +- Calc(select=[c, a, b, CAST(a) AS a0])
: +- SortAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS a, Final_SUM(sum$1) AS b], reuse_id=[1])
: +- Sort(orderBy=[c ASC])
@@ -938,7 +938,7 @@ LogicalProject(a=[$0], b=[$1], EXPR$2=[$2], a0=[$3], b0=[$4], EXPR$20=[$5])
<Resource name="planAfter">
<![CDATA[
HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, $2, a0, b0, $20], build=[right])
-:- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+:- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
: +- Calc(select=[a, b, w0$o0 AS $2], where=[<(b, 100)])
: +- OverAggregate(orderBy=[c DESC], window#0=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0], reuse_id=[1])
: +- Sort(orderBy=[c DESC])
@@ -984,7 +984,7 @@ NestedLoopJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[c, a, b, c0, a0,
: +- Exchange(distribution=[hash[c]])
: +- LocalHashAggregate(groupBy=[c], select=[c, Partial_SUM(a) AS sum$0, Partial_SUM(b) AS sum$1])
: +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Exchange(distribution=[any], exchange_mode=[BATCH])
++- Exchange(distribution=[any], shuffle_mode=[BATCH])
+- Calc(select=[c, a, b], where=[<(b, 10)])
+- Reused(reference_id=[1])
]]>
@@ -1022,7 +1022,7 @@ LogicalProject(a=[$0], c=[$1], c0=[$3])
<![CDATA[
Calc(select=[a, c, c0])
+- HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, c, a0, c0], build=[right])
- :- Exchange(distribution=[hash[a]], exchange_mode=[BATCH], reuse_id=[1])
+ :- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH], reuse_id=[1])
: +- Union(all=[true], union=[a, c])
: :- Calc(select=[a, c], where=[>(b, 10)])
: : +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
@@ -1095,7 +1095,7 @@ NestedLoopJoin(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, rand
: +- Sort(orderBy=[random ASC])
: +- Exchange(distribution=[hash[random]])
: +- NestedLoopJoin(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, random0)], select=[random], build=[right])
-: :- Exchange(distribution=[any], exchange_mode=[BATCH])
+: :- Exchange(distribution=[any], shuffle_mode=[BATCH])
: : +- Calc(select=[random], reuse_id=[1])
: : +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1], global=[true])
: : +- Exchange(distribution=[single])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
index 8eb5180..202533c 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
@@ -327,7 +327,7 @@ Calc(select=[EXPR$0, EXPR$1, EXPR$2])
+- Exchange(distribution=[hash[b, b0]])
+- LocalHashAggregate(groupBy=[b, b0], select=[b, b0, Partial_COUNT(a) AS count$0, Partial_COUNT(id) AS count$1, Partial_SUM(a0) AS sum$2])
+- HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[b, a, id, a0, b0], build=[right])
- :- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+ :- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
: +- LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], async=[false], on=[a=id], where=[>(age, 10)], select=[b, a, id], reuse_id=[1])
: +- Calc(select=[b, a])
: +- HashAggregate(isMerge=[true], groupBy=[a, b], select=[a, b])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SingleRowJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SingleRowJoinTest.xml
index 4cc5310..a528c3e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SingleRowJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SingleRowJoinTest.xml
@@ -35,7 +35,7 @@ LogicalProject(a1=[$0], a2=[$1])
<![CDATA[
Calc(select=[a1, a2])
+- NestedLoopJoin(joinType=[InnerJoin], where=[<(a1, cnt)], select=[a1, a2, cnt], build=[right], singleRowJoin=[true])
- :- Exchange(distribution=[any], exchange_mode=[BATCH])
+ :- Exchange(distribution=[any], shuffle_mode=[BATCH])
: +- TableSourceScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]], fields=[a1, a2], reuse_id=[1])
+- Exchange(distribution=[broadcast])
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count$0) AS cnt])
@@ -147,7 +147,7 @@ LogicalProject(a1=[$0], a_sum=[$2])
<Resource name="planAfter">
<![CDATA[
NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a1, a_sum], build=[right], singleRowJoin=[true])
-:- Exchange(distribution=[any], exchange_mode=[BATCH])
+:- Exchange(distribution=[any], shuffle_mode=[BATCH])
: +- Calc(select=[a1])
: +- TableSourceScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]], fields=[a1, a2], reuse_id=[1])
+- Exchange(distribution=[broadcast])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/SetOperatorsTest.xml
index ee628f4..0d88364 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/SetOperatorsTest.xml
@@ -87,7 +87,7 @@ LogicalProject(a1=[AS($0, _UTF-16LE'a1')])
<Resource name="planAfter">
<![CDATA[
HashJoin(joinType=[LeftSemiJoin], where=[=(c, a1)], select=[a, b, c], build=[right], tryDistinctBuildRow=[true])
-:- Exchange(distribution=[hash[c]], exchange_mode=[BATCH])
+:- Exchange(distribution=[hash[c]], shuffle_mode=[BATCH])
: +- TableSourceScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], reuse_id=[1])
+- Exchange(distribution=[hash[a1]])
+- LocalHashAggregate(groupBy=[a1], select=[a1])
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/executor/BatchExecutorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/executor/BatchExecutorTest.scala
new file mode 100644
index 0000000..cca8e1a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/executor/BatchExecutorTest.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.executor
+
+import org.apache.flink.api.common.InputDependencyConstraint
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.transformations.ShuffleMode
+import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.util.{BatchTableTestUtil, TableTestBase}
+
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.{Before, Test}
+
+/**
+ * Tests that the stream environment's configuration is saved and restored when running batch jobs.
+ */
+class BatchExecutorTest extends TableTestBase {
+
+ private var util: BatchTableTestUtil = _
+
+ @Before
+ def setUp(): Unit = {
+ util = batchTestUtil()
+ util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+ util.addDataStream[(Int, Long, String)]("MyTable1", 'a, 'b, 'c)
+ util.addDataStream[(Int, Long, String)]("MyTable2", 'd, 'e, 'f)
+ }
+
+ @Test
+ def testRestoreConfig(): Unit = {
+ util.getStreamEnv.setBufferTimeout(11)
+ util.getStreamEnv.getConfig.disableObjectReuse()
+ util.getStreamEnv.getConfig.setLatencyTrackingInterval(100)
+ util.getStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ util.getStreamEnv.getConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ANY)
+ util.verifyExplain("SELECT * FROM MyTable")
+ assertEquals(11, util.getStreamEnv.getBufferTimeout)
+ assertTrue(!util.getStreamEnv.getConfig.isObjectReuseEnabled)
+ assertEquals(100, util.getStreamEnv.getConfig.getLatencyTrackingInterval)
+ assertEquals(TimeCharacteristic.EventTime, util.getStreamEnv.getStreamTimeCharacteristic)
+ assertEquals(InputDependencyConstraint.ANY,
+ util.getStreamEnv.getConfig.getDefaultInputDependencyConstraint)
+ }
+
+ @Test
+ def testRestoreConfigWhenBatchShuffleMode(): Unit = {
+ util.getTableEnv.getConfig.getConfiguration.setString(
+ ExecutionConfigOptions.SQL_EXEC_SHUFFLE_MODE,
+ ShuffleMode.BATCH.toString)
+ util.getStreamEnv.setBufferTimeout(11)
+ util.getStreamEnv.getConfig.disableObjectReuse()
+ util.getStreamEnv.getConfig.setLatencyTrackingInterval(100)
+ util.getStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ util.getStreamEnv.getConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ANY)
+ util.verifyExplain("SELECT * FROM MyTable")
+ assertEquals(11, util.getStreamEnv.getBufferTimeout)
+ assertTrue(!util.getStreamEnv.getConfig.isObjectReuseEnabled)
+ assertEquals(100, util.getStreamEnv.getConfig.getLatencyTrackingInterval)
+ assertEquals(TimeCharacteristic.EventTime, util.getStreamEnv.getStreamTimeCharacteristic)
+ assertEquals(InputDependencyConstraint.ANY,
+ util.getStreamEnv.getConfig.getDefaultInputDependencyConstraint)
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
index fd2e489..3e8d304 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.transformations.ShuffleMode
import org.apache.flink.table.api.internal.{TableEnvironmentImpl, TableImpl}
import org.apache.flink.table.api.{EnvironmentSettings, ExecutionConfigOptions, SqlParserException, Table, TableConfig, TableEnvironment}
import org.apache.flink.table.dataformat.{BaseRow, BinaryRow, BinaryRowWriter}
@@ -72,6 +73,8 @@ class BatchTestBase extends BatchAbstractTestBase {
conf.getConfiguration.setInteger(ExecutionConfigOptions.SQL_RESOURCE_HASH_JOIN_TABLE_MEM, 2)
conf.getConfiguration.setInteger(ExecutionConfigOptions.SQL_RESOURCE_SORT_BUFFER_MEM, 1)
conf.getConfiguration.setInteger(ExecutionConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM, 1)
+ conf.getConfiguration.setString(ExecutionConfigOptions.SQL_EXEC_SHUFFLE_MODE,
+ ShuffleMode.PIPELINED.toString)
}
/**
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index fb9dfd5..26a5268 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -24,6 +24,7 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.{LocalStreamEnvironment, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
+import org.apache.flink.streaming.api.transformations.ShuffleMode
import org.apache.flink.streaming.api.{TimeCharacteristic, environment}
import org.apache.flink.table.api._
import org.apache.flink.table.api.internal.{TableEnvironmentImpl, TableImpl}
@@ -483,12 +484,16 @@ abstract class TableTestUtil(
protected val testingTableEnv: TestingTableEnvironment =
TestingTableEnvironment.create(setting, catalogManager)
val tableEnv: TableEnvironment = testingTableEnv
+ tableEnv.getConfig.getConfiguration.setString(
+ ExecutionConfigOptions.SQL_EXEC_SHUFFLE_MODE, ShuffleMode.PIPELINED.toString)
private val env: StreamExecutionEnvironment = getPlanner.getExecEnv
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
override def getTableEnv: TableEnvironment = tableEnv
+ def getStreamEnv: StreamExecutionEnvironment = env
+
/**
* Create a [[TestTableSource]] with the given schema, table stats and unique keys,
* and registers this TableSource under given name into the TableEnvironment's catalog.
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/ExecutionConfigOptions.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/ExecutionConfigOptions.java
index 6c8cce3..283625c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/ExecutionConfigOptions.java
@@ -107,12 +107,12 @@ public class ExecutionConfigOptions {
public static final ConfigOption<Integer> SQL_RESOURCE_HASH_AGG_TABLE_MEM =
key("sql.resource.hash-agg.table.memory.mb")
- .defaultValue(256)
+ .defaultValue(128)
.withDescription("Sets the table memory size of hashAgg operator.");
public static final ConfigOption<Integer> SQL_RESOURCE_HASH_JOIN_TABLE_MEM =
key("sql.resource.hash-join.table.memory.mb")
- .defaultValue(512)
+ .defaultValue(128)
.withDescription("Sets the HashTable reserved memory for hashJoin operator. It defines the lower limit.");
public static final ConfigOption<Integer> SQL_RESOURCE_SORT_BUFFER_MEM =
@@ -195,4 +195,12 @@ public class ExecutionConfigOptions {
"means a kind of disabled operator. Its default value is empty that means no operators are disabled. " +
"If the configure's value is \"NestedLoopJoin, ShuffleHashJoin\", NestedLoopJoin and ShuffleHashJoin " +
"are disabled. If configure's value is \"HashJoin\", ShuffleHashJoin and BroadcastHashJoin are disabled.");
+
+ public static final ConfigOption<String> SQL_EXEC_SHUFFLE_MODE =
+ key("sql.exec.shuffle-mode")
+ .defaultValue("batch")
+ .withDescription("Sets exec shuffle mode. Only batch or pipelined can be set.\n" +
+ "batch: the job will run stage by stage. \n" +
+ "pipelined: the job will run in streaming mode, but it may cause resource deadlock that receiver waits for resource to start when " +
+ "the sender holds resource to wait to send data to the receiver.");
}