Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/04 13:02:49 UTC
[flink] branch master updated: [FLINK-12665][table-planner-blink]
Introduce MiniBatchIntervalInferRule and watermark assigner operators
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4fbf323 [FLINK-12665][table-planner-blink] Introduce MiniBatchIntervalInferRule and watermark assigner operators
4fbf323 is described below
commit 4fbf32356a1df34b9ed5ce0c484dfc9695cd7c4f
Author: godfrey he <go...@163.com>
AuthorDate: Thu Jul 4 21:02:37 2019 +0800
[FLINK-12665][table-planner-blink] Introduce MiniBatchIntervalInferRule and watermark assigner operators
This closes #8562
---
.../util/AbstractStreamOperatorTestHarness.java | 6 +
.../stream/StreamExecWatermarkAssigner.scala | 93 +++-
.../flink/table/plan/optimize/RelNodeBlock.scala | 9 +
.../StreamCommonSubGraphBasedOptimizer.scala | 84 ++-
...> FlinkMiniBatchIntervalTraitInitProgram.scala} | 31 +-
.../plan/optimize/program/FlinkStreamProgram.scala | 8 +
.../optimize/program/StreamOptimizeContext.scala | 7 +
.../table/plan/rules/FlinkStreamRuleSets.scala | 9 +
.../stream/MiniBatchIntervalInferRule.scala | 129 +++++
.../stream/StreamExecWatermarkAssignerRule.scala | 64 +++
.../table/plan/trait/MiniBatchIntervalTrait.scala | 11 +-
.../flink/table/plan/util/FlinkRelOptUtil.scala | 70 +++
.../apache/flink/table/api/stream/ExplainTest.xml | 256 ++++++++-
.../stream/RetractionRulesWithTwoStageAggTest.xml | 12 +-
.../plan/stream/sql/MiniBatchIntervalInferTest.xml | 572 +++++++++++++++++++++
.../plan/stream/sql/ModifiedMonotonicityTest.xml | 9 +-
.../table/plan/stream/sql/agg/AggregateTest.xml | 18 +-
.../plan/stream/sql/agg/DistinctAggregateTest.xml | 204 +++++---
.../stream/sql/agg/IncrementalAggregateTest.xml | 51 +-
.../plan/stream/sql/agg/TwoStageAggregateTest.xml | 21 +-
.../flink/table/api/stream/ExplainTest.scala | 43 +-
.../stream/sql/MiniBatchIntervalInferTest.scala | 331 ++++++++++++
.../table/plan/stream/sql/agg/AggregateTest.scala | 2 -
.../table/plan/util/FlinkRelOptUtilTest.scala | 64 +++
.../table/runtime/stream/sql/AggregateITCase.scala | 35 ++
.../table/runtime/utils/StreamingTestBase.scala | 24 +
.../apache/flink/table/util/TableTestBase.scala | 32 +-
.../apache/flink/table/api/TableConfigOptions.java | 8 +
.../MiniBatchAssignerOperator.java | 105 ++++
.../MiniBatchedWatermarkAssignerOperator.java | 176 +++++++
.../WatermarkAssignerOperator.java | 177 +++++++
.../MiniBatchAssignerOperatorTest.java | 125 +++++
.../MiniBatchedWatermarkAssignerOperatorTest.java | 127 +++++
.../WatermarkAssignerOperatorTest.java | 165 ++++++
.../WatermarkAssignerOperatorTestBase.java | 64 +++
35 files changed, 2956 insertions(+), 186 deletions(-)
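For orientation before the diff: the whole feature is driven by one config option. A minimal sketch (not part of this patch) of enabling it, assuming a blink-planner StreamTableEnvironment `tEnv` and the Configuration#setLong setter:

    import org.apache.flink.table.api.TableConfigOptions

    // Setting an allowed latency turns mini-batch on; the optimizer changes below
    // then seed every sink block with MiniBatchInterval(1000, ProcTime).
    tEnv.getConfig.getConf.setLong(
      TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 1000L)
    // Optionally bound how long an idle source may hold back watermarks
    // (read by the new watermark assigner operators).
    tEnv.getConfig.getConf.setLong(
      TableConfigOptions.SQL_EXEC_SOURCE_IDLE_TIMEOUT, 30000L)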
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index ce561d4..fc24956 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -59,6 +59,7 @@ import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -566,6 +567,11 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
}
}
+ @VisibleForTesting
+ public StreamStatus getStreamStatus() {
+ return mockTask.getStreamStatusMaintainer().getStreamStatus();
+ }
+
private class MockOutput implements Output<StreamRecord<OUT>> {
private TypeSerializer<OUT> outputSerializer;
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala
index b7b47d8..4847d03 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala
@@ -18,21 +18,27 @@
package org.apache.flink.table.plan.nodes.physical.stream
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.table.api.{StreamTableEnvironment, TableConfigOptions, TableException}
+import org.apache.flink.table.calcite.{FlinkContext, FlinkTypeFactory}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.`trait`.{MiniBatchIntervalTraitDef, MiniBatchMode}
import org.apache.flink.table.plan.nodes.calcite.WatermarkAssigner
import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
-import org.apache.flink.table.plan.optimize.program.FlinkOptimizeContext
+import org.apache.flink.table.runtime.watermarkassigner.{MiniBatchAssignerOperator, MiniBatchedWatermarkAssignerOperator, WatermarkAssignerOperator}
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
import org.apache.flink.util.Preconditions
+
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.{RelNode, RelWriter}
-import java.util
-import org.apache.flink.api.dag.Transformation
+import java.util
+import java.util.Calendar
import scala.collection.JavaConversions._
+
/**
* Stream physical RelNode for [[WatermarkAssigner]].
*/
@@ -68,22 +74,23 @@ class StreamExecWatermarkAssigner(
override def explainTerms(pw: RelWriter): RelWriter = {
val miniBatchInterval = traits.getTrait(MiniBatchIntervalTraitDef.INSTANCE).getMiniBatchInterval
- val value = miniBatchInterval.mode match {
- case MiniBatchMode.None =>
- // 1. operator requiring watermark, but minibatch is not enabled
- // 2. redundant watermark definition in DDL
- // 3. existing window, and window minibatch is disabled.
- "None"
- case MiniBatchMode.ProcTime =>
- val config = cluster.getPlanner.getContext.asInstanceOf[FlinkOptimizeContext].getTableConfig
- val miniBatchLatency = config.getConf.getLong(
- TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY)
- Preconditions.checkArgument(miniBatchLatency > 0,
- "MiniBatch latency must be greater that 0.", null)
- s"Proctime, ${miniBatchLatency}ms"
- case MiniBatchMode.RowTime =>
- s"Rowtime, ${miniBatchInterval.interval}ms"
- case o => throw new TableException(s"Unsupported mode: $o")
+ val value = if (miniBatchInterval.mode == MiniBatchMode.None ||
+ miniBatchInterval.interval == 0) {
+ // 1. redundant watermark definition in DDL
+ // 2. existing window aggregate
+ // 3. operator requiring watermark, but minibatch is not enabled
+ "None"
+ } else if (miniBatchInterval.mode == MiniBatchMode.ProcTime) {
+ val tableConfig = cluster.getPlanner.getContext.asInstanceOf[FlinkContext].getTableConfig
+ val miniBatchLatency = tableConfig.getConf.getLong(
+ TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY)
+ Preconditions.checkArgument(miniBatchLatency > 0,
+ "MiniBatch latency must be greater that 0.", null)
+ s"Proctime, ${miniBatchLatency}ms"
+ } else if (miniBatchInterval.mode == MiniBatchMode.RowTime) {
+ s"Rowtime, ${miniBatchInterval.interval}ms"
+ } else {
+ throw new TableException(s"Unsupported mode: $miniBatchInterval")
}
super.explainTerms(pw).item("miniBatchInterval", value)
}
@@ -102,7 +109,53 @@ class StreamExecWatermarkAssigner(
override protected def translateToPlanInternal(
tableEnv: StreamTableEnvironment): Transformation[BaseRow] = {
- throw new TableException("Implements this")
+ val inputTransformation = getInputNodes.get(0).translateToPlan(tableEnv)
+ .asInstanceOf[Transformation[BaseRow]]
+
+ val inferredInterval = getTraitSet.getTrait(
+ MiniBatchIntervalTraitDef.INSTANCE).getMiniBatchInterval
+ val idleTimeout = tableEnv.getConfig.getConf.getLong(
+ TableConfigOptions.SQL_EXEC_SOURCE_IDLE_TIMEOUT)
+
+ val (operator, opName) = if (inferredInterval.mode == MiniBatchMode.None ||
+ inferredInterval.interval == 0) {
+ require(rowtimeFieldIndex.isDefined, "rowtimeFieldIndex should not be None")
+ require(watermarkDelay.isDefined, "watermarkDelay should not be None")
+ // 1. redundant watermark definition in DDL
+ // 2. existing window aggregate
+ // 3. operator requiring watermark, but minibatch is not enabled
+ val op = new WatermarkAssignerOperator(rowtimeFieldIndex.get, watermarkDelay.get, idleTimeout)
+ val opName =
+ s"WatermarkAssigner(rowtime: ${rowtimeFieldIndex.get}, offset: ${watermarkDelay.get})"
+ (op, opName)
+ } else if (inferredInterval.mode == MiniBatchMode.ProcTime) {
+ val op = new MiniBatchAssignerOperator(inferredInterval.interval)
+ val opName = s"MiniBatchAssigner(intervalMs: ${inferredInterval.interval})"
+ (op, opName)
+ } else {
+ require(rowtimeFieldIndex.isDefined, "rowtimeFieldIndex should not be None")
+ require(watermarkDelay.isDefined, "watermarkDelay should not be None")
+ // get the timezone offset.
+ val tzOffset = tableEnv.getConfig.getTimeZone.getOffset(Calendar.ZONE_OFFSET)
+ val op = new MiniBatchedWatermarkAssignerOperator(
+ rowtimeFieldIndex.get,
+ watermarkDelay.get,
+ tzOffset,
+ idleTimeout,
+ inferredInterval.interval)
+ val opName = s"MiniBatchedWatermarkAssigner(rowtime: ${rowtimeFieldIndex.get}," +
+ s" offset: ${watermarkDelay.get}, intervalMs: ${inferredInterval.interval})"
+ (op, opName)
+ }
+
+ val outputRowTypeInfo = BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType))
+ val transformation = new OneInputTransformation[BaseRow, BaseRow](
+ inputTransformation,
+ opName,
+ operator,
+ outputRowTypeInfo,
+ inputTransformation.getParallelism)
+ transformation
}
}
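The three operators introduced above map one-to-one onto the inferred modes: plain rowtime watermarks, proctime mini-batch markers, and rowtime watermarks aligned to mini-batch boundaries. How the last one aligns is not shown in this mail; a hedged sketch of plausible boundary arithmetic (the helper name and the rounding behavior are assumptions, not code from MiniBatchedWatermarkAssignerOperator):

    // Hypothetical: round a watermark down to the enclosing mini-batch boundary.
    // tzOffsetMs mirrors the Calendar.ZONE_OFFSET value handed to the operator,
    // so boundaries line up with the session time zone.
    def miniBatchBoundary(watermark: Long, tzOffsetMs: Long, intervalMs: Long): Long = {
      val shifted = watermark + tzOffsetMs
      shifted - (shifted % intervalMs) - tzOffsetMs
    }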
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/RelNodeBlock.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/RelNodeBlock.scala
index 29b069d..6bacb15 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/RelNodeBlock.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/RelNodeBlock.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.plan.optimize
import org.apache.flink.table.api.{PlannerConfigOptions, TableConfig}
+import org.apache.flink.table.plan.`trait`.MiniBatchInterval
import org.apache.flink.table.plan.nodes.calcite.Sink
import org.apache.flink.table.plan.reuse.SubplanReuser.{SubplanReuseContext, SubplanReuseShuttle}
import org.apache.flink.table.plan.rules.logical.WindowPropertiesRules
@@ -123,6 +124,8 @@ class RelNodeBlock(val outputNode: RelNode) {
private var updateAsRetract: Boolean = false
+ private var miniBatchInterval: MiniBatchInterval = MiniBatchInterval.NONE
+
def addChild(block: RelNodeBlock): Unit = childBlocks += block
def children: Seq[RelNodeBlock] = childBlocks.toSeq
@@ -148,6 +151,12 @@ class RelNodeBlock(val outputNode: RelNode) {
def isUpdateAsRetraction: Boolean = updateAsRetract
+ def setMiniBatchInterval(miniBatchInterval: MiniBatchInterval): Unit = {
+ this.miniBatchInterval = miniBatchInterval
+ }
+
+ def getMiniBatchInterval: MiniBatchInterval = miniBatchInterval
+
def getChildBlock(node: RelNode): Option[RelNodeBlock] = {
val find = children.filter(_.outputNode.equals(node))
if (find.isEmpty) {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
index d97ec3e..c19bd88 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
@@ -18,15 +18,16 @@
package org.apache.flink.table.plan.optimize
-import org.apache.flink.table.api.{StreamTableEnvironment, TableConfig}
+import org.apache.flink.table.api.{StreamTableEnvironment, TableConfig, TableConfigOptions}
import org.apache.flink.table.catalog.FunctionCatalog
-import org.apache.flink.table.plan.`trait`.{AccMode, AccModeTraitDef, UpdateAsRetractionTraitDef}
+import org.apache.flink.table.plan.`trait`.{AccMode, AccModeTraitDef, MiniBatchInterval, MiniBatchIntervalTrait, MiniBatchIntervalTraitDef, MiniBatchMode, UpdateAsRetractionTraitDef}
import org.apache.flink.table.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.plan.nodes.calcite.Sink
import org.apache.flink.table.plan.nodes.physical.stream.{StreamExecDataStreamScan, StreamExecIntermediateTableScan, StreamPhysicalRel}
import org.apache.flink.table.plan.optimize.program.{FlinkStreamProgram, StreamOptimizeContext}
import org.apache.flink.table.plan.schema.IntermediateRelTable
import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.plan.util.FlinkRelOptUtil
import org.apache.flink.table.sinks.{DataStreamTableSink, RetractStreamTableSink}
import org.apache.flink.util.Preconditions
@@ -60,6 +61,17 @@ class StreamCommonSubGraphBasedOptimizer(tEnv: StreamTableEnvironment)
o.getTraitSet.getTrait(UpdateAsRetractionTraitDef.INSTANCE).sendsUpdatesAsRetractions
}
sinkBlock.setUpdateAsRetraction(retractionFromRoot)
+ val miniBatchInterval: MiniBatchInterval = if (tEnv.getConfig.getConf.contains(
+ TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY)) {
+ val miniBatchLatency = tEnv.getConfig.getConf.getLong(
+ TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY)
+ Preconditions.checkArgument(miniBatchLatency > 0,
+ "MiniBatch Latency must be greater than 0.", null)
+ MiniBatchInterval(miniBatchLatency, MiniBatchMode.ProcTime)
+ } else {
+ MiniBatchIntervalTrait.NONE.getMiniBatchInterval
+ }
+ sinkBlock.setMiniBatchInterval(miniBatchInterval)
}
if (sinkBlocks.size == 1) {
@@ -70,16 +82,17 @@ class StreamCommonSubGraphBasedOptimizer(tEnv: StreamTableEnvironment)
val optimizedTree = optimizeTree(
block.getPlan,
block.isUpdateAsRetraction,
+ block.getMiniBatchInterval,
isSinkBlock = true)
block.setOptimizedPlan(optimizedTree)
return sinkBlocks
}
- // infer updateAsRetraction property for all input blocks
- sinkBlocks.foreach(b => inferUpdateAsRetraction(
- b, b.isUpdateAsRetraction, isSinkBlock = true))
- // propagate updateAsRetraction to all input blocks
- sinkBlocks.foreach(propagateTraits)
+ // infer updateAsRetraction property and miniBatchInterval property for all input blocks
+ sinkBlocks.foreach(b => inferTraits(
+ b, b.isUpdateAsRetraction, b.getMiniBatchInterval, isSinkBlock = true))
+ // propagate updateAsRetraction property and miniBatchInterval property to all input blocks
+ sinkBlocks.foreach(propagateTraits(_, isSinkBlock = true))
// clear the intermediate result
sinkBlocks.foreach(resetIntermediateResult)
// optimize recursively RelNodeBlock
@@ -102,6 +115,7 @@ class StreamCommonSubGraphBasedOptimizer(tEnv: StreamTableEnvironment)
val optimizedTree = optimizeTree(
s,
updatesAsRetraction = block.isUpdateAsRetraction,
+ miniBatchInterval = block.getMiniBatchInterval,
isSinkBlock = true)
block.setOptimizedPlan(optimizedTree)
@@ -109,6 +123,7 @@ class StreamCommonSubGraphBasedOptimizer(tEnv: StreamTableEnvironment)
val optimizedPlan = optimizeTree(
o,
updatesAsRetraction = block.isUpdateAsRetraction,
+ miniBatchInterval = block.getMiniBatchInterval,
isSinkBlock = isSinkBlock)
val isAccRetract = optimizedPlan.getTraitSet
.getTrait(AccModeTraitDef.INSTANCE).getAccMode == AccMode.AccRetract
@@ -126,12 +141,14 @@ class StreamCommonSubGraphBasedOptimizer(tEnv: StreamTableEnvironment)
*
* @param relNode The root node of the relational expression tree.
* @param updatesAsRetraction True if request updates as retraction messages.
+ * @param miniBatchInterval mini-batch interval of the block.
* @param isSinkBlock True if the given block is sink block.
* @return The optimized [[RelNode]] tree
*/
private def optimizeTree(
relNode: RelNode,
updatesAsRetraction: Boolean,
+ miniBatchInterval: MiniBatchInterval,
isSinkBlock: Boolean): RelNode = {
val config = tEnv.getConfig
@@ -147,29 +164,37 @@ class StreamCommonSubGraphBasedOptimizer(tEnv: StreamTableEnvironment)
override def getRexBuilder: RexBuilder = tEnv.getRelBuilder.getRexBuilder
- override def needFinalTimeIndicatorConversion: Boolean = true
-
override def updateAsRetraction: Boolean = updatesAsRetraction
+
+ def getMiniBatchInterval: MiniBatchInterval = miniBatchInterval
+
+ override def needFinalTimeIndicatorConversion: Boolean = true
})
}
/**
- * Infer UpdateAsRetraction property for each block.
+ * Infer UpdateAsRetraction property and MiniBatchInterval property for each block.
* NOTES: this method should not change the original RelNode tree.
*
* @param block The [[RelNodeBlock]] instance.
* @param retractionFromRoot Whether the sink need update as retraction messages.
+ * @param miniBatchInterval mini-batch interval of the block.
* @param isSinkBlock True if the given block is sink block.
*/
- private def inferUpdateAsRetraction(
+ private def inferTraits(
block: RelNodeBlock,
retractionFromRoot: Boolean,
+ miniBatchInterval: MiniBatchInterval,
isSinkBlock: Boolean): Unit = {
block.children.foreach {
child =>
if (child.getNewOutputNode.isEmpty) {
- inferUpdateAsRetraction(child, retractionFromRoot = false, isSinkBlock = false)
+ inferTraits(
+ child,
+ retractionFromRoot = false,
+ miniBatchInterval = MiniBatchInterval.NONE,
+ isSinkBlock = false)
}
}
@@ -178,12 +203,12 @@ class StreamCommonSubGraphBasedOptimizer(tEnv: StreamTableEnvironment)
case n: Sink =>
require(isSinkBlock)
val optimizedPlan = optimizeTree(
- n, retractionFromRoot, isSinkBlock = true)
+ n, retractionFromRoot, miniBatchInterval, isSinkBlock = true)
block.setOptimizedPlan(optimizedPlan)
case o =>
val optimizedPlan = optimizeTree(
- o, retractionFromRoot, isSinkBlock = isSinkBlock)
+ o, retractionFromRoot, miniBatchInterval, isSinkBlock = isSinkBlock)
val name = createUniqueIntermediateRelTableName
val intermediateRelTable = createIntermediateRelTable(optimizedPlan, isAccRetract = false)
val newTableScan = wrapIntermediateRelTableToTableScan(intermediateRelTable, name)
@@ -194,38 +219,55 @@ class StreamCommonSubGraphBasedOptimizer(tEnv: StreamTableEnvironment)
}
/**
- * Propagate updateAsRetraction to all input blocks.
+ * Propagate updateAsRetraction property and miniBatchInterval property to all input blocks.
*
* @param block The [[RelNodeBlock]] instance.
+ * @param isSinkBlock True if the given block is sink block.
*/
- private def propagateTraits(block: RelNodeBlock): Unit = {
+ private def propagateTraits(block: RelNodeBlock, isSinkBlock: Boolean): Unit = {
// process current block
- def shipTraits(rel: RelNode, updateAsRetraction: Boolean): Unit = {
+ def shipTraits(
+ rel: RelNode,
+ updateAsRetraction: Boolean,
+ miniBatchInterval: MiniBatchInterval): Unit = {
rel match {
case _: StreamExecDataStreamScan | _: StreamExecIntermediateTableScan =>
val scan = rel.asInstanceOf[TableScan]
val retractionTrait = scan.getTraitSet.getTrait(UpdateAsRetractionTraitDef.INSTANCE)
+ val miniBatchIntervalTrait = scan.getTraitSet.getTrait(MiniBatchIntervalTraitDef.INSTANCE)
val tableName = scan.getTable.getQualifiedName.mkString(".")
val inputBlocks = block.children.filter(b => tableName.equals(b.getOutputTableName))
Preconditions.checkArgument(inputBlocks.size <= 1)
if (inputBlocks.size == 1) {
+ val mergedInterval = if (isSinkBlock) {
+ // traits of sinkBlock have already been
+ // initialized before first round of optimization.
+ miniBatchIntervalTrait.getMiniBatchInterval
+ } else {
+ FlinkRelOptUtil.mergeMiniBatchInterval(
+ miniBatchIntervalTrait.getMiniBatchInterval, miniBatchInterval)
+ }
+ val newInterval = FlinkRelOptUtil.mergeMiniBatchInterval(
+ inputBlocks.head.getMiniBatchInterval, mergedInterval)
+ inputBlocks.head.setMiniBatchInterval(newInterval)
+
if (retractionTrait.sendsUpdatesAsRetractions || updateAsRetraction) {
inputBlocks.head.setUpdateAsRetraction(true)
}
}
case ser: StreamPhysicalRel => ser.getInputs.foreach { e =>
if (ser.needsUpdatesAsRetraction(e) || (updateAsRetraction && !ser.consumesRetractions)) {
- shipTraits(e, updateAsRetraction = true)
+ shipTraits(e, updateAsRetraction = true, miniBatchInterval)
} else {
- shipTraits(e, updateAsRetraction = false)
+ shipTraits(e, updateAsRetraction = false, miniBatchInterval)
}
}
}
}
- shipTraits(block.getOptimizedPlan, block.isUpdateAsRetraction)
- block.children.foreach(propagateTraits)
+ shipTraits(block.getOptimizedPlan, block.isUpdateAsRetraction, block.getMiniBatchInterval)
+ block.children.foreach(propagateTraits(_, isSinkBlock = false))
}
/**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/StreamOptimizeContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkMiniBatchIntervalTraitInitProgram.scala
similarity index 52%
copy from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/StreamOptimizeContext.scala
copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkMiniBatchIntervalTraitInitProgram.scala
index 2e2a722..d7eb500 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/StreamOptimizeContext.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkMiniBatchIntervalTraitInitProgram.scala
@@ -18,29 +18,18 @@
package org.apache.flink.table.plan.optimize.program
-import org.apache.calcite.rex.RexBuilder
+import org.apache.flink.table.plan.`trait`.MiniBatchIntervalTrait
+
+import org.apache.calcite.rel.RelNode
/**
- * A OptimizeContext allows to obtain stream table environment information when optimizing.
+ * A FlinkOptimizeProgram that does some initialization before MiniBatch interval inference.
*/
-trait StreamOptimizeContext extends FlinkOptimizeContext {
-
- /**
- * Gets the Calcite [[RexBuilder]] defined in [[org.apache.flink.table.api.TableEnvironment]].
- */
- def getRexBuilder: RexBuilder
-
- /**
- * Returns true if the sink requests updates as retraction messages
- * defined in
- * [[org.apache.flink.table.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimize]].
- */
- def updateAsRetraction: Boolean
-
- /**
- * Returns true if the output node needs final TimeIndicator conversion
- * defined in [[org.apache.flink.table.api.TableEnvironment.optimize]].
- */
- def needFinalTimeIndicatorConversion: Boolean
+class FlinkMiniBatchIntervalTraitInitProgram extends FlinkOptimizeProgram[StreamOptimizeContext] {
+ override def optimize(input: RelNode, context: StreamOptimizeContext): RelNode = {
+ val updatedTraitSet = input.getTraitSet.plus(
+ new MiniBatchIntervalTrait(context.getMiniBatchInterval))
+ input.copy(updatedTraitSet, input.getInputs)
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
index b64ff31..4589aaa 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
@@ -188,6 +188,14 @@ object FlinkStreamProgram {
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkStreamRuleSets.RETRACTION_RULES)
.build(), "retraction rules")
+ .addProgram(new FlinkMiniBatchIntervalTraitInitProgram,
+ "Initialization for mini-batch interval inference")
+ .addProgram(
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+ .setHepMatchOrder(HepMatchOrder.TOP_DOWN)
+ .add(FlinkStreamRuleSets.MINI_BATCH_RULES)
+ .build(), "mini-batch interval rules")
.addProgram(
FlinkHepRuleSetProgramBuilder.newBuilder
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/StreamOptimizeContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/StreamOptimizeContext.scala
index 2e2a722..d1bf96e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/StreamOptimizeContext.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/StreamOptimizeContext.scala
@@ -18,6 +18,8 @@
package org.apache.flink.table.plan.optimize.program
+import org.apache.flink.table.plan.`trait`.MiniBatchInterval
+
import org.apache.calcite.rex.RexBuilder
/**
@@ -38,6 +40,11 @@ trait StreamOptimizeContext extends FlinkOptimizeContext {
def updateAsRetraction: Boolean
/**
+ * Returns the mini-batch interval that sink requests.
+ */
+ def getMiniBatchInterval: MiniBatchInterval
+
+ /**
* Returns true if the output node needs final TimeIndicator conversion
* defined in [[org.apache.flink.table.api.TableEnvironment.optimize]].
*/
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
index ca94b8e..f5cc836 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
@@ -341,6 +341,7 @@ object FlinkStreamRuleSets {
StreamExecDataStreamScanRule.INSTANCE,
StreamExecTableSourceScanRule.INSTANCE,
StreamExecIntermediateTableScanRule.INSTANCE,
+ StreamExecWatermarkAssignerRule.INSTANCE,
StreamExecValuesRule.INSTANCE,
// calc
StreamExecCalcRule.INSTANCE,
@@ -385,6 +386,14 @@ object FlinkStreamRuleSets {
)
/**
+ * RuleSet related to mini-batch interval inference.
+ */
+ val MINI_BATCH_RULES: RuleSet = RuleSets.ofList(
+ // mini-batch interval inference rule
+ MiniBatchIntervalInferRule.INSTANCE
+ )
+
+ /**
* RuleSet to optimize plans after stream exec execution.
*/
val PHYSICAL_REWRITE: RuleSet = RuleSets.ofList(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala
new file mode 100644
index 0000000..8078de2
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.physical.stream
+
+import org.apache.flink.table.api.TableConfigOptions
+import org.apache.flink.table.plan.`trait`.{MiniBatchInterval, MiniBatchIntervalTrait, MiniBatchIntervalTraitDef, MiniBatchMode}
+import org.apache.flink.table.plan.nodes.physical.stream.{StreamExecDataStreamScan, StreamExecGroupWindowAggregate, StreamExecTableSourceScan, StreamExecWatermarkAssigner, StreamPhysicalRel}
+import org.apache.flink.table.plan.util.FlinkRelOptUtil
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+
+import scala.collection.JavaConversions._
+
+/**
+ * Planner rule that infers the mini-batch interval of watermark assigner.
+ *
+ * This rule can handle the following two kinds of operators:
+ * 1. operators that support mini-batch and do not require watermark, e.g.
+ * group aggregate. In this case, a [[StreamExecWatermarkAssigner]] with Proctime mode will be
+ * created if none exists, and the interval value will be set to
+ * [[TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY]].
+ * 2. operators that require watermark, e.g. window join, window aggregate.
+ * In this case, a [[StreamExecWatermarkAssigner]] already exists, and its MiniBatchIntervalTrait
+ * will be updated to the intervals merged from its outputs.
+ * Currently, mini-batched window aggregate is not supported, and will be introduced later.
+ *
+ * NOTES: This rule only supports HepPlanner with TOP_DOWN match order.
+ */
+class MiniBatchIntervalInferRule extends RelOptRule(
+ operand(classOf[StreamPhysicalRel], any()),
+ "MiniBatchIntervalInferRule") {
+
+ /**
+ * Get all children RelNodes of a RelNode.
+ *
+ * @param parent The parent RelNode
+ * @return All child nodes
+ */
+ def getInputs(parent: RelNode): Seq[RelNode] = {
+ parent.getInputs.map(_.asInstanceOf[HepRelVertex].getCurrentRel)
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val rel: StreamPhysicalRel = call.rel(0)
+ val miniBatchIntervalTrait = rel.getTraitSet.getTrait(MiniBatchIntervalTraitDef.INSTANCE)
+ val inputs = getInputs(rel)
+ val config = FlinkRelOptUtil.getTableConfigFromContext(rel)
+ val miniBatchEnabled = config.getConf.contains(
+ TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY)
+
+ val updatedTrait = rel match {
+ case _: StreamExecGroupWindowAggregate =>
+ // TODO introduce mini-batch window aggregate later
+ MiniBatchIntervalTrait.NO_MINIBATCH
+
+ case _: StreamExecWatermarkAssigner => MiniBatchIntervalTrait.NONE
+
+ case _ => if (rel.requireWatermark && miniBatchEnabled) {
+ val mergedInterval = FlinkRelOptUtil.mergeMiniBatchInterval(
+ miniBatchIntervalTrait.getMiniBatchInterval, MiniBatchInterval(0, MiniBatchMode.RowTime))
+ new MiniBatchIntervalTrait(mergedInterval)
+ } else {
+ miniBatchIntervalTrait
+ }
+ }
+
+ // propagate parent's MiniBatchInterval to children.
+ val updatedInputs = inputs.map { input =>
+ val originTrait = input.getTraitSet.getTrait(MiniBatchIntervalTraitDef.INSTANCE)
+ val newChild = if (originTrait != updatedTrait) {
+ /**
+ * compute the new MiniBatchIntervalTrait according to the parent's miniBatchInterval
+ * and the child's original miniBatchInterval.
+ */
+ val mergedMiniBatchInterval = FlinkRelOptUtil.mergeMiniBatchInterval(
+ originTrait.getMiniBatchInterval, updatedTrait.getMiniBatchInterval)
+ val inferredTrait = new MiniBatchIntervalTrait(mergedMiniBatchInterval)
+ input.copy(input.getTraitSet.plus(inferredTrait), input.getInputs)
+ } else {
+ input
+ }
+
+ // add mini-batch watermark assigner node.
+ if (isTableSourceScan(newChild) &&
+ newChild.getTraitSet.getTrait(MiniBatchIntervalTraitDef.INSTANCE)
+ .getMiniBatchInterval.mode == MiniBatchMode.ProcTime) {
+ StreamExecWatermarkAssigner.createIngestionTimeWatermarkAssigner(
+ newChild.getCluster,
+ newChild.getTraitSet,
+ newChild.copy(newChild.getTraitSet.plus(MiniBatchIntervalTrait.NONE), newChild.getInputs))
+ } else {
+ newChild
+ }
+ }
+ // update parent if a child was updated
+ if (inputs != updatedInputs) {
+ val newRel = rel.copy(rel.getTraitSet, updatedInputs)
+ call.transformTo(newRel)
+ }
+ }
+
+ private def isTableSourceScan(node: RelNode): Boolean = node match {
+ case _: StreamExecDataStreamScan | _: StreamExecTableSourceScan => true
+ case _ => false
+ }
+}
+
+object MiniBatchIntervalInferRule {
+ val INSTANCE: RelOptRule = new MiniBatchIntervalInferRule
+}
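As a concrete illustration of case 1 in the rule's scaladoc: with mini-batch enabled and no watermark-requiring operator, the rule inserts a proctime assigner above the scan. The before/after plan shape below is paraphrased from the RetractionRulesWithTwoStageAggTest.xml changes at the end of this mail:

    // before: LocalGroupAggregate
    //           +- TableSourceScan(MyTable)
    // after:  LocalGroupAggregate
    //           +- WatermarkAssigner(miniBatchInterval=[Proctime, 1000ms])
    //              +- TableSourceScan(MyTable)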
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecWatermarkAssignerRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecWatermarkAssignerRule.scala
new file mode 100644
index 0000000..a7fcafe
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecWatermarkAssignerRule.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.physical.stream
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalWatermarkAssigner
+import org.apache.flink.table.plan.nodes.physical.stream.StreamExecWatermarkAssigner
+
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+
+/**
+ * Rule that converts [[FlinkLogicalWatermarkAssigner]] to [[StreamExecWatermarkAssigner]].
+ */
+class StreamExecWatermarkAssignerRule
+ extends ConverterRule(
+ classOf[FlinkLogicalWatermarkAssigner],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamExecWatermarkAssignerRule") {
+
+ override def convert(rel: RelNode): RelNode = {
+ val watermarkAssigner = rel.asInstanceOf[FlinkLogicalWatermarkAssigner]
+ val convertInput = RelOptRule.convert(
+ watermarkAssigner.getInput, FlinkConventions.STREAM_PHYSICAL)
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
+
+ if (watermarkAssigner.rowtimeFieldIndex.isEmpty) {
+ throw new TableException("rowtimeFieldIndex should not be empty")
+ }
+
+ if (watermarkAssigner.watermarkDelay.isEmpty) {
+ throw new TableException("watermarkDelay should not be empty")
+ }
+
+ StreamExecWatermarkAssigner.createRowTimeWatermarkAssigner(
+ watermarkAssigner.getCluster,
+ traitSet,
+ convertInput,
+ watermarkAssigner.rowtimeFieldIndex.get,
+ watermarkAssigner.watermarkDelay.get)
+ }
+}
+
+object StreamExecWatermarkAssignerRule {
+ val INSTANCE = new StreamExecWatermarkAssignerRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/MiniBatchIntervalTrait.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/MiniBatchIntervalTrait.scala
index 49fcc7f..984115e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/MiniBatchIntervalTrait.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/MiniBatchIntervalTrait.scala
@@ -56,17 +56,24 @@ class MiniBatchIntervalTrait(miniBatchInterval: MiniBatchInterval) extends RelTr
}
/**
- * @param interval interval of minibatch
- * @param mode type of minibatch: rowtime/proctime
+ * @param interval interval of mini-batch
+ * @param mode type of mini-batch: rowtime/proctime
*/
case class MiniBatchInterval(interval: Long, mode: MiniBatchMode)
object MiniBatchInterval {
+ // default none value.
val NONE = MiniBatchInterval(0L, MiniBatchMode.None)
+ // specific for cases where nodes require watermark but mini-batch is
+ // forcibly disabled, e.g. existing window aggregate.
+ // The difference between NONE and NO_MINIBATCH shows when merging with another miniBatchInterval:
+ // the NONE case yields the other miniBatchInterval, while the NO_MINIBATCH case yields NO_MINIBATCH.
+ val NO_MINIBATCH = MiniBatchInterval(-1L, MiniBatchMode.None)
}
object MiniBatchIntervalTrait {
val NONE = new MiniBatchIntervalTrait(MiniBatchInterval.NONE)
+ val NO_MINIBATCH = new MiniBatchIntervalTrait(MiniBatchInterval.NO_MINIBATCH)
}
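An illustrative (not committed) use of FlinkRelOptUtil.mergeMiniBatchInterval, defined later in this mail, to make the distinction above concrete:

    val rowtime = MiniBatchInterval(4000L, MiniBatchMode.RowTime)
    // NONE is neutral: merging yields the other interval.
    FlinkRelOptUtil.mergeMiniBatchInterval(MiniBatchInterval.NONE, rowtime)
    // => MiniBatchInterval(4000, RowTime)
    // NO_MINIBATCH is absorbing: mini-batch stays disabled.
    FlinkRelOptUtil.mergeMiniBatchInterval(MiniBatchInterval.NO_MINIBATCH, rowtime)
    // => MiniBatchInterval(-1, None), i.e. NO_MINIBATCH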
/**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala
index d5c7487..8a01174 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala
@@ -19,7 +19,9 @@ package org.apache.flink.table.plan.util
import org.apache.flink.table.api.{PlannerConfigOptions, TableConfig}
import org.apache.flink.table.calcite.{FlinkContext, FlinkPlannerImpl, FlinkTypeFactory}
+import org.apache.flink.table.plan.`trait`.{MiniBatchInterval, MiniBatchMode}
import org.apache.flink.table.{JBoolean, JByte, JDouble, JFloat, JLong, JShort}
+
import com.google.common.collect.{ImmutableList, Lists}
import org.apache.calcite.config.NullCollation
import org.apache.calcite.plan.RelOptUtil.InputFinder
@@ -35,6 +37,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.tools.RelBuilder
import org.apache.calcite.util.mapping.Mappings
import org.apache.calcite.util.{ImmutableBitSet, Pair, Util}
+import org.apache.commons.math3.util.ArithmeticUtils
import java.io.{PrintWriter, StringWriter}
import java.math.BigDecimal
@@ -721,4 +724,71 @@ object FlinkRelOptUtil {
}
}
+ /**
+ * Merge two MiniBatchInterval as a new one.
+ *
+ * The Merge Logic: MiniBatchMode: (R: rowtime, P: proctime, N: None), I: Interval
+ * Possible values:
+ * - (R, I = 0): operators that require watermark (window excluded).
+ * - (R, I > 0): window / operators that require watermark with minibatch enabled.
+ * - (R, I = -1): existing window aggregate
+ * - (P, I > 0): unbounded agg with minibatch enabled.
+ * - (N, I = 0): no operator requires watermark, minibatch disabled
+ * ------------------------------------------------
+ * | A | B | merged result
+ * ------------------------------------------------
+ * | R, I_1 == 0 | R, I_2 | R, gcd(I_1, I_2)
+ * ------------------------------------------------
+ * | R, I_1 == 0 | P, I_2 | R, I_2
+ * ------------------------------------------------
+ * | R, I_1 > 0 | R, I_2 | R, gcd(I_1, I_2)
+ * ------------------------------------------------
+ * | R, I_1 > 0 | P, I_2 | R, I_1
+ * ------------------------------------------------
+ * | R, I_1 = -1 | R, I_2 | R, I_1
+ * ------------------------------------------------
+ * | R, I_1 = -1 | P, I_2 | R, I_1
+ * ------------------------------------------------
+ * | P, I_1 | R, I_2 == 0 | R, I_1
+ * ------------------------------------------------
+ * | P, I_1 | R, I_2 > 0 | R, I_2
+ * ------------------------------------------------
+ * | P, I_1 | P, I_2 > 0 | P, I_1
+ * ------------------------------------------------
+ */
+ def mergeMiniBatchInterval(
+ interval1: MiniBatchInterval,
+ interval2: MiniBatchInterval): MiniBatchInterval = {
+ if (interval1 == MiniBatchInterval.NO_MINIBATCH ||
+ interval2 == MiniBatchInterval.NO_MINIBATCH) {
+ return MiniBatchInterval.NO_MINIBATCH
+ }
+ interval1.mode match {
+ case MiniBatchMode.None => interval2
+ case MiniBatchMode.RowTime =>
+ interval2.mode match {
+ case MiniBatchMode.None => interval1
+ case MiniBatchMode.RowTime =>
+ val gcd = ArithmeticUtils.gcd(interval1.interval, interval2.interval)
+ MiniBatchInterval(gcd, MiniBatchMode.RowTime)
+ case MiniBatchMode.ProcTime =>
+ if (interval1.interval == 0) {
+ MiniBatchInterval(interval2.interval, MiniBatchMode.RowTime)
+ } else {
+ interval1
+ }
+ }
+ case MiniBatchMode.ProcTime =>
+ interval2.mode match {
+ case MiniBatchMode.None | MiniBatchMode.ProcTime => interval1
+ case MiniBatchMode.RowTime =>
+ if (interval2.interval > 0) {
+ interval2
+ } else {
+ MiniBatchInterval(interval1.interval, MiniBatchMode.RowTime)
+ }
+ }
+ }
+ }
+
}
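Two worked rows of the merge table above (illustrative values, not from the patch):

    // Rowtime meets rowtime: gcd keeps both windows aligned.
    mergeMiniBatchInterval(
      MiniBatchInterval(6000L, MiniBatchMode.RowTime),
      MiniBatchInterval(4000L, MiniBatchMode.RowTime))
    // => MiniBatchInterval(2000, RowTime)

    // Proctime meets a watermark-only consumer (I = 0): the mode upgrades to
    // rowtime while the proctime interval is retained.
    mergeMiniBatchInterval(
      MiniBatchInterval(1000L, MiniBatchMode.ProcTime),
      MiniBatchInterval(0L, MiniBatchMode.RowTime))
    // => MiniBatchInterval(1000, RowTime)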
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
index 5398418..652d833 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
@@ -534,6 +534,42 @@ Sink(fields=[a, b, c])
]]>
</Resource>
</TestCase>
+ <TestCase name="testExplainWithSingleSink[extended=true]">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(fields=[a, b, c])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+
+== Optimized Logical Plan ==
+Sink(fields=[a, b, c], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
++- Calc(select=[a, b, c], where=[>(a, 10)], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+
+ : Operator
+ content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ ship_strategy : FORWARD
+
+ : Operator
+ content : Calc(where: (a > 10), select: (a, b, c))
+ ship_strategy : FORWARD
+
+ : Operator
+ content : SinkConversionToRow
+ ship_strategy : FORWARD
+
+ : Data Sink
+ content : Sink: TestingAppendTableSink
+ ship_strategy : FORWARD
+
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testExplainWithSort[extended=false]">
<Resource name="explain">
<![CDATA[== Abstract Syntax Tree ==
@@ -588,39 +624,118 @@ SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], updateAsRetraction=[false], ac
]]>
</Resource>
</TestCase>
- <TestCase name="testExplainWithSingleSink[extended=true]">
+ <TestCase name="testMiniBatchIntervalInfer[extended=true]">
<Resource name="explain">
<![CDATA[== Abstract Syntax Tree ==
-LogicalSink(fields=[a, b, c])
-+- LogicalProject(a=[$0], b=[$1], c=[$2])
- +- LogicalFilter(condition=[>($0, 10)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+LogicalSink(fields=[a, b])
++- LogicalProject(id1=[$0], EXPR$1=[$2])
+ +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
+ +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], $f2=[_UTF-16LE'#'], text=[$2])
+ +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
+ +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalWatermarkAssigner(fields=[id1, text, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalWatermarkAssigner(fields=[id2, cnt, name, goods, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+
+LogicalSink(fields=[a, b])
++- LogicalProject(id1=[$0], EXPR$1=[$2])
+ +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
+ +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 6000:INTERVAL SECOND)], $f2=[_UTF-16LE'*'], text=[$2])
+ +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
+ +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalWatermarkAssigner(fields=[id1, text, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalWatermarkAssigner(fields=[id2, cnt, name, goods, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
== Optimized Logical Plan ==
-Sink(fields=[a, b, c], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-+- Calc(select=[a, b, c], where=[>(a, 10)], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
- +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+Calc(select=[id1, rowtime AS ts, text], updateAsRetraction=[true], accMode=[Acc], reuse_id=[1]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
++- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=2, rightTimeIndex=4], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0), 300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 180000:INTERVAL MINUTE)))], select=[id1, text, rowtime, id2, cnt, name, goods, rowtime0], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+ :- Exchange(distribution=[hash[id1]], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+ : +- WatermarkAssigner(fields=[id1, text, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+ : +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[id1, text, rowtime], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+ +- Exchange(distribution=[hash[id2]], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+ +- WatermarkAssigner(fields=[id2, cnt, name, goods, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+ +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, cnt, name, goods, rowtime], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+
+Sink(fields=[a, b], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+ +- Exchange(distribution=[hash[id1]], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+ +- Calc(select=[id1, ts, _UTF-16LE'#' AS $f2, text], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+ +- Reused(reference_id=[1])
+
+Sink(fields=[a, b], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+ +- Exchange(distribution=[hash[id1]], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+ +- Calc(select=[id1, ts, _UTF-16LE'*' AS $f2, text], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+ +- Reused(reference_id=[1])
== Physical Execution Plan ==
: Data Source
content : collect elements with CollectionInputFormat
+ : Data Source
+ content : collect elements with CollectionInputFormat
+
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table:Buffer(default_catalog, default_database, T1), fields:(id1, text, rowtime))
ship_strategy : FORWARD
: Operator
- content : Calc(where: (a > 10), select: (a, b, c))
+ content : WatermarkAssigner(rowtime: 2, offset: 0)
ship_strategy : FORWARD
: Operator
- content : SinkConversionToRow
+ content : SourceConversion(table:Buffer(default_catalog, default_database, T2), fields:(id2, cnt, name, goods, rowtime))
ship_strategy : FORWARD
- : Data Sink
- content : Sink: TestingAppendTableSink
+ : Operator
+ content : WatermarkAssigner(rowtime: 4, offset: 0)
ship_strategy : FORWARD
+ : Operator
+ content : Co-Process
+ ship_strategy : HASH
+
+ : Operator
+ content : Calc(select: (id1, rowtime AS ts, text))
+ ship_strategy : FORWARD
+
+ : Operator
+ content : Calc(select: (id1, ts, _UTF-16LE'#' AS $f2, text))
+ ship_strategy : FORWARD
+
+ : Operator
+ content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+ ship_strategy : HASH
+
+ : Operator
+ content : SinkConversionToRow
+ ship_strategy : FORWARD
+
+ : Operator
+ content : Calc(select: (id1, ts, _UTF-16LE'*' AS $f2, text))
+ ship_strategy : FORWARD
+
+ : Operator
+ content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+ ship_strategy : HASH
+
+ : Operator
+ content : SinkConversionToRow
+ ship_strategy : FORWARD
+
+ : Data Sink
+ content : Sink: TestingAppendTableSink
+ ship_strategy : FORWARD
+
+ : Data Sink
+ content : Sink: TestingAppendTableSink
+ ship_strategy : FORWARD
+
]]>
</Resource>
</TestCase>
@@ -656,4 +771,119 @@ Union(all=[true], union=[a, b, c])
]]>
</Resource>
</TestCase>
+ <TestCase name="testMiniBatchIntervalInfer[extended=false]">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(fields=[a, b])
++- LogicalProject(id1=[$0], EXPR$1=[$2])
+ +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
+ +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], $f2=[_UTF-16LE'#'], text=[$2])
+ +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
+ +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalWatermarkAssigner(fields=[id1, text, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalWatermarkAssigner(fields=[id2, cnt, name, goods, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+
+LogicalSink(fields=[a, b])
++- LogicalProject(id1=[$0], EXPR$1=[$2])
+ +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
+ +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 6000:INTERVAL SECOND)], $f2=[_UTF-16LE'*'], text=[$2])
+ +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
+ +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalWatermarkAssigner(fields=[id1, text, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalWatermarkAssigner(fields=[id2, cnt, name, goods, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+
+== Optimized Logical Plan ==
+Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
++- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=2, rightTimeIndex=4], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0), 300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 180000:INTERVAL MINUTE)))], select=[id1, text, rowtime, id2, cnt, name, goods, rowtime0])
+ :- Exchange(distribution=[hash[id1]])
+ : +- WatermarkAssigner(fields=[id1, text, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+ : +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[id1, text, rowtime])
+ +- Exchange(distribution=[hash[id2]])
+ +- WatermarkAssigner(fields=[id2, cnt, name, goods, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+ +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, cnt, name, goods, rowtime])
+
+Sink(fields=[a, b])
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
+ +- Exchange(distribution=[hash[id1]])
+ +- Calc(select=[id1, ts, _UTF-16LE'#' AS $f2, text])
+ +- Reused(reference_id=[1])
+
+Sink(fields=[a, b])
++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
+ +- Exchange(distribution=[hash[id1]])
+ +- Calc(select=[id1, ts, _UTF-16LE'*' AS $f2, text])
+ +- Reused(reference_id=[1])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+
+ : Data Source
+ content : collect elements with CollectionInputFormat
+
+ : Operator
+ content : SourceConversion(table:Buffer(default_catalog, default_database, T1), fields:(id1, text, rowtime))
+ ship_strategy : FORWARD
+
+ : Operator
+ content : WatermarkAssigner(rowtime: 2, offset: 0)
+ ship_strategy : FORWARD
+
+ : Operator
+ content : SourceConversion(table:Buffer(default_catalog, default_database, T2), fields:(id2, cnt, name, goods, rowtime))
+ ship_strategy : FORWARD
+
+ : Operator
+ content : WatermarkAssigner(rowtime: 4, offset: 0)
+ ship_strategy : FORWARD
+
+ : Operator
+ content : Co-Process
+ ship_strategy : HASH
+
+ : Operator
+ content : Calc(select: (id1, rowtime AS ts, text))
+ ship_strategy : FORWARD
+
+ : Operator
+ content : Calc(select: (id1, ts, _UTF-16LE'#' AS $f2, text))
+ ship_strategy : FORWARD
+
+ : Operator
+ content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+ ship_strategy : HASH
+
+ : Operator
+ content : SinkConversionToRow
+ ship_strategy : FORWARD
+
+ : Operator
+ content : Calc(select: (id1, ts, _UTF-16LE'*' AS $f2, text))
+ ship_strategy : FORWARD
+
+ : Operator
+ content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+ ship_strategy : HASH
+
+ : Operator
+ content : SinkConversionToRow
+ ship_strategy : FORWARD
+
+ : Data Sink
+ content : Sink: TestingAppendTableSink
+ ship_strategy : FORWARD
+
+ : Data Sink
+ content : Sink: TestingAppendTableSink
+ ship_strategy : FORWARD
+
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/physical/stream/RetractionRulesWithTwoStageAggTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/physical/stream/RetractionRulesWithTwoStageAggTest.xml
index 27d03a1..8e3ed14 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/physical/stream/RetractionRulesWithTwoStageAggTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/physical/stream/RetractionRulesWithTwoStageAggTest.xml
@@ -47,9 +47,11 @@ GlobalGroupAggregate(groupBy=[cnt], select=[cnt, COUNT_RETRACT(count$0) AS frequ
: +- GlobalGroupAggregate(groupBy=[word], select=[word, COUNT(count$0) AS cnt], updateAsRetraction=[true], accMode=[AccRetract])
: +- Exchange(distribution=[hash[word]], updateAsRetraction=[true], accMode=[Acc])
: +- LocalGroupAggregate(groupBy=[word], select=[word, COUNT(number) AS count$0], updateAsRetraction=[true], accMode=[Acc])
- : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(word, number)]]], fields=[word, number], updateAsRetraction=[true], accMode=[Acc])
+ : +- WatermarkAssigner(fields=[word, number], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+ : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(word, number)]]], fields=[word, number], updateAsRetraction=[true], accMode=[Acc])
+- Calc(select=[cnt], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(word, cnt)]]], fields=[word, cnt], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[word, cnt], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(word, cnt)]]], fields=[word, cnt], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -70,7 +72,8 @@ Calc(select=[EXPR$0], updateAsRetraction=[false], accMode=[Acc])
+- GlobalGroupAggregate(groupBy=[word], select=[word, COUNT(count$0) AS EXPR$0], updateAsRetraction=[false], accMode=[Acc])
+- Exchange(distribution=[hash[word]], updateAsRetraction=[true], accMode=[Acc])
+- LocalGroupAggregate(groupBy=[word], select=[word, COUNT(number) AS count$0], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(word, number)]]], fields=[word, number], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[word, number], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(word, number)]]], fields=[word, number], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -99,7 +102,8 @@ GlobalGroupAggregate(groupBy=[cnt], select=[cnt, COUNT_RETRACT(count1$0) AS freq
+- GlobalGroupAggregate(groupBy=[word], select=[word, COUNT(count$0) AS cnt], updateAsRetraction=[true], accMode=[AccRetract])
+- Exchange(distribution=[hash[word]], updateAsRetraction=[true], accMode=[Acc])
+- LocalGroupAggregate(groupBy=[word], select=[word, COUNT(number) AS count$0], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(word, number)]]], fields=[word, number], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[word, number], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(word, number)]]], fields=[word, number], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
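These hunks show the effect of the new MiniBatchIntervalInferRule on plain proctime pipelines: a `WatermarkAssigner(..., miniBatchInterval=[Proctime, 1000ms])` is injected above every source feeding an unbounded aggregate, so the local/global aggregate pair can buffer input and flush once per interval instead of per record. In proctime mode there is no natural watermark, so the new MiniBatchAssignerOperator manufactures one from the wall clock at aligned interval boundaries. A sketch of that alignment, assuming the caller polls it (the committed operator is driven by Flink's processing-time timer service instead):

    // Illustrative: downstream mini-batch aggregates treat these synthetic
    // watermarks purely as "flush the current batch" signals.
    public class ProctimeBatchBarrierSketch {
        private final long intervalMs;              // 1000 for [Proctime, 1000ms]
        private long currentBatchStart = Long.MIN_VALUE;

        public ProctimeBatchBarrierSketch(long intervalMs) {
            this.intervalMs = intervalMs;
        }

        /** Returns true when a new batch boundary is crossed; a Watermark(batch start)
         *  would be emitted downstream at that point. */
        public boolean advance(long nowMs) {
            long batchStart = nowMs - nowMs % intervalMs;   // align to the interval grid
            if (batchStart > currentBatchStart) {
                currentBatchStart = batchStart;
                return true;
            }
            return false;
        }
    }

Aligning to the grid (rather than emitting every intervalMs after startup) keeps the barriers of parallel assigner instances loosely synchronized, which is why the same interval appears above both scans in the plans.
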
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/MiniBatchIntervalInferTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/MiniBatchIntervalInferTest.xml
new file mode 100644
index 0000000..e6b7c43
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/MiniBatchIntervalInferTest.xml
@@ -0,0 +1,572 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testMiniBatchOnly">
+ <Resource name="sql">
+ <![CDATA[SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable1 GROUP BY b]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXPR$3=[SUM($2)])
++- LogicalProject(b=[$1], a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
++- Exchange(distribution=[hash[b]])
+ +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
+ +- Calc(select=[b, a, c])
+ +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], miniBatchInterval=[Proctime, 1000ms])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiOperatorNeedsWatermark1">
+ <Resource name="sql">
+ <![CDATA[
+ SELECT
+ b, COUNT(a),
+ TUMBLE_START(rt, INTERVAL '5' SECOND),
+ TUMBLE_END(rt, INTERVAL '5' SECOND)
+ FROM (
+ SELECT t1.a as a, t1.b as b, t1.rowtime as rt
+ FROM
+ LeftT as t1 JOIN RightT as t2
+ ON
+ t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
+ t2.rowtime + INTERVAL '10' SECOND
+ )
+ GROUP BY b,TUMBLE(rt, INTERVAL '5' SECOND)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(b=[$0], EXPR$1=[$2], EXPR$2=[TUMBLE_START($1)], EXPR$3=[TUMBLE_END($1)])
++- LogicalAggregate(group=[{0, 1}], EXPR$1=[COUNT($2)])
+ +- LogicalProject(b=[$1], $f1=[TUMBLE($4, 5000:INTERVAL SECOND)], a=[$0])
+ +- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 5000:INTERVAL SECOND)), <=($4, +($9, 10000:INTERVAL SECOND)))], joinType=[inner])
+ :- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+ +- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[b, EXPR$1, w$start AS EXPR$2, w$end AS EXPR$3])
++- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+ +- Exchange(distribution=[hash[b]])
+ +- Calc(select=[b, rowtime, a])
+ +- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=1], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 10000:INTERVAL SECOND)))], select=[a, b, rowtime, a0, rowtime0])
+ :- Exchange(distribution=[hash[a]])
+ : +- Calc(select=[a, b, rowtime])
+ : +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+ : +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, rowtime])
+ +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiOperatorNeedsWatermark2">
+ <Resource name="sql">
+ <![CDATA[
+ SELECT b, COUNT(a)
+ OVER (PARTITION BY b ORDER BY rt ROWS BETWEEN 5 preceding AND CURRENT ROW)
+ FROM (
+ SELECT t1.a as a, t1.b as b, t1.rt as rt
+ FROM
+ (
+ SELECT b,
+ COUNT(a) as a,
+ TUMBLE_ROWTIME(rowtime, INTERVAL '5' SECOND) as rt
+ FROM LeftT
+ GROUP BY b, TUMBLE(rowtime, INTERVAL '5' SECOND)
+ ) as t1
+ JOIN
+ (
+ SELECT b,
+ COUNT(a) as a,
+ HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt
+ FROM RightT
+ GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
+ ) as t2
+ ON
+ t1.a = t2.a AND t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
+ t2.rt + INTERVAL '10' SECOND
+ )
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(b=[$0], EXPR$1=[COUNT($1) OVER (PARTITION BY $0 ORDER BY $2 NULLS FIRST ROWS BETWEEN 5 PRECEDING AND CURRENT ROW)])
++- LogicalJoin(condition=[AND(=($1, $4), >=($2, -($5, 5000:INTERVAL SECOND)), <=($2, +($5, 10000:INTERVAL SECOND)))], joinType=[inner])
+ :- LogicalProject(b=[$0], a=[$2], rt=[TUMBLE_ROWTIME($1)])
+ : +- LogicalAggregate(group=[{0, 1}], a=[COUNT($2)])
+ : +- LogicalProject(b=[$1], $f1=[TUMBLE($4, 5000:INTERVAL SECOND)], a=[$0])
+ : +- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+ +- LogicalProject(b=[$0], a=[$2], rt=[HOP_ROWTIME($1)])
+ +- LogicalAggregate(group=[{0, 1}], a=[COUNT($2)])
+ +- LogicalProject(b=[$1], $f1=[HOP($4, 5000:INTERVAL SECOND, 6000:INTERVAL SECOND)], a=[$0])
+ +- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[b, w0$o0 AS $1])
++- OverAggregate(partitionBy=[b], orderBy=[rt ASC], window=[ ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], select=[b, a, rt, COUNT(a) AS w0$o0])
+ +- Exchange(distribution=[hash[b]])
+ +- Calc(select=[b, a, rt])
+ +- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=2], where=[AND(=(a, a0), >=(rt, -(rt0, 5000:INTERVAL SECOND)), <=(rt, +(rt0, 10000:INTERVAL SECOND)))], select=[b, a, rt, b0, a0, rt0])
+ :- Exchange(distribution=[hash[a]])
+ : +- Calc(select=[b, a, w$rowtime AS rt])
+ : +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+ : +- Exchange(distribution=[hash[b]])
+ : +- Calc(select=[b, rowtime, a])
+ : +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+ : +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[b, a, w$rowtime AS rt])
+ +- GroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 6000, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+ +- Exchange(distribution=[hash[b]])
+ +- Calc(select=[b, rowtime, a])
+ +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiOperatorNeedsWatermark3">
+ <Resource name="sql">
+ <![CDATA[
+ SELECT t1.a, t1.b
+ FROM (
+ SELECT a, COUNT(b) as b FROM MyTable1 GROUP BY a
+ ) as t1
+ JOIN (
+ SELECT b, COUNT(a) as a
+ FROM (
+ SELECT b, COUNT(a) as a,
+ HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt
+ FROM RightT
+ GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
+ )
+ GROUP BY b
+ ) as t2
+ ON t1.a = t2.a
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalJoin(condition=[=($2, $4)], joinType=[inner])
+ :- LogicalProject(a=[$0], b=[$1], a0=[CAST($0):BIGINT])
+ : +- LogicalAggregate(group=[{0}], b=[COUNT($1)])
+ : +- LogicalProject(a=[$0], b=[$1])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+ +- LogicalAggregate(group=[{0}], a=[COUNT()])
+ +- LogicalProject(b=[$0], a=[$2])
+ +- LogicalAggregate(group=[{0, 1}], a=[COUNT($2)])
+ +- LogicalProject(b=[$1], $f1=[HOP($4, 5000:INTERVAL SECOND, 6000:INTERVAL SECOND)], a=[$0])
+ +- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, b])
++- Join(joinType=[InnerJoin], where=[=(a0, a1)], select=[a, b, a0, b0, a1], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey])
+ :- Exchange(distribution=[hash[a0]])
+ : +- Calc(select=[a, b, CAST(a) AS a0])
+ : +- GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(count$0) AS b])
+ : +- Exchange(distribution=[hash[a]])
+ : +- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(b) AS count$0])
+ : +- Calc(select=[a, b])
+ : +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], miniBatchInterval=[Proctime, 6000ms])
+ : +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+ +- Exchange(distribution=[hash[a]])
+ +- GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count1$0) AS a])
+ +- Exchange(distribution=[hash[b]])
+ +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(*) AS count1$0])
+ +- GroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 6000, 5000)], select=[b, COUNT(a) AS a])
+ +- Exchange(distribution=[hash[b]])
+ +- Calc(select=[b, rowtime, a])
+ +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultipleWindowAggregates">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(fields=[a, b])
++- LogicalProject(id1=[$1], EXPR$1=[$2])
+ +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
+ +- LogicalProject($f0=[HOP($2, 12000:INTERVAL SECOND, 4000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'*'], text=[$1])
+ +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)])
+ +- LogicalAggregate(group=[{0, 1}], text=[CONCAT_AGG($2, $3)])
+ +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'#'], text=[$2])
+ +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
+ +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalWatermarkAssigner(fields=[id1, rowtime, text], rowtimeField=[rowtime], watermarkDelay=[0])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+
+LogicalSink(fields=[a, b])
++- LogicalProject(id1=[$1], EXPR$1=[$2])
+ +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
+ +- LogicalProject($f0=[TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'-'], text=[$2])
+ +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
+ +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalWatermarkAssigner(fields=[id1, rowtime, text], rowtimeField=[rowtime], watermarkDelay=[0])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+
+LogicalSink(fields=[a, b])
++- LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
+ +- LogicalProject(id1=[$0], text=[$1])
+ +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)])
+ +- LogicalAggregate(group=[{0, 1}], text=[CONCAT_AGG($2, $3)])
+ +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'#'], text=[$2])
+ +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
+ +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalWatermarkAssigner(fields=[id1, rowtime, text], rowtimeField=[rowtime], watermarkDelay=[0])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+
+== Optimized Logical Plan ==
+Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
++- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, rightTimeIndex=1], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0), 300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods])
+ :- Exchange(distribution=[hash[id1]])
+ : +- WatermarkAssigner(fields=[id1, rowtime, text], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+ : +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[id1, rowtime, text])
+ +- Exchange(distribution=[hash[id2]])
+ +- WatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+ +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods])
+
+GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, CONCAT_AGG($f2, text) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2])
++- Exchange(distribution=[hash[id1]])
+ +- Calc(select=[ts, id1, _UTF-16LE'#' AS $f2, text])
+ +- Reused(reference_id=[1])
+
+Sink(fields=[a, b])
++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 4000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
+ +- Exchange(distribution=[hash[id1]])
+ +- Calc(select=[w$rowtime AS ts, id1, _UTF-16LE'*' AS $f2, text])
+ +- Reused(reference_id=[2])
+
+Sink(fields=[a, b])
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
+ +- Exchange(distribution=[hash[id1]])
+ +- Calc(select=[ts, id1, _UTF-16LE'-' AS $f2, text])
+ +- Reused(reference_id=[1])
+
+Sink(fields=[a, b])
++- GlobalGroupAggregate(groupBy=[id1], select=[id1, COUNT(count$0) AS EXPR$1])
+ +- Exchange(distribution=[hash[id1]])
+ +- LocalGroupAggregate(groupBy=[id1], select=[id1, COUNT(text) AS count$0])
+ +- Calc(select=[id1, text])
+ +- Reused(reference_id=[2])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+
+ : Data Source
+ content : collect elements with CollectionInputFormat
+
+ : Operator
+ content : SourceConversion(table:Buffer(default_catalog, default_database, T1), fields:(id1, rowtime, text))
+ ship_strategy : FORWARD
+
+ : Operator
+ content : WatermarkAssigner(rowtime: 1, offset: 0)
+ ship_strategy : FORWARD
+
+ : Operator
+ content : SourceConversion(table:Buffer(default_catalog, default_database, T2), fields:(id2, rowtime, cnt, name, goods))
+ ship_strategy : FORWARD
+
+ : Operator
+ content : WatermarkAssigner(rowtime: 1, offset: 0)
+ ship_strategy : FORWARD
+
+ : Operator
+ content : Co-Process
+ ship_strategy : HASH
+
+ : Operator
+ content : Calc(select: (id1, rowtime AS ts, text))
+ ship_strategy : FORWARD
+
+ : Operator
+ content : Calc(select: (ts, id1, _UTF-16LE'#' AS $f2, text))
+ ship_strategy : FORWARD
+
+ : Operator
+ content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime)
+ ship_strategy : HASH
+
+ : Operator
+ content : Calc(select: (w$rowtime AS ts, id1, _UTF-16LE'*' AS $f2, text))
+ ship_strategy : FORWARD
+
+ : Operator
+ content : window: (SlidingGroupWindow('w$, ts, 4000, 12000)), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+ ship_strategy : HASH
+
+ : Operator
+ content : SinkConversionToRow
+ ship_strategy : FORWARD
+
+ : Operator
+ content : Calc(select: (ts, id1, _UTF-16LE'-' AS $f2, text))
+ ship_strategy : FORWARD
+
+ : Operator
+ content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+ ship_strategy : HASH
+
+ : Operator
+ content : SinkConversionToRow
+ ship_strategy : FORWARD
+
+ : Operator
+ content : Calc(select: (id1, text))
+ ship_strategy : FORWARD
+
+ : Operator
+ content : LocalGroupAggregate
+ ship_strategy : FORWARD
+
+ : Operator
+ content : GlobalGroupAggregate
+ ship_strategy : HASH
+
+ : Operator
+ content : SinkConversionToTuple2
+ ship_strategy : FORWARD
+
+ : Operator
+ content : Map
+ ship_strategy : FORWARD
+
+ : Data Sink
+ content : Sink: TestingAppendTableSink
+ ship_strategy : FORWARD
+
+ : Data Sink
+ content : Sink: TestingAppendTableSink
+ ship_strategy : FORWARD
+
+ : Data Sink
+ content : Sink: TestingRetractTableSink
+ ship_strategy : FORWARD
+
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRedundantWatermarkDefinition">
+ <Resource name="sql">
+ <![CDATA[SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable3 GROUP BY b]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXPR$3=[SUM($2)])
++- LogicalProject(b=[$1], a=[$0], c=[$2])
+ +- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
++- Exchange(distribution=[hash[b]])
+ +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
+ +- Calc(select=[b, a, c])
+ +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[Proctime, 1000ms])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRowtimeRowsOverWithMiniBatch">
+ <Resource name="sql">
+ <![CDATA[
+ SELECT cnt, COUNT(c)
+ FROM (
+ SELECT c, COUNT(a)
+ OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND CURRENT ROW) as cnt
+ FROM MyTable3
+ )
+ GROUP BY cnt
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
++- LogicalProject(cnt=[COUNT($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS BETWEEN 5 PRECEDING AND CURRENT ROW)], c=[$2])
+ +- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GlobalGroupAggregate(groupBy=[cnt], select=[cnt, COUNT(count$0) AS EXPR$1])
++- Exchange(distribution=[hash[cnt]])
+ +- LocalGroupAggregate(groupBy=[cnt], select=[cnt, COUNT(c) AS count$0])
+ +- Calc(select=[w0$o0 AS cnt, c])
+ +- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0])
+ +- Exchange(distribution=[hash[c]])
+ +- Calc(select=[a, c, rowtime])
+ +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[Rowtime, 1000ms])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testWindowJoinWithMiniBatch">
+ <Resource name="sql">
+ <![CDATA[
+ SELECT b, COUNT(a)
+ FROM (
+ SELECT t1.a as a, t1.b as b
+ FROM
+ LeftT as t1 JOIN RightT as t2
+ ON
+ t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
+ t2.rowtime + INTERVAL '10' SECOND
+ )
+ GROUP BY b
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
++- LogicalProject(b=[$1], a=[$0])
+ +- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 5000:INTERVAL SECOND)), <=($4, +($9, 10000:INTERVAL SECOND)))], joinType=[inner])
+ :- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+ +- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count$0) AS EXPR$1])
++- Exchange(distribution=[hash[b]])
+ +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(a) AS count$0])
+ +- Calc(select=[b, a])
+ +- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=1], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 10000:INTERVAL SECOND)))], select=[a, b, rowtime, a0, rowtime0])
+ :- Exchange(distribution=[hash[a]])
+ : +- Calc(select=[a, b, rowtime])
+ : +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[Rowtime, 1000ms])
+ : +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, rowtime])
+ +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[Rowtime, 1000ms])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testWindowCascade">
+ <Resource name="sql">
+ <![CDATA[
+ SELECT b,
+ SUM(cnt)
+ FROM (
+ SELECT b,
+ COUNT(a) as cnt,
+ TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) as rt
+ FROM MyTable3
+ GROUP BY b, TUMBLE(rowtime, INTERVAL '10' SECOND)
+ )
+ GROUP BY b, TUMBLE(rt, INTERVAL '5' SECOND)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(b=[$0], EXPR$1=[$2])
++- LogicalAggregate(group=[{0, 1}], EXPR$1=[SUM($2)])
+ +- LogicalProject(b=[$0], $f1=[TUMBLE(TUMBLE_ROWTIME($1), 5000:INTERVAL SECOND)], cnt=[$2])
+ +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)])
+ +- LogicalProject(b=[$1], $f1=[TUMBLE($4, 10000:INTERVAL SECOND)], a=[$0])
+ +- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], select=[b, $SUM0(cnt) AS EXPR$1])
++- Exchange(distribution=[hash[b]])
+ +- Calc(select=[b, w$rowtime AS $f1, cnt])
+ +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+ +- Exchange(distribution=[hash[b]])
+ +- Calc(select=[b, rowtime, a])
+ +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testWindowWithEarlyFire">
+ <Resource name="sql">
+ <![CDATA[
+ SELECT b, SUM(cnt)
+ FROM (
+ SELECT b,
+ COUNT(a) as cnt,
+ HOP_START(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_start,
+ HOP_END(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_end
+ FROM MyTable3
+ GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
+ )
+ GROUP BY b
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
++- LogicalProject(b=[$0], cnt=[$2])
+ +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)])
+ +- LogicalProject(b=[$1], $f1=[HOP($4, 5000:INTERVAL SECOND, 6000:INTERVAL SECOND)], a=[$0])
+ +- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GlobalGroupAggregate(groupBy=[b], select=[b, $SUM0_RETRACT(sum$0) AS EXPR$1])
++- Exchange(distribution=[hash[b]])
+ +- LocalGroupAggregate(groupBy=[b], select=[b, $SUM0_RETRACT(cnt) AS sum$0, COUNT_RETRACT(*) AS count1$1])
+ +- GroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 6000, 5000)], select=[b, COUNT(a) AS cnt], emit=[early delay 500 millisecond])
+ +- Exchange(distribution=[hash[b]])
+ +- Calc(select=[b, rowtime, a])
+ +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
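Taken together, the new MiniBatchIntervalInferTest plans document the inference itself: proctime-only pipelines end up with miniBatchInterval=[Proctime, 1000ms], pipelines whose operators depend on event-time watermarks (window join, rowtime over aggregate) get [Rowtime, 1000ms], and inputs of group windows, which must react to every real watermark, stay at [None]. Where one node feeds consumers with different requirements, the inferred traits have to be merged; below is a hedged sketch of such a merge, consistent with what these plans show but not necessarily identical to the rules in MiniBatchIntervalTrait/FlinkRelOptUtil:

    // Illustrative merge only; the planner may, for example, combine two intervals
    // of the same mode by gcd rather than min, and it models "needs watermark but
    // no batching" as a special case.
    enum MiniBatchMode { NONE, PROCTIME, ROWTIME }

    final class MiniBatchIntervalSketch {
        final MiniBatchMode mode;
        final long intervalMs;

        MiniBatchIntervalSketch(MiniBatchMode mode, long intervalMs) {
            this.mode = mode;
            this.intervalMs = intervalMs;
        }

        static MiniBatchIntervalSketch merge(MiniBatchIntervalSketch a, MiniBatchIntervalSketch b) {
            if (a.mode == MiniBatchMode.NONE) return b;     // NONE treated as identity here
            if (b.mode == MiniBatchMode.NONE) return a;
            if (a.mode == b.mode) {
                return new MiniBatchIntervalSketch(a.mode, Math.min(a.intervalMs, b.intervalMs));
            }
            // Mixed modes: a rowtime consumer needs real watermarks, so batch on rowtime.
            long ms = (a.mode == MiniBatchMode.ROWTIME ? a : b).intervalMs;
            return new MiniBatchIntervalSketch(MiniBatchMode.ROWTIME, ms);
        }
    }

Under this reading, testRedundantWatermarkDefinition keeps [Proctime, 1000ms] because nothing downstream consumes the event-time watermark, while testWindowJoinWithMiniBatch flips the same interval to [Rowtime, 1000ms] because the join's window bounds do.
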
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/ModifiedMonotonicityTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/ModifiedMonotonicityTest.xml
index caf9de2..f87775a 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/ModifiedMonotonicityTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/ModifiedMonotonicityTest.xml
@@ -37,7 +37,8 @@ GlobalGroupAggregate(groupBy=[a1], select=[a1, MAX(max$0) AS EXPR$1], updateAsRe
+- GlobalGroupAggregate(groupBy=[a1, a2], select=[a1, a2, MAX(max$0) AS a3], updateAsRetraction=[true], accMode=[AccRetract])
+- Exchange(distribution=[hash[a1, a2]], updateAsRetraction=[true], accMode=[Acc])
+- LocalGroupAggregate(groupBy=[a1, a2], select=[a1, a2, MAX(a3) AS max$0], updateAsRetraction=[true], accMode=[Acc])
- +- DataStreamScan(table=[[default_catalog, default_database, A]], fields=[a1, a2, a3], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[a1, a2, a3], miniBatchInterval=[Proctime, 100ms], updateAsRetraction=[true], accMode=[Acc])
+ +- DataStreamScan(table=[[default_catalog, default_database, A]], fields=[a1, a2, a3], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -108,7 +109,8 @@ GlobalGroupAggregate(groupBy=[a1], select=[a1, MIN_RETRACT(min$0) AS EXPR$1], up
+- GlobalGroupAggregate(groupBy=[a1, a2], select=[a1, a2, MAX(max$0) AS a3], updateAsRetraction=[true], accMode=[AccRetract])
+- Exchange(distribution=[hash[a1, a2]], updateAsRetraction=[true], accMode=[Acc])
+- LocalGroupAggregate(groupBy=[a1, a2], select=[a1, a2, MAX(a3) AS max$0], updateAsRetraction=[true], accMode=[Acc])
- +- DataStreamScan(table=[[default_catalog, default_database, A]], fields=[a1, a2, a3], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[a1, a2, a3], miniBatchInterval=[Proctime, 100ms], updateAsRetraction=[true], accMode=[Acc])
+ +- DataStreamScan(table=[[default_catalog, default_database, A]], fields=[a1, a2, a3], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -133,7 +135,8 @@ GlobalGroupAggregate(select=[MIN(min$0) AS EXPR$0], updateAsRetraction=[false],
+- GlobalGroupAggregate(groupBy=[a1, a2], select=[a1, a2, MIN(min$0) AS a3], updateAsRetraction=[true], accMode=[AccRetract])
+- Exchange(distribution=[hash[a1, a2]], updateAsRetraction=[true], accMode=[Acc])
+- LocalGroupAggregate(groupBy=[a1, a2], select=[a1, a2, MIN(a3) AS min$0], updateAsRetraction=[true], accMode=[Acc])
- +- DataStreamScan(table=[[default_catalog, default_database, A]], fields=[a1, a2, a3], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[a1, a2, a3], miniBatchInterval=[Proctime, 100ms], updateAsRetraction=[true], accMode=[Acc])
+ +- DataStreamScan(table=[[default_catalog, default_database, A]], fields=[a1, a2, a3], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.xml
index d03ec5f..206f59f 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.xml
@@ -43,8 +43,10 @@ GlobalGroupAggregate(groupBy=[a], select=[a, SUM(sum$0) AS EXPR$1, COUNT(distinc
+- Exchange(distribution=[hash[a]])
+- LocalGroupAggregate(groupBy=[a], select=[a, SUM(b) AS sum$0, COUNT(distinct$0 c) AS count$1, DISTINCT(c) AS distinct$0])
+- Union(all=[true], union=[a, b, c])
- :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
- +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ :- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -131,7 +133,8 @@ GlobalGroupAggregate(groupBy=[a], select=[a, SUM(sum$0) AS EXPR$1, COUNT(distinc
+- Exchange(distribution=[hash[a]])
+- LocalGroupAggregate(groupBy=[a], select=[a, SUM(b) FILTER $f2 AS sum$0, COUNT(distinct$0 c) FILTER $f4 AS count$1, COUNT(distinct$0 c) FILTER $f5 AS count$2, MAX(b) AS max$3, DISTINCT(c) AS distinct$0])
+- Calc(select=[a, b, IS TRUE(=(c, _UTF-16LE'A':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")) AS $f2, c, IS TRUE(d) AS $f4, IS TRUE(=(b, 1:BIGINT)) AS $f5])
- +- TableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+ +- WatermarkAssigner(fields=[a, b, c, d], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
]]>
</Resource>
</TestCase>
@@ -151,7 +154,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXP
GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
+- Exchange(distribution=[hash[b]])
+- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -259,8 +263,10 @@ GlobalGroupAggregate(groupBy=[a], select=[a, SUM(sum$0) AS EXPR$1, COUNT(distinc
+- Exchange(distribution=[hash[a]])
+- LocalGroupAggregate(groupBy=[a], select=[a, SUM(b) AS sum$0, COUNT(distinct$0 c) AS count$1, DISTINCT(c) AS distinct$0])
+- Union(all=[true], union=[a, b, c])
- :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
- +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ :- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/DistinctAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/DistinctAggregateTest.xml
index 592cb9d..e9ec761 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/DistinctAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/DistinctAggregateTest.xml
@@ -40,7 +40,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1) FILTER $2], EXPR$2=[SUM
GroupAggregate(groupBy=[a], select=[a, COUNT(DISTINCT b) FILTER $f2 AS EXPR$1, SUM(b) FILTER $f3 AS EXPR$2, SUM(b) FILTER $f2 AS EXPR$3])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -69,7 +70,8 @@ GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 count$0) AS EXPR$1
+- Exchange(distribution=[hash[a]])
+- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 b) FILTER $f2 AS count$0, SUM(b) FILTER $f3 AS sum$1, SUM(b) FILTER $f2 AS sum$2, DISTINCT(b) AS distinct$0])
+- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -101,7 +103,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($
+- Calc(select=[a, b, $f2, $f3, $f4, AND(=($e, 0), $f2) AS $g_0, AND(=($e, 1), $f3) AS $g_1, AND(=($e, 1), $f2) AS $g_10])
+- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[$4], $e=[0]}, {a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[null], $e=[1]}])
+- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -135,7 +138,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
+- Calc(select=[a, b, $f2, $f3, $f4, AND(=($e, 0), $f2) AS $g_0, AND(=($e, 1), $f3) AS $g_1, AND(=($e, 1), $f2) AS $g_10])
+- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[$4], $e=[0]}, {a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[null], $e=[1]}])
+- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -169,7 +173,8 @@ GroupAggregate(groupBy=[b], select=[b, FIRST_VALUE_RETRACT(c) AS EXPR$1, LAST_VA
+- GroupAggregate(groupBy=[a], select=[a, COUNT(DISTINCT b) AS b, MAX(b) AS c], updateAsRetraction=[true], accMode=[AccRetract])
+- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
+- Calc(select=[a, b], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -204,7 +209,8 @@ GroupAggregate(groupBy=[b], select=[b, FIRST_VALUE_RETRACT(c) AS EXPR$1, LAST_VA
+- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
+- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 b) AS count$0, MAX(b) AS max$1, DISTINCT(b) AS distinct$0], updateAsRetraction=[true], accMode=[Acc])
+- Calc(select=[a, b], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -242,7 +248,8 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, FIRST_VALUE_RET
+- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(DISTINCT b) AS $f2_0, MAX(b) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
+- Exchange(distribution=[hash[a, $f2]], updateAsRetraction=[true], accMode=[Acc])
+- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -282,7 +289,8 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, FIRST_VALUE_RET
+- Exchange(distribution=[hash[a, $f2]], updateAsRetraction=[true], accMode=[Acc])
+- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) AS count$0, MAX(b) AS max$1, DISTINCT(b) AS distinct$0], updateAsRetraction=[true], accMode=[Acc])
+- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -313,7 +321,8 @@ GroupAggregate(groupBy=[c], select=[c, MIN_RETRACT(b) AS EXPR$1, MAX_RETRACT(b)
+- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[AccRetract])
+- GroupAggregate(groupBy=[a], select=[a, AVG(b) AS b, MAX(c) AS c], updateAsRetraction=[true], accMode=[AccRetract])
+- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -346,7 +355,8 @@ GlobalGroupAggregate(groupBy=[c], select=[c, MIN_RETRACT(min$0) AS EXPR$1, MAX_R
+- GlobalGroupAggregate(groupBy=[a], select=[a, AVG((sum$0, count$1)) AS b, MAX(max$2) AS c], updateAsRetraction=[true], accMode=[AccRetract])
+- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
+- LocalGroupAggregate(groupBy=[a], select=[a, AVG(b) AS (sum$0, count$1), MAX(c) AS max$2], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -382,7 +392,8 @@ GroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, MIN_RETRACT($f3
+- Calc(select=[a, b, c, MOD(HASH_CODE(a), 1024) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4], updateAsRetraction=[true], accMode=[AccRetract])
+- GroupAggregate(groupBy=[a], select=[a, AVG(b) AS b, MAX(c) AS c], updateAsRetraction=[true], accMode=[AccRetract])
+- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -421,7 +432,8 @@ GlobalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, MIN_RETRA
+- GlobalGroupAggregate(groupBy=[a], select=[a, AVG((sum$0, count$1)) AS b, MAX(max$2) AS c], updateAsRetraction=[true], accMode=[AccRetract])
+- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
+- LocalGroupAggregate(groupBy=[a], select=[a, AVG(b) AS (sum$0, count$1), MAX(c) AS max$2], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -441,7 +453,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $
GroupAggregate(select=[COUNT(DISTINCT a) AS EXPR$0, SUM(DISTINCT b) AS EXPR$1])
+- Exchange(distribution=[single])
+- Calc(select=[a, b])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -462,7 +475,8 @@ GlobalGroupAggregate(select=[COUNT(distinct$0 count$0) AS EXPR$0, SUM(distinct$1
+- Exchange(distribution=[single])
+- LocalGroupAggregate(select=[COUNT(distinct$0 a) AS count$0, SUM(distinct$1 b) AS sum$1, DISTINCT(a) AS distinct$0, DISTINCT(b) AS distinct$1])
+- Calc(select=[a, b])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -486,7 +500,8 @@ GroupAggregate(partialFinalType=[FINAL], select=[$SUM0_RETRACT($f2_0) AS $f0, SU
+- Calc(select=[a, b, $f2, $f3, =($e, 1) AS $g_1, =($e, 2) AS $g_2])
+- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[null], $e=[1]}, {a=[$0], b=[$1], $f2=[null], $f3=[$3], $e=[2]}])
+- Calc(select=[a, b, MOD(HASH_CODE(a), 1024) AS $f2, MOD(HASH_CODE(b), 1024) AS $f3])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -512,7 +527,8 @@ GlobalGroupAggregate(partialFinalType=[FINAL], select=[$SUM0_RETRACT(sum$0) AS $
+- Calc(select=[a, b, $f2, $f3, =($e, 1) AS $g_1, =($e, 2) AS $g_2])
+- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[null], $e=[1]}, {a=[$0], b=[$1], $f2=[null], $f3=[$3], $e=[2]}])
+- Calc(select=[a, b, MOD(HASH_CODE(a), 1024) AS $f2, MOD(HASH_CODE(b), 1024) AS $f3])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -543,7 +559,8 @@ GroupAggregate(groupBy=[c], select=[c, MIN_RETRACT(b) AS EXPR$1, MAX_RETRACT(b)
+- Exchange(distribution=[hash[c]])
+- GroupAggregate(groupBy=[a], select=[a, AVG(b) AS b, MAX(c) AS c])
+- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -576,7 +593,8 @@ GlobalGroupAggregate(groupBy=[c], select=[c, MIN_RETRACT(min$0) AS EXPR$1, MAX_R
+- GlobalGroupAggregate(groupBy=[a], select=[a, AVG((sum$0, count$1)) AS b, MAX(max$2) AS c])
+- Exchange(distribution=[hash[a]])
+- LocalGroupAggregate(groupBy=[a], select=[a, AVG(b) AS (sum$0, count$1), MAX(c) AS max$2])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -612,7 +630,8 @@ GroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, MIN_RETRACT($f3
+- Calc(select=[a, b, c, MOD(HASH_CODE(a), 1024) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+- GroupAggregate(groupBy=[a], select=[a, AVG(b) AS b, MAX(c) AS c])
+- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -651,7 +670,8 @@ GlobalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, MIN_RETRA
+- GlobalGroupAggregate(groupBy=[a], select=[a, AVG((sum$0, count$1)) AS b, MAX(max$2) AS c])
+- Exchange(distribution=[hash[a]])
+- LocalGroupAggregate(groupBy=[a], select=[a, AVG(b) AS (sum$0, count$1), MAX(c) AS max$2])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -670,7 +690,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2
<![CDATA[
GroupAggregate(groupBy=[a], select=[a, CONCAT_AGG(c) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -690,7 +711,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2
GlobalGroupAggregate(groupBy=[a], select=[a, CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS EXPR$1, COUNT(distinct$0 count$2) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
+- LocalGroupAggregate(groupBy=[a], select=[a, CONCAT_AGG(c) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) AS count$2, DISTINCT(b) AS distinct$0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -714,7 +736,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG_RETR
+- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
+- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
+- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -740,7 +763,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AG
+- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
+- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
+- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -760,7 +784,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)])
GroupAggregate(select=[COUNT(DISTINCT c) AS EXPR$0])
+- Exchange(distribution=[single])
+- Calc(select=[c])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -781,7 +806,8 @@ GlobalGroupAggregate(select=[COUNT(distinct$0 count$0) AS EXPR$0])
+- Exchange(distribution=[single])
+- LocalGroupAggregate(select=[COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])
+- Calc(select=[c])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -803,7 +829,8 @@ GroupAggregate(partialFinalType=[FINAL], select=[$SUM0_RETRACT($f1_0) AS $f0])
+- GroupAggregate(groupBy=[$f1], partialFinalType=[PARTIAL], select=[$f1, COUNT(DISTINCT c) AS $f1_0])
+- Exchange(distribution=[hash[$f1]])
+- Calc(select=[c, MOD(HASH_CODE(c), 1024) AS $f1])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -827,7 +854,8 @@ GlobalGroupAggregate(partialFinalType=[FINAL], select=[$SUM0_RETRACT(sum$0) AS $
+- Exchange(distribution=[hash[$f1]])
+- LocalGroupAggregate(groupBy=[$f1], partialFinalType=[PARTIAL], select=[$f1, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])
+- Calc(select=[c, MOD(HASH_CODE(c), 1024) AS $f1])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -851,7 +879,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($2)], EXP
Calc(select=[a, EXPR$1, CASE(=($f3, 0), null:BIGINT, EXPR$2) AS EXPR$2, /(CAST(CASE(=($f3, 0), null:BIGINT, EXPR$2)), $f3) AS EXPR$3, EXPR$4, EXPR$5, $f3 AS EXPR$6, EXPR$7])
+- GroupAggregate(groupBy=[a], select=[a, COUNT(DISTINCT c) AS EXPR$1, $SUM0(b) AS EXPR$2, COUNT(b) AS $f3, MAX(b) AS EXPR$4, MIN(b) AS EXPR$5, COUNT(*) AS EXPR$7])
+- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -876,7 +905,8 @@ Calc(select=[a, EXPR$1, CASE(=($f3, 0), null:BIGINT, EXPR$2) AS EXPR$2, /(CAST(C
+- GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 count$0) AS EXPR$1, $SUM0(sum$1) AS EXPR$2, COUNT(count$2) AS $f3, MAX(max$3) AS EXPR$4, MIN(min$4) AS EXPR$5, COUNT(count1$5) AS EXPR$7])
+- Exchange(distribution=[hash[a]])
+- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 c) AS count$0, $SUM0(b) AS sum$1, COUNT(b) AS count$2, MAX(b) AS max$3, MIN(b) AS min$4, COUNT(*) AS count1$5, DISTINCT(c) AS distinct$0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -905,7 +935,8 @@ Calc(select=[a, EXPR$1, CASE(=($f3, 0), null:BIGINT, EXPR$2) AS EXPR$2, /(CAST(C
+- Calc(select=[a, b, c, $f3, $f4, =($e, 2) AS $g_2, =($e, 3) AS $g_3, =($e, 1) AS $g_1])
+- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[null], $e=[3]}])
+- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -936,7 +967,8 @@ Calc(select=[a, EXPR$1, CASE(=($f3, 0), null:BIGINT, EXPR$2) AS EXPR$2, /(CAST(C
+- Calc(select=[a, b, c, $f3, $f4, =($e, 2) AS $g_2, =($e, 3) AS $g_3, =($e, 1) AS $g_1])
+- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[null], $e=[3]}])
+- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -956,7 +988,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($1)], EXP
GroupAggregate(groupBy=[a], select=[a, COUNT(DISTINCT b) AS EXPR$1, SUM(b) AS EXPR$2, AVG(b) AS EXPR$3])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, b])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -977,7 +1010,8 @@ GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 count$0) AS EXPR$1
+- Exchange(distribution=[hash[a]])
+- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 b) AS count$0, SUM(b) AS sum$1, AVG(b) AS (sum$2, count$3), DISTINCT(b) AS distinct$0])
+- Calc(select=[a, b])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1002,7 +1036,8 @@ Calc(select=[a, $f1, $f2, CAST(/($f3, $f4)) AS $f3])
+- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
+- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
+- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1029,7 +1064,8 @@ Calc(select=[a, $f1, $f2, CAST(/($f3, $f4)) AS $f3])
+- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
+- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
+- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1049,7 +1085,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)])
GroupAggregate(groupBy=[a], select=[a, COUNT(DISTINCT c) AS EXPR$1])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, c])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1070,7 +1107,8 @@ GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 count$0) AS EXPR$1
+- Exchange(distribution=[hash[a]])
+- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])
+- Calc(select=[a, c])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1092,7 +1130,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($
+- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(DISTINCT c) AS $f2_0])
+- Exchange(distribution=[hash[a, $f2]])
+- Calc(select=[a, c, MOD(HASH_CODE(c), 1024) AS $f2])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1116,7 +1155,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
+- Exchange(distribution=[hash[a, $f2]])
+- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])
+- Calc(select=[a, c, MOD(HASH_CODE(c), 1024) AS $f2])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1147,7 +1187,8 @@ GroupAggregate(groupBy=[a], select=[a, COUNT_RETRACT(DISTINCT b) AS EXPR$1, COUN
+- Calc(select=[a, b, 1 AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
+- GroupAggregate(groupBy=[c], select=[c, AVG(a) AS a, AVG(b) AS b], updateAsRetraction=[true], accMode=[AccRetract])
+- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -1180,7 +1221,8 @@ GlobalGroupAggregate(groupBy=[a], select=[a, COUNT_RETRACT(distinct$0 count$0) A
+- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], updateAsRetraction=[true], accMode=[AccRetract])
+- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
+- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -1215,7 +1257,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($
+- Calc(select=[a, b, 1 AS $f2, MOD(HASH_CODE(b), 1024) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
+- GroupAggregate(groupBy=[c], select=[c, AVG(a) AS a, AVG(b) AS b], updateAsRetraction=[true], accMode=[AccRetract])
+- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -1253,7 +1296,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
+- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], updateAsRetraction=[true], accMode=[AccRetract])
+- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
+- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -1272,7 +1316,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[FIRST_VALUE($1)], EXPR$2=[COUNT(DISTINCT $
<![CDATA[
GroupAggregate(groupBy=[a], select=[a, FIRST_VALUE(c) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1291,7 +1336,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[FIRST_VALUE($1)], EXPR$2=[COUNT(DISTINCT $
<![CDATA[
GroupAggregate(groupBy=[a], select=[a, FIRST_VALUE(c) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1315,7 +1361,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, FIRST_VALUE_RET
+- Calc(select=[a, b, c, $f3, $f4, =($e, 2) AS $g_2, =($e, 1) AS $g_1])
+- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}])
+- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1339,7 +1386,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, FIRST_VALUE_RET
+- Calc(select=[a, b, c, $f3, $f4, =($e, 2) AS $g_2, =($e, 1) AS $g_1])
+- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}])
+- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1358,7 +1406,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[FIRST_VALUE($1, $2)], EXPR$2=[COUNT(DISTIN
<![CDATA[
GroupAggregate(groupBy=[a], select=[a, FIRST_VALUE(c, b) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1377,7 +1426,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[FIRST_VALUE($1, $2)], EXPR$2=[COUNT(DISTIN
<![CDATA[
GroupAggregate(groupBy=[a], select=[a, FIRST_VALUE(c, b) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1396,7 +1446,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[FIRST_VALUE($1, $2)], EXPR$2=[COUNT(DISTIN
<![CDATA[
GroupAggregate(groupBy=[a], select=[a, FIRST_VALUE(c, b) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1415,7 +1466,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[FIRST_VALUE($1, $2)], EXPR$2=[COUNT(DISTIN
<![CDATA[
GroupAggregate(groupBy=[a], select=[a, FIRST_VALUE(c, b) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1434,7 +1486,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[LAST_VALUE($1)], EXPR$2=[COUNT(DISTINCT $2
<![CDATA[
GroupAggregate(groupBy=[a], select=[a, LAST_VALUE(c) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1453,7 +1506,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[LAST_VALUE($1)], EXPR$2=[COUNT(DISTINCT $2
<![CDATA[
GroupAggregate(groupBy=[a], select=[a, LAST_VALUE(c) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1477,7 +1531,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LAST_VALUE_RETR
+- Calc(select=[a, b, c, $f3, $f4, =($e, 2) AS $g_2, =($e, 1) AS $g_1])
+- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}])
+- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1501,7 +1556,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LAST_VALUE_RETR
+- Calc(select=[a, b, c, $f3, $f4, =($e, 2) AS $g_2, =($e, 1) AS $g_1])
+- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}])
+- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1520,7 +1576,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[LAST_VALUE($1, $2)], EXPR$2=[COUNT(DISTINC
<![CDATA[
GroupAggregate(groupBy=[a], select=[a, LAST_VALUE(c, b) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1539,7 +1596,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[LAST_VALUE($1, $2)], EXPR$2=[COUNT(DISTINC
<![CDATA[
GroupAggregate(groupBy=[a], select=[a, LAST_VALUE(c, b) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1558,7 +1616,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[LAST_VALUE($1, $2)], EXPR$2=[COUNT(DISTINC
<![CDATA[
GroupAggregate(groupBy=[a], select=[a, LAST_VALUE(c, b) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1577,7 +1636,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[LAST_VALUE($1, $2)], EXPR$2=[COUNT(DISTINC
<![CDATA[
GroupAggregate(groupBy=[a], select=[a, LAST_VALUE(c, b) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1599,7 +1659,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($2)])
<![CDATA[
GroupAggregate(groupBy=[a], select=[a, COUNT(DISTINCT b) AS EXPR$1, MAX(c) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1622,7 +1683,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($2)])
GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
+- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 b) AS count$0, MAX(c) AS max$1, DISTINCT(b) AS distinct$0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1649,7 +1711,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($
+- Calc(select=[a, b, c, $f3, $f4, =($e, 1) AS $g_1, =($e, 2) AS $g_2])
+- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}])
+- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1671,7 +1734,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($
+- GroupAggregate(groupBy=[a], partialFinalType=[PARTIAL], select=[a, COUNT(DISTINCT a) AS $f1, COUNT(b) AS $f2])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, b])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1700,7 +1764,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
+- Calc(select=[a, b, c, $f3, $f4, =($e, 1) AS $g_1, =($e, 2) AS $g_2])
+- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}])
+- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1721,7 +1786,8 @@ GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 count$0) AS EXPR$1
+- Exchange(distribution=[hash[a]])
+- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 a) AS count$0, COUNT(b) AS count$1, DISTINCT(a) AS distinct$0])
+- Calc(select=[a, b])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1745,7 +1811,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
+- Exchange(distribution=[hash[a]])
+- LocalGroupAggregate(groupBy=[a], partialFinalType=[PARTIAL], select=[a, COUNT(distinct$0 a) AS count$0, COUNT(b) AS count$1, DISTINCT(a) AS distinct$0])
+- Calc(select=[a, b])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1765,7 +1832,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $0)], EXPR$2=[COUNT($1)])
GroupAggregate(groupBy=[a], select=[a, COUNT(DISTINCT a) AS EXPR$1, COUNT(b) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, b])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
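
Note: every plan above gains the same WatermarkAssigner(fields=[a, b, c],
miniBatchInterval=[Proctime, 1000ms]) node between the aggregation chain and
the TableSourceScan. A minimal sketch of the configuration that produces it,
mirroring the test setup later in this diff; the tEnv environment and the
registered MyTable(a, b, c) are illustrative assumptions, not part of the
commit:

    import org.apache.flink.table.api.TableConfigOptions

    // Assumption: tEnv is a blink-planner stream TableEnvironment with a
    // table MyTable(a, b, c) registered, as in the tests below.
    // Allowing up to 1000ms of buffering latency is what the plans above
    // render as miniBatchInterval=[Proctime, 1000ms].
    tEnv.getConfig.getConf.setLong(
      TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 1000L)

    // Any aggregation planned on this environment now picks up the trait.
    val result = tEnv.sqlQuery(
      "SELECT a, COUNT(DISTINCT b) FROM MyTable GROUP BY a")
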
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/IncrementalAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/IncrementalAggregateTest.xml
index b4b5e82..a415121 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/IncrementalAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/IncrementalAggregateTest.xml
@@ -45,7 +45,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(cou
+- Calc(select=[a, b, $f2, $f3, $f4, AND(=($e, 0), $f2) AS $g_0, AND(=($e, 1), $f3) AS $g_1, AND(=($e, 1), $f2) AS $g_10])
+- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[$4], $e=[0]}, {a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[null], $e=[1]}])
+- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -84,7 +85,8 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, FIRST_VALUE_RET
+- Exchange(distribution=[hash[a, $f2]], updateAsRetraction=[true], accMode=[Acc])
+- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) AS count$0, MAX(b) AS max$1, DISTINCT(b) AS distinct$0], updateAsRetraction=[true], accMode=[Acc])
+- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -122,7 +124,8 @@ GlobalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, MIN_RETRA
+- GlobalGroupAggregate(groupBy=[a], select=[a, AVG((sum$0, count$1)) AS b, MAX(max$2) AS c], updateAsRetraction=[true], accMode=[AccRetract])
+- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
+- LocalGroupAggregate(groupBy=[a], select=[a, AVG(b) AS (sum$0, count$1), MAX(c) AS max$2], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -147,7 +150,8 @@ GlobalGroupAggregate(partialFinalType=[FINAL], select=[$SUM0(count$0) AS $f0, SU
+- Calc(select=[a, b, $f2, $f3, =($e, 1) AS $g_1, =($e, 2) AS $g_2])
+- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[null], $e=[1]}, {a=[$0], b=[$1], $f2=[null], $f3=[$3], $e=[2]}])
+- Calc(select=[a, b, MOD(HASH_CODE(a), 1024) AS $f2, MOD(HASH_CODE(b), 1024) AS $f3])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -185,7 +189,8 @@ GlobalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, MIN_RETRA
+- GlobalGroupAggregate(groupBy=[a], select=[a, AVG((sum$0, count$1)) AS b, MAX(max$2) AS c])
+- Exchange(distribution=[hash[a]])
+- LocalGroupAggregate(groupBy=[a], select=[a, AVG(b) AS (sum$0, count$1), MAX(c) AS max$2])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -210,7 +215,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AG
+- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
+- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
+- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -233,7 +239,8 @@ GlobalGroupAggregate(partialFinalType=[FINAL], select=[$SUM0(count$0) AS $f0])
+- Exchange(distribution=[hash[$f1]])
+- LocalGroupAggregate(groupBy=[$f1], partialFinalType=[PARTIAL], select=[$f1, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])
+- Calc(select=[c, MOD(HASH_CODE(c), 1024) AS $f1])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -263,7 +270,8 @@ Calc(select=[a, EXPR$1, CASE(=($f3, 0), null:BIGINT, EXPR$2) AS EXPR$2, /(CAST(C
+- Calc(select=[a, b, c, $f3, $f4, =($e, 2) AS $g_2, =($e, 3) AS $g_3, =($e, 1) AS $g_1])
+- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[null], $e=[3]}])
+- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -289,7 +297,8 @@ Calc(select=[a, $f1, $f2, CAST(/($f3, $f4)) AS $f3])
+- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
+- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
+- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -312,7 +321,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(cou
+- Exchange(distribution=[hash[a, $f2]])
+- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])
+- Calc(select=[a, c, MOD(HASH_CODE(c), 1024) AS $f2])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -349,7 +359,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
+- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], updateAsRetraction=[true], accMode=[AccRetract])
+- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
+- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -373,7 +384,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, FIRST_VALUE_RET
+- Calc(select=[a, b, c, $f3, $f4, =($e, 2) AS $g_2, =($e, 1) AS $g_1])
+- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}])
+- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -392,7 +404,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[FIRST_VALUE($1, $2)], EXPR$2=[COUNT(DISTIN
<![CDATA[
GroupAggregate(groupBy=[a], select=[a, FIRST_VALUE(c, b) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -416,7 +429,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LAST_VALUE_RETR
+- Calc(select=[a, b, c, $f3, $f4, =($e, 2) AS $g_2, =($e, 1) AS $g_1])
+- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}])
+- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -435,7 +449,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[LAST_VALUE($1, $2)], EXPR$2=[COUNT(DISTINC
<![CDATA[
GroupAggregate(groupBy=[a], select=[a, LAST_VALUE(c, b) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -463,7 +478,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(cou
+- Calc(select=[a, b, c, $f3, $f4, =($e, 1) AS $g_1, =($e, 2) AS $g_2])
+- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}])
+- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -486,7 +502,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(cou
+- Exchange(distribution=[hash[a]])
+- LocalGroupAggregate(groupBy=[a], partialFinalType=[PARTIAL], select=[a, COUNT(distinct$0 a) AS count$0, COUNT(b) AS count$1, DISTINCT(a) AS distinct$0])
+- Calc(select=[a, b])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/TwoStageAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/TwoStageAggregateTest.xml
index 2458103..e249131 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/TwoStageAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/TwoStageAggregateTest.xml
@@ -35,7 +35,8 @@ Calc(select=[EXPR$0])
+- Exchange(distribution=[hash[b]])
+- LocalGroupAggregate(groupBy=[b], select=[b, AVG(a) AS (sum$0, count$1)])
+- Calc(select=[b, a])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -56,7 +57,8 @@ GlobalGroupAggregate(select=[AVG((sum$0, count$1)) AS EXPR$0]), rowType=[RecordT
+- Exchange(distribution=[single]), rowType=[RecordType(DOUBLE sum$0, BIGINT count$1)]
+- LocalGroupAggregate(select=[AVG($f0) AS (sum$0, count$1)]), rowType=[RecordType(DOUBLE sum$0, BIGINT count$1)]
+- Calc(select=[CAST(a) AS $f0]), rowType=[RecordType(DOUBLE $f0)]
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]), rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)]
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms]), rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)]
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]), rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)]
]]>
</Resource>
</TestCase>
@@ -79,7 +81,8 @@ Calc(select=[EXPR$0])
+- Exchange(distribution=[hash[b]])
+- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(a) AS count$0])
+- Calc(select=[b, a])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -102,7 +105,8 @@ Calc(select=[EXPR$0, EXPR$1])
+- Exchange(distribution=[hash[d]])
+- LocalGroupAggregate(groupBy=[d], select=[d, MIN(c) AS min$0, AVG(a) AS (sum$1, count$2)])
+- Calc(select=[+(b, 3) AS d, c, a])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -123,7 +127,8 @@ GlobalGroupAggregate(select=[COUNT(count$0) AS EXPR$0])
+- Exchange(distribution=[single])
+- LocalGroupAggregate(select=[COUNT(a) AS count$0])
+- Calc(select=[a])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -147,7 +152,8 @@ Calc(select=[CAST(2:BIGINT) AS b, EXPR$1])
+- Exchange(distribution=[hash[b]])
+- LocalGroupAggregate(groupBy=[b], select=[b, SUM(a) AS sum$0])
+- Calc(select=[CAST(2:BIGINT) AS b, a], where=[=(b, 2:BIGINT)])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -170,7 +176,8 @@ Calc(select=[4 AS four, EXPR$1])
+- Exchange(distribution=[hash[b, four]])
+- LocalGroupAggregate(groupBy=[b, four], select=[b, four, SUM(a) AS sum$0])
+- Calc(select=[b, 4 AS four, a])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
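
Note: the plans in this file pin the trait to [Proctime, 1000ms], while the
rowtime-based tests added below infer values such as [Rowtime, 0ms]. A hedged
sketch of the alignment arithmetic one would expect from a mini-batched
watermark assigner in rowtime mode; this is an illustration under stated
assumptions, not the committed operator code:

    // Assumption: in rowtime mode, watermarks are only forwarded on
    // mini-batch boundaries, i.e. aligned down to a multiple of the
    // interval; an interval of 0ms forwards every watermark unchanged.
    def alignedWatermark(watermark: Long, intervalMs: Long): Long =
      if (intervalMs <= 0L) watermark
      else watermark - (watermark % intervalMs)

    // alignedWatermark(1234L, 1000L) == 1000L; nothing new is emitted
    // until the underlying watermark reaches 2000L.
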
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index d0f6460..6c10d5d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -19,7 +19,7 @@
package org.apache.flink.table.api.stream
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.TableConfigOptions
import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
import org.apache.flink.table.util.TableTestBase
@@ -98,6 +98,47 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
util.verifyExplain(extended)
}
+ @Test
+ def testMiniBatchIntervalInfer(): Unit = {
+ // Test that emit latency propagates among RelNodeBlocks
+ util.addDataStream[(Int, String)]("T1", 'id1, 'text, 'rowtime)
+ util.addDataStream[(Int, String, Int, String, Long)](
+ "T2", 'id2, 'cnt, 'name, 'goods, 'rowtime)
+ util.addTableWithWatermark("T3", util.tableEnv.scan("T1"), "rowtime", 0)
+ util.addTableWithWatermark("T4", util.tableEnv.scan("T2"), "rowtime", 0)
+ util.tableEnv.getConfig.getConf.setLong(
+ TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 3000L)
+ val table = util.tableEnv.sqlQuery(
+ """
+ |SELECT id1, T3.rowtime AS ts, text
+ | FROM T3, T4
+ |WHERE id1 = id2
+ | AND T3.rowtime > T4.rowtime - INTERVAL '5' MINUTE
+ | AND T3.rowtime < T4.rowtime + INTERVAL '3' MINUTE
+ """.stripMargin)
+ util.tableEnv.registerTable("TempTable", table)
+
+ val table1 = util.tableEnv.sqlQuery(
+ """
+ |SELECT id1, CONCAT_AGG('#', text)
+ |FROM TempTable
+ |GROUP BY id1, TUMBLE(ts, INTERVAL '8' SECOND)
+ """.stripMargin)
+ val appendSink1 = util.createAppendTableSink(Array("a", "b"), Array(INT, STRING))
+ util.tableEnv.writeToSink(table1, appendSink1)
+
+ val table2 = util.tableEnv.sqlQuery(
+ """
+ |SELECT id1, CONCAT_AGG('*', text)
+ |FROM TempTable
+ |GROUP BY id1, HOP(ts, INTERVAL '12' SECOND, INTERVAL '6' SECOND)
+ """.stripMargin)
+ val appendSink2 = util.createAppendTableSink(Array("a", "b"), Array(INT, STRING))
+ util.tableEnv.writeToSink(table2, appendSink2)
+
+ util.verifyExplain(extended)
+ }
+
}
object ExplainTest {
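
Note: the new MiniBatchIntervalInferTest below exercises how inferred
intervals combine when mini-batched aggregates meet rowtime windows, window
joins, and over aggregates. One plausible merge rule, stated purely as an
illustrative assumption (not the committed planner logic) but consistent with
the expectations in the tests below, where [Rowtime, 0ms] wins over a
configured proctime latency:

    sealed trait MiniBatchMode
    case object ProcTime extends MiniBatchMode
    case object RowTime extends MiniBatchMode
    final case class MiniBatchInterval(intervalMs: Long, mode: MiniBatchMode)

    def merge(a: MiniBatchInterval, b: MiniBatchInterval): MiniBatchInterval =
      (a.mode, b.mode) match {
        // Rowtime semantics dominate proctime batching.
        case (RowTime, ProcTime) => a
        case (ProcTime, RowTime) => b
        case (RowTime, RowTime) =>
          // 0ms means "fire on every watermark" and must be preserved.
          if (a.intervalMs == 0L || b.intervalMs == 0L)
            MiniBatchInterval(0L, RowTime)
          else MiniBatchInterval(math.min(a.intervalMs, b.intervalMs), RowTime)
        case _ =>
          MiniBatchInterval(math.max(a.intervalMs, b.intervalMs), ProcTime)
      }
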
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/MiniBatchIntervalInferTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/MiniBatchIntervalInferTest.scala
new file mode 100644
index 0000000..66da915
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/MiniBatchIntervalInferTest.scala
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.stream.sql
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableConfigOptions
+import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
+import org.apache.flink.table.util.TableTestBase
+
+import org.junit.{Before, Test}
+
+class MiniBatchIntervalInferTest extends TableTestBase {
+ private val util = streamTestUtil()
+
+ val STRING = new VarCharType(VarCharType.MAX_LENGTH)
+ val LONG = new BigIntType()
+ val INT = new IntType()
+
+ @Before
+ def setup(): Unit = {
+ util.addDataStream[(Int, String, Long)]("MyTable1", 'a, 'b, 'c, 'proctime, 'rowtime)
+ util.addDataStream[(Int, String, Long)]("MyTable2", 'a, 'b, 'c, 'proctime, 'rowtime)
+ }
+
+ @Test
+ def testMiniBatchOnly(): Unit = {
+ util.tableEnv.getConfig.getConf
+ .setLong(TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 1000L)
+ val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable1 GROUP BY b"
+ util.verifyPlan(sql)
+ }
+
+ @Test
+ def testRedundantWatermarkDefinition(): Unit = {
+ util.tableEnv.getConfig.getConf
+ .setLong(TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 1000L)
+ util.addTableWithWatermark("MyTable3", util.tableEnv.scan("MyTable1"), "rowtime", 0)
+ val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable3 GROUP BY b"
+ util.verifyPlan(sql)
+ }
+
+ @Test
+ def testWindowWithEarlyFire(): Unit = {
+ util.tableEnv.getConfig.getConf
+ .setLong(TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 1000L)
+ util.tableEnv.getConfig.withEarlyFireInterval(Time.milliseconds(500))
+ util.addTableWithWatermark("MyTable3", util.tableEnv.scan("MyTable1"), "rowtime", 0)
+ val sql =
+ """
+ | SELECT b, SUM(cnt)
+ | FROM (
+ | SELECT b,
+ | COUNT(a) as cnt,
+ | HOP_START(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_start,
+ | HOP_END(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_end
+ | FROM MyTable3
+ | GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
+ | )
+ | GROUP BY b
+ """.stripMargin
+ util.verifyPlan(sql)
+ }
+
+ @Test
+ def testWindowCascade(): Unit = {
+ util.tableEnv.getConfig.getConf
+ .setLong(TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 3000L)
+ util.addTableWithWatermark("MyTable3", util.tableEnv.scan("MyTable1"), "rowtime", 0)
+ val sql =
+ """
+ | SELECT b,
+ | SUM(cnt)
+ | FROM (
+ | SELECT b,
+ | COUNT(a) as cnt,
+ | TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) as rt
+ | FROM MyTable3
+ | GROUP BY b, TUMBLE(rowtime, INTERVAL '10' SECOND)
+ | )
+ | GROUP BY b, TUMBLE(rt, INTERVAL '5' SECOND)
+ """.stripMargin
+ util.verifyPlan(sql)
+ }
+
+ @Test
+ def testWindowJoinWithMiniBatch(): Unit = {
+ util.addTableWithWatermark("LeftT", util.tableEnv.scan("MyTable1"), "rowtime", 0)
+ util.addTableWithWatermark("RightT", util.tableEnv.scan("MyTable2"), "rowtime", 0)
+ util.tableEnv.getConfig.getConf.setLong(
+ TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 1000L)
+
+ val sql =
+ """
+ | SELECT b, COUNT(a)
+ | FROM (
+ | SELECT t1.a as a, t1.b as b
+ | FROM
+ | LeftT as t1 JOIN RightT as t2
+ | ON
+ | t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
+ | t2.rowtime + INTERVAL '10' SECOND
+ | )
+ | GROUP BY b
+ """.stripMargin
+ util.verifyPlan(sql)
+ }
+
+ @Test
+ def testRowtimeRowsOverWithMiniBatch(): Unit = {
+ util.addTableWithWatermark("MyTable3", util.tableEnv.scan("MyTable1"), "rowtime", 0)
+ util.tableEnv.getConfig.getConf.setLong(
+ TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 1000L)
+
+ val sql =
+ """
+ | SELECT cnt, COUNT(c)
+ | FROM (
+ | SELECT c, COUNT(a)
+ | OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND CURRENT ROW) as cnt
+ | FROM MyTable3
+ | )
+ | GROUP BY cnt
+ """.stripMargin
+
+ util.verifyPlan(sql)
+ }
+
+ // TODO remove the expected exception once TableImpl implements createTemporalTableFunction
+ @Test(expected = classOf[NullPointerException])
+ def testTemporalTableFunctionJoinWithMiniBatch(): Unit = {
+ util.addTableWithWatermark("Orders", util.tableEnv.scan("MyTable1"), "rowtime", 0)
+ util.addTableWithWatermark("RatesHistory", util.tableEnv.scan("MyTable2"), "rowtime", 0)
+
+ util.tableEnv.getConfig.getConf.setLong(
+ TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 1000L)
+
+ util.addFunction(
+ "Rates",
+ util.tableEnv.scan("RatesHistory").createTemporalTableFunction("rowtime", "b"))
+
+ val sqlQuery =
+ """
+ | SELECT r_a, COUNT(o_a)
+ | FROM (
+ | SELECT o.a as o_a, r.a as r_a
+ | FROM Orders As o,
+ | LATERAL TABLE (Rates(o.rowtime)) as r
+ | WHERE o.b = r.b
+ | )
+ | GROUP BY r_a
+ """.stripMargin
+
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testMultiOperatorNeedsWatermark1(): Unit = {
+ // infer result: miniBatchInterval=[Rowtime, 0ms]
+ util.addTableWithWatermark("LeftT", util.tableEnv.scan("MyTable1"), "rowtime", 0)
+ util.addTableWithWatermark("RightT", util.tableEnv.scan("MyTable2"), "rowtime", 0)
+ util.tableEnv.getConfig.getConf.setLong(
+ TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 1000L)
+
+ val sql =
+ """
+ | SELECT
+ | b, COUNT(a),
+ | TUMBLE_START(rt, INTERVAL '5' SECOND),
+ | TUMBLE_END(rt, INTERVAL '5' SECOND)
+ | FROM (
+ | SELECT t1.a as a, t1.b as b, t1.rowtime as rt
+ | FROM
+ | LeftT as t1 JOIN RightT as t2
+ | ON
+ | t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
+ | t2.rowtime + INTERVAL '10' SECOND
+ | )
+ | GROUP BY b,TUMBLE(rt, INTERVAL '5' SECOND)
+ """.stripMargin
+ util.verifyPlan(sql)
+ }
+
+ @Test
+ def testMultiOperatorNeedsWatermark2(): Unit = {
+ util.addTableWithWatermark("LeftT", util.tableEnv.scan("MyTable1"), "rowtime", 0)
+ util.addTableWithWatermark("RightT", util.tableEnv.scan("MyTable2"), "rowtime", 0)
+ util.tableEnv.getConfig.getConf.setLong(
+ TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 6000L)
+
+ val sql =
+ """
+ | SELECT b, COUNT(a)
+ | OVER (PARTITION BY b ORDER BY rt ROWS BETWEEN 5 preceding AND CURRENT ROW)
+ | FROM (
+ | SELECT t1.a as a, t1.b as b, t1.rt as rt
+ | FROM
+ | (
+ | SELECT b,
+ | COUNT(a) as a,
+ | TUMBLE_ROWTIME(rowtime, INTERVAL '5' SECOND) as rt
+ | FROM LeftT
+ | GROUP BY b, TUMBLE(rowtime, INTERVAL '5' SECOND)
+ | ) as t1
+ | JOIN
+ | (
+ | SELECT b,
+ | COUNT(a) as a,
+ | HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt
+ | FROM RightT
+ | GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
+ | ) as t2
+ | ON
+ | t1.a = t2.a AND t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
+ | t2.rt + INTERVAL '10' SECOND
+ | )
+ """.stripMargin
+ util.verifyPlan(sql)
+ }
+
+ @Test
+ def testMultiOperatorNeedsWatermark3(): Unit = {
+ util.addTableWithWatermark("RightT", util.tableEnv.scan("MyTable2"), "rowtime", 0)
+ util.tableEnv.getConfig.getConf.setLong(
+ TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 6000L)
+
+ val sql =
+ """
+ | SELECT t1.a, t1.b
+ | FROM (
+ | SELECT a, COUNT(b) as b FROM MyTable1 GROUP BY a
+ | ) as t1
+ | JOIN (
+ | SELECT b, COUNT(a) as a
+ | FROM (
+ | SELECT b, COUNT(a) as a,
+ | HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt
+ | FROM RightT
+ | GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
+ | )
+ | GROUP BY b
+ | ) as t2
+ | ON t1.a = t2.a
+ """.stripMargin
+ util.verifyPlan(sql)
+ }
+
+ /**
+ * Tests the inference of the miniBatchInterval trait across optimization blocks.
+ */
+ @Test
+ def testMultipleWindowAggregates(): Unit = {
+ util.addDataStream[(Int, Long, String)]("T1", 'id1, 'rowtime, 'text)
+ util.addDataStream[(Int, Long, Int, String, String)]("T2", 'id2, 'rowtime, 'cnt, 'name, 'goods)
+ util.addTableWithWatermark("T3", util.tableEnv.scan("T1"), "rowtime", 0)
+ util.addTableWithWatermark("T4", util.tableEnv.scan("T2"), "rowtime", 0)
+
+ util.tableEnv.getConfig.getConf.setLong(
+ TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 500L)
+ util.tableEnv.getConfig.getConf.setLong(
+ TableConfigOptions.SQL_EXEC_MINIBATCH_SIZE, 300L)
+
+ val table1 = util.tableEnv.sqlQuery(
+ """
+ |SELECT id1, T3.rowtime AS ts, text
+ | FROM T3, T4
+ |WHERE id1 = id2
+ | AND T3.rowtime > T4.rowtime - INTERVAL '5' MINUTE
+ | AND T3.rowtime < T4.rowtime + INTERVAL '3' MINUTE
+ """.stripMargin)
+ util.tableEnv.registerTable("TempTable1", table1)
+
+ val table2 = util.tableEnv.sqlQuery(
+ """
+ |SELECT id1,
+ | CONCAT_AGG('#', text) as text,
+ | TUMBLE_ROWTIME(ts, INTERVAL '6' SECOND) as ts
+ |FROM TempTable1
+ |GROUP BY TUMBLE(ts, INTERVAL '6' SECOND), id1
+ """.stripMargin)
+ util.tableEnv.registerTable("TempTable2", table2)
+
+ val table3 = util.tableEnv.sqlQuery(
+ """
+ |SELECT id1,
+ | CONCAT_AGG('*', text)
+ |FROM TempTable2
+ |GROUP BY HOP(ts, INTERVAL '12' SECOND, INTERVAL '4' SECOND), id1
+ """.stripMargin)
+ val appendSink1 = util.createAppendTableSink(Array("a", "b"), Array(INT, STRING))
+ util.tableEnv.writeToSink(table3, appendSink1)
+
+ val table4 = util.tableEnv.sqlQuery(
+ """
+ |SELECT id1,
+ | CONCAT_AGG('-', text)
+ |FROM TempTable1
+ |GROUP BY TUMBLE(ts, INTERVAL '9' SECOND), id1
+ """.stripMargin)
+ val appendSink2 = util.createAppendTableSink(Array("a", "b"), Array(INT, STRING))
+ util.tableEnv.writeToSink(table4, appendSink2)
+
+ val table5 = util.tableEnv.sqlQuery(
+ """
+ |SELECT id1,
+ | COUNT(text)
+ |FROM TempTable2
+ |GROUP BY id1
+ """.stripMargin)
+ val retractSink = util.createRetractTableSink(Array("a", "b"), Array(INT, LONG))
+ util.tableEnv.writeToSink(table5, retractSink)
+
+ util.verifyExplain()
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.scala
index b515b98..5125df0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.scala
@@ -62,7 +62,6 @@ class AggregateTest extends TableTestBase {
def testAggWithMiniBatch(): Unit = {
util.tableEnv.getConfig.getConf.setLong(
TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 1000L)
- // TODO supports MiniBatch
util.verifyPlan("SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable GROUP BY b")
}
@@ -70,7 +69,6 @@ class AggregateTest extends TableTestBase {
def testAggAfterUnionWithMiniBatch(): Unit = {
util.tableEnv.getConfig.getConf.setLong(
TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 1000L)
- // TODO supports MiniBatch
val query =
"""
|SELECT a, sum(b), count(distinct c)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/FlinkRelOptUtilTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/FlinkRelOptUtilTest.scala
index ed34436..94f99d4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/FlinkRelOptUtilTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/FlinkRelOptUtilTest.scala
@@ -21,6 +21,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
+import org.apache.flink.table.plan.`trait`.{MiniBatchInterval, MiniBatchMode}
import org.apache.flink.table.util.TableTestUtil
import org.apache.calcite.sql.SqlExplainLevel
@@ -74,4 +75,67 @@ class FlinkRelOptUtilTest {
assertEquals(expected2.trim, FlinkRelOptUtil.toString(rel, SqlExplainLevel.NO_ATTRIBUTES).trim)
}
+ @Test
+ def testMergeRowTimeAndNone(): Unit = {
+ val none = MiniBatchInterval.NONE
+ val rowtime = MiniBatchInterval(1000L, MiniBatchMode.RowTime)
+ val mergedResult = FlinkRelOptUtil.mergeMiniBatchInterval(none, rowtime)
+ assertEquals(rowtime, mergedResult)
+ }
+
+ @Test
+ def testMergeProcTimeAndNone(): Unit = {
+ val none = MiniBatchInterval.NONE
+ val proctime = MiniBatchInterval(1000L, MiniBatchMode.ProcTime)
+ val mergedResult = FlinkRelOptUtil.mergeMiniBatchInterval(none, proctime)
+ assertEquals(proctime, mergedResult)
+ }
+
+ @Test
+ def testMergeRowTimeAndProcTime1(): Unit = {
+ val rowtime = MiniBatchInterval(4000L, MiniBatchMode.RowTime)
+ val proctime = MiniBatchInterval(1000L, MiniBatchMode.ProcTime)
+ val mergedResult = FlinkRelOptUtil.mergeMiniBatchInterval(rowtime, proctime)
+ assertEquals(rowtime, mergedResult)
+ }
+
+ @Test
+ def testMergeRowTimeAndProcTime2(): Unit = {
+ val rowtime = MiniBatchInterval(0L, MiniBatchMode.RowTime)
+ val proctime = MiniBatchInterval(1000L, MiniBatchMode.ProcTime)
+ val mergedResult = FlinkRelOptUtil.mergeMiniBatchInterval(rowtime, proctime)
+ assertEquals(MiniBatchInterval(1000L, MiniBatchMode.RowTime), mergedResult)
+ }
+
+ @Test
+ def testMergeRowTimeAndRowTime(): Unit = {
+ val rowtime1 = MiniBatchInterval(3000L, MiniBatchMode.RowTime)
+ val rowtime2 = MiniBatchInterval(5000L, MiniBatchMode.RowTime)
+ val mergedResult = FlinkRelOptUtil.mergeMiniBatchInterval(rowtime1, rowtime2)
+ assertEquals(MiniBatchInterval(1000L, MiniBatchMode.RowTime), mergedResult)
+ }
+
+ @Test
+ def testMergeWithNoneMiniBatch(): Unit = {
+ assertEquals(MiniBatchInterval.NO_MINIBATCH,
+ FlinkRelOptUtil.mergeMiniBatchInterval(
+ MiniBatchInterval.NO_MINIBATCH, MiniBatchInterval.NONE))
+ assertEquals(MiniBatchInterval.NO_MINIBATCH,
+ FlinkRelOptUtil.mergeMiniBatchInterval(
+ MiniBatchInterval.NONE, MiniBatchInterval.NO_MINIBATCH))
+ assertEquals(MiniBatchInterval.NO_MINIBATCH,
+ FlinkRelOptUtil.mergeMiniBatchInterval(
+ MiniBatchInterval.NO_MINIBATCH, MiniBatchInterval.NO_MINIBATCH))
+ val rowtime = MiniBatchInterval(3000L, MiniBatchMode.RowTime)
+ assertEquals(MiniBatchInterval.NO_MINIBATCH,
+ FlinkRelOptUtil.mergeMiniBatchInterval(MiniBatchInterval.NO_MINIBATCH, rowtime))
+ assertEquals(MiniBatchInterval.NO_MINIBATCH,
+ FlinkRelOptUtil.mergeMiniBatchInterval(rowtime, MiniBatchInterval.NO_MINIBATCH))
+ val proctime = MiniBatchInterval(1000L, MiniBatchMode.ProcTime)
+ assertEquals(MiniBatchInterval.NO_MINIBATCH,
+ FlinkRelOptUtil.mergeMiniBatchInterval(MiniBatchInterval.NO_MINIBATCH, proctime))
+ assertEquals(MiniBatchInterval.NO_MINIBATCH,
+ FlinkRelOptUtil.mergeMiniBatchInterval(proctime, MiniBatchInterval.NO_MINIBATCH))
+ }
+
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AggregateITCase.scala
index b3d7f3f..ee00a17 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AggregateITCase.scala
@@ -30,6 +30,7 @@ import org.apache.flink.table.runtime.batch.sql.agg.{MyPojoAggFunction, VarArgsA
import org.apache.flink.table.runtime.utils.StreamingWithAggTestBase.AggMode
import org.apache.flink.table.runtime.utils.StreamingWithMiniBatchTestBase.MiniBatchMode
import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.table.runtime.utils.TimeTestUtil.TimestampAndWatermarkWithOffset
import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils._
import org.apache.flink.table.runtime.utils.{StreamingWithAggTestBase, TestData, TestingRetractSink}
import org.apache.flink.table.typeutils.BigDecimalTypeInfo
@@ -558,6 +559,40 @@ class AggregateITCase(
}
@Test
+ def testWindowWithUnboundedAgg(): Unit = {
+ val t = failingDataSource(TestData.tupleData5.map {
+ case (a, b, c, d, e) => (b, a, c, d, e)
+ }).assignTimestampsAndWatermarks(
+ new TimestampAndWatermarkWithOffset[(Long, Int, Int, String, Long)](0L))
+ .toTable(tEnv, 'rowtime, 'a, 'c, 'd, 'e)
+ tEnv.registerTable("MyTable", t)
+ val sourceTable = tEnv.scan("MyTable")
+ addTableWithWatermark("MyTable1", sourceTable, "rowtime", 0)
+
+ val innerSql =
+ """
+ |SELECT a,
+ | SUM(DISTINCT e) b,
+ | MIN(DISTINCT e) c,
+ | COUNT(DISTINCT e) d
+ |FROM MyTable1
+ |GROUP BY a, TUMBLE(rowtime, INTERVAL '0.005' SECOND)
+ """.stripMargin
+
+ val sqlQuery = "SELECT c, MAX(a), COUNT(DISTINCT d) FROM (" + innerSql + ") GROUP BY c"
+
+ val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+ val sink = new TestingRetractSink
+ results.addSink(sink)
+ env.execute()
+
+ val expected = List(
+ "1,5,3",
+ "2,5,2")
+ assertEquals(expected.sorted, sink.getRetractResults.sorted)
+ }
+
+ @Test
def testConcatAggWithNullData(): Unit = {
val dataWithNull = List(
(1, 1, null),
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala
index 1335e89..ea617c3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala
@@ -20,7 +20,11 @@ package org.apache.flink.table.runtime.utils
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{Table, TableException}
import org.apache.flink.table.api.scala.StreamTableEnvironment
+import org.apache.flink.table.operations.PlannerQueryOperation
+import org.apache.flink.table.plan.nodes.calcite.LogicalWatermarkAssigner
+import org.apache.flink.table.util.TableTestUtil
import org.apache.flink.test.util.AbstractTestBase
import org.junit.rules.{ExpectedException, TemporaryFolder}
@@ -53,4 +57,24 @@ class StreamingTestBase extends AbstractTestBase {
this.tEnv = StreamTableEnvironment.create(env)
}
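+ /**
+ * Registers [[sourceTable]] under [[tableName]] with a watermark on [[rowtimeField]]
+ * and the given offset.
+ */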
+ def addTableWithWatermark(
+ tableName: String,
+ sourceTable: Table,
+ rowtimeField: String,
+ offset: Long): Unit = {
+ val sourceRel = TableTestUtil.toRelNode(sourceTable)
+ val rowtimeFieldIdx = sourceRel.getRowType.getFieldNames.indexOf(rowtimeField)
+ if (rowtimeFieldIdx < 0) {
+ throw new TableException(s"Field '$rowtimeField' does not exist in the source table")
+ }
+ val watermarkAssigner = new LogicalWatermarkAssigner(
+ sourceRel.getCluster,
+ sourceRel.getTraitSet,
+ sourceRel,
+ Some(rowtimeFieldIdx),
+ Option(offset)
+ )
+ val queryOperation = new PlannerQueryOperation(watermarkAssigner)
+ tEnv.registerTable(tableName, tEnv.createTable(queryOperation))
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index 51d16ce..7380fb7 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -32,7 +32,8 @@ import org.apache.flink.table.calcite.CalciteConfig
import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
-import org.apache.flink.table.operations.RichTableSourceQueryOperation
+import org.apache.flink.table.operations.{PlannerQueryOperation, RichTableSourceQueryOperation}
+import org.apache.flink.table.plan.nodes.calcite.LogicalWatermarkAssigner
import org.apache.flink.table.plan.nodes.exec.ExecNode
import org.apache.flink.table.plan.optimize.program.{FlinkBatchProgram, FlinkStreamProgram}
import org.apache.flink.table.plan.stats.FlinkStatistic
@@ -531,6 +532,35 @@ case class StreamTableTestUtil(
tableEnv.scan(name)
}
+ /**
+ * Registers a table with the given rowtime field and watermark offset.
+ *
+ * @param tableName name under which the table is registered
+ * @param sourceTable table to register
+ * @param rowtimeField field to be used as rowtime
+ * @param offset watermark offset relative to the rowtime field value
+ */
+ def addTableWithWatermark(
+ tableName: String,
+ sourceTable: Table,
+ rowtimeField: String,
+ offset: Long): Unit = {
+ val sourceRel = TableTestUtil.toRelNode(sourceTable)
+ val rowtimeFieldIdx = sourceRel.getRowType.getFieldNames.indexOf(rowtimeField)
+ if (rowtimeFieldIdx < 0) {
+ throw new TableException(s"Field '$rowtimeField' does not exist in the source table")
+ }
+ val watermarkAssigner = new LogicalWatermarkAssigner(
+ sourceRel.getCluster,
+ sourceRel.getTraitSet,
+ sourceRel,
+ Some(rowtimeFieldIdx),
+ Option(offset)
+ )
+ val queryOperation = new PlannerQueryOperation(watermarkAssigner)
+ tableEnv.registerTable(tableName, tableEnv.createTable(queryOperation))
+ }
+
def verifyPlanWithTrait(): Unit = {
doVerifyPlan(
SqlExplainLevel.EXPPLAN_ATTRIBUTES,
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
index 2f4b262..c966c0b 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
@@ -38,6 +38,14 @@ public class TableConfigOptions {
.withDescription("Whether support values source input. The reason for disabling this " +
"feature is that checkpoint will not work properly when source finished.");
+ public static final ConfigOption<Long> SQL_EXEC_SOURCE_IDLE_TIMEOUT =
+ key("sql.exec.source.idle.timeout.ms")
+ .defaultValue(-1L)
+ .withDescription("When a source do not receive any elements for the timeout time, " +
+ "it will be marked as temporarily idle. This allows downstream " +
+ "tasks to advance their watermarks without the need to wait for " +
+ "watermarks from this source while it is idle.");
+
// ------------------------------------------------------------------------
// Sort Options
// ------------------------------------------------------------------------
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchAssignerOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchAssignerOperator.java
new file mode 100644
index 0000000..8d2e5e2
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchAssignerOperator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.watermarkassigner;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.table.dataformat.BaseRow;
+
+/**
+ * A stream operator that emits mini-batch markers at a given interval.
+ * NOTE: mini-batch markers are encoded and emitted as watermarks.
+ */
+public class MiniBatchAssignerOperator extends AbstractStreamOperator<BaseRow>
+ implements OneInputStreamOperator<BaseRow, BaseRow>, ProcessingTimeCallback {
+
+ private static final long serialVersionUID = 1L;
+
+ private final long intervalMs;
+
+ private transient long currentWatermark;
+
+ public MiniBatchAssignerOperator(long intervalMs) {
+ this.intervalMs = intervalMs;
+ this.chainingStrategy = ChainingStrategy.ALWAYS;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+
+ currentWatermark = 0;
+
+ long now = getProcessingTimeService().getCurrentProcessingTime();
+ getProcessingTimeService().registerTimer(now + intervalMs, this);
+
+ // report marker metric
+ getRuntimeContext().getMetricGroup().gauge("currentBatch", (Gauge<Long>) () -> currentWatermark);
+ }
+
+ @Override
+ public void processElement(StreamRecord<BaseRow> element) throws Exception {
+ long now = getProcessingTimeService().getCurrentProcessingTime();
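+ // align the current processing time to the mini-batch boundary,
+ // e.g. intervalMs = 100 and now = 250 yields currentBatch = 200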
+ long currentBatch = now - now % intervalMs;
+ if (currentBatch > currentWatermark) {
+ currentWatermark = currentBatch;
+ // emit
+ output.emitWatermark(new Watermark(currentBatch));
+ }
+ output.collect(element);
+ }
+
+ @Override
+ public void onProcessingTime(long timestamp) throws Exception {
+ long now = getProcessingTimeService().getCurrentProcessingTime();
+ long currentBatch = now - now % intervalMs;
+ if (currentBatch > currentWatermark) {
+ currentWatermark = currentBatch;
+ // emit
+ output.emitWatermark(new Watermark(currentBatch));
+ }
+ getProcessingTimeService().registerTimer(currentBatch + intervalMs, this);
+ }
+
+ /**
+ * Override the base implementation to completely ignore watermarks propagated from
+ * upstream (this operator generates its own mini-batch markers and emits them
+ * as watermarks).
+ */
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ // if we receive a Long.MAX_VALUE watermark we forward it since it is used
+ // to signal the end of input and to not block watermark progress downstream
+ if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
+ currentWatermark = Long.MAX_VALUE;
+ output.emitWatermark(mark);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchedWatermarkAssignerOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchedWatermarkAssignerOperator.java
new file mode 100644
index 0000000..c45235a
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchedWatermarkAssignerOperator.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.watermarkassigner;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A stream operator that extracts timestamps from stream elements and
+ * generates watermarks that are aligned to mini-batch boundaries at the specified interval.
+ */
+public class MiniBatchedWatermarkAssignerOperator
+ extends AbstractStreamOperator<BaseRow>
+ implements OneInputStreamOperator<BaseRow, BaseRow>, ProcessingTimeCallback {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int rowtimeFieldIndex;
+
+ private final long watermarkDelay;
+
+ // timezone offset.
+ private final long tzOffset;
+
+ private final long idleTimeout;
+
+ private final long watermarkInterval;
+
+ private transient long currentWatermark;
+
+ private transient long expectedWatermark;
+
+ private transient long lastRecordTime;
+
+ private transient StreamStatusMaintainer streamStatusMaintainer;
+
+ public MiniBatchedWatermarkAssignerOperator(
+ int rowtimeFieldIndex,
+ long watermarkDelay,
+ long tzOffset,
+ long idleTimeout,
+ long watermarkInterval) {
+ this.rowtimeFieldIndex = rowtimeFieldIndex;
+ this.watermarkDelay = watermarkDelay;
+ this.tzOffset = tzOffset;
+ this.chainingStrategy = ChainingStrategy.ALWAYS;
+ this.watermarkInterval = watermarkInterval;
+
+ if (idleTimeout != -1) {
+ Preconditions.checkArgument(
+ idleTimeout >= 1,
+ "The idle timeout cannot be smaller than 1 ms.");
+ }
+ this.idleTimeout = idleTimeout;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+
+ Preconditions.checkArgument(watermarkInterval > 0,
+ "The inferred emit latency should be larger than 0");
+
+ // the timezone offset must be taken into account when calculating the mini-batch start time.
+ currentWatermark = 0;
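+ // e.g. with tzOffset = 0 and watermarkInterval = 50, the first expected watermark is 49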
+ expectedWatermark = getMiniBatchStart(currentWatermark, tzOffset, watermarkInterval)
+ + watermarkInterval - 1;
+
+ if (idleTimeout >= 1) {
+ this.lastRecordTime = getProcessingTimeService().getCurrentProcessingTime();
+ this.streamStatusMaintainer = getContainingTask().getStreamStatusMaintainer();
+ getProcessingTimeService().registerTimer(lastRecordTime + idleTimeout, this);
+ }
+ }
+
+ @Override
+ public void processElement(StreamRecord<BaseRow> element) throws Exception {
+ if (idleTimeout != -1) {
+ // mark the channel active
+ streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
+ lastRecordTime = getProcessingTimeService().getCurrentProcessingTime();
+ }
+ BaseRow row = element.getValue();
+ if (row.isNullAt(rowtimeFieldIndex)) {
+ throw new RuntimeException("RowTime field should not be null," +
+ " please convert it to a non-null long value.");
+ }
+ long wm = row.getLong(rowtimeFieldIndex) - watermarkDelay;
+ currentWatermark = Math.max(currentWatermark, wm);
+ // forward element
+ output.collect(element);
+
+ if (currentWatermark >= expectedWatermark) {
+ output.emitWatermark(new Watermark(currentWatermark));
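+ // advance the expected watermark to the end of the next mini-batch not yet covered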
+ long start = getMiniBatchStart(currentWatermark, tzOffset, watermarkInterval);
+ long end = start + watermarkInterval - 1;
+ expectedWatermark = end > currentWatermark ? end : end + watermarkInterval;
+ }
+ }
+
+ @Override
+ public void onProcessingTime(long timestamp) throws Exception {
+ if (idleTimeout != -1) {
+ final long currentTime = getProcessingTimeService().getCurrentProcessingTime();
+ if (currentTime - lastRecordTime > idleTimeout) {
+ // mark the channel as idle to ignore watermarks from this channel
+ streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
+ }
+ }
+
+ // register next timer
+ long now = getProcessingTimeService().getCurrentProcessingTime();
+ getProcessingTimeService().registerTimer(now + watermarkInterval, this);
+ }
+
+ /**
+ * Override the base implementation to completely ignore watermarks propagated from
+ * upstream (this operator extracts the rowtime field itself and emits its own
+ * mini-batch aligned watermarks).
+ */
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ // if we receive a Long.MAX_VALUE watermark we forward it since it is used
+ // to signal the end of input and to not block watermark progress downstream
+ if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
+ if (idleTimeout != -1) {
+ // mark the channel active
+ streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
+ }
+ currentWatermark = Long.MAX_VALUE;
+ output.emitWatermark(mark);
+ }
+ }
+
+ public void endInput() throws Exception {
+ processWatermark(Watermark.MAX_WATERMARK);
+ }
+
+ @Override
+ public void close() throws Exception {
+ endInput(); // TODO move this into the operator lifecycle once endInput() is introduced
+ super.close();
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+ /**
+ * Method to get the mini-batch start for a watermark.
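+ * For example, with {@code tzOffset = 0} and {@code interval = 50}, a watermark
+ * of 120 yields a mini-batch start of 100.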
+ */
+ public static long getMiniBatchStart(long watermark, long tzOffset, long interval) {
+ return watermark - (watermark - tzOffset + interval) % interval;
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/watermarkassigner/WatermarkAssignerOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/watermarkassigner/WatermarkAssignerOperator.java
new file mode 100644
index 0000000..ca3c0c5
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/watermarkassigner/WatermarkAssignerOperator.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.watermarkassigner;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A stream operator that extracts timestamps from stream elements and
+ * generates periodic watermarks.
+ */
+public class WatermarkAssignerOperator
+ extends AbstractStreamOperator<BaseRow>
+ implements OneInputStreamOperator<BaseRow, BaseRow>, ProcessingTimeCallback {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int rowtimeFieldIndex;
+
+ private final long watermarkDelay;
+
+ private final long idleTimeout;
+
+ private transient long watermarkInterval;
+
+ private transient long currentWatermark;
+
+ private transient long currentMaxTimestamp;
+
+ private transient long lastRecordTime;
+
+ private transient StreamStatusMaintainer streamStatusMaintainer;
+
+ /**
+ * Create a watermark assigner operator.
+ * @param rowtimeFieldIndex the field index to extract event timestamp
+ * @param watermarkDelay the delay by which watermarks are behind the maximum observed timestamp.
+ * @param idleTimeout the idle timeout in milliseconds (-1 if idleness checking is disabled)
+ */
+ public WatermarkAssignerOperator(int rowtimeFieldIndex, long watermarkDelay, long idleTimeout) {
+ this.rowtimeFieldIndex = rowtimeFieldIndex;
+ this.watermarkDelay = watermarkDelay;
+
+ if (idleTimeout != -1) {
+ Preconditions.checkArgument(
+ idleTimeout >= 1,
+ "The idle timeout cannot be smaller than 1 ms.");
+ }
+
+ this.idleTimeout = idleTimeout;
+ this.chainingStrategy = ChainingStrategy.ALWAYS;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+
+ // watermark and timestamp should start from 0
+ this.currentWatermark = 0;
+ this.currentMaxTimestamp = 0;
+ this.watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
+ this.lastRecordTime = getProcessingTimeService().getCurrentProcessingTime();
+ this.streamStatusMaintainer = getContainingTask().getStreamStatusMaintainer();
+
+ if (watermarkInterval > 0) {
+ long now = getProcessingTimeService().getCurrentProcessingTime();
+ getProcessingTimeService().registerTimer(now + watermarkInterval, this);
+ }
+ }
+
+ @Override
+ public void processElement(StreamRecord<BaseRow> element) throws Exception {
+ if (idleTimeout != -1) {
+ // mark the channel active
+ streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
+ lastRecordTime = getProcessingTimeService().getCurrentProcessingTime();
+ }
+ BaseRow row = element.getValue();
+ if (row.isNullAt(rowtimeFieldIndex)) {
+ throw new RuntimeException("RowTime field should not be null," +
+ " please convert it to a non-null long value.");
+ }
+ long ts = row.getLong(rowtimeFieldIndex);
+ currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
+ // forward element
+ output.collect(element);
+
+ // eagerly emit the watermark in case the periodic timer has not fired yet,
+ // i.e. when the max timestamp has advanced more than one interval past the current watermark
+ if (currentMaxTimestamp - (currentWatermark + watermarkDelay) > watermarkInterval) {
+ advanceWatermark();
+ }
+ }
+
+ private void advanceWatermark() {
+ long newWatermark = currentMaxTimestamp - watermarkDelay;
+ if (newWatermark > currentWatermark) {
+ currentWatermark = newWatermark;
+ // emit watermark
+ output.emitWatermark(new Watermark(newWatermark));
+ }
+ }
+
+ @Override
+ public void onProcessingTime(long timestamp) throws Exception {
+ advanceWatermark();
+
+ if (idleTimeout != -1) {
+ final long currentTime = getProcessingTimeService().getCurrentProcessingTime();
+ if (currentTime - lastRecordTime > idleTimeout) {
+ // mark the channel as idle to ignore watermarks from this channel
+ streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
+ }
+ }
+
+ // register next timer
+ long now = getProcessingTimeService().getCurrentProcessingTime();
+ getProcessingTimeService().registerTimer(now + watermarkInterval, this);
+ }
+
+ /**
+ * Override the base implementation to completely ignore watermarks propagated from
+ * upstream (this operator extracts timestamps from the rowtime field and emits
+ * its own periodic watermarks).
+ */
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ // if we receive a Long.MAX_VALUE watermark we forward it since it is used
+ // to signal the end of input and to not block watermark progress downstream
+ if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
+ if (idleTimeout != -1) {
+ // mark the channel active
+ streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
+ }
+ currentWatermark = Long.MAX_VALUE;
+ output.emitWatermark(mark);
+ }
+ }
+
+ public void endInput() throws Exception {
+ processWatermark(Watermark.MAX_WATERMARK);
+ }
+
+ @Override
+ public void close() throws Exception {
+ endInput(); // TODO move this into the operator lifecycle once endInput() is introduced
+ super.close();
+
+ // emit a final watermark
+ advanceWatermark();
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchAssignerOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchAssignerOperatorTest.java
new file mode 100644
index 0000000..e653ba3
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchAssignerOperatorTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.watermarkassigner;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
+
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests of {@link MiniBatchAssignerOperator}.
+ */
+public class MiniBatchAssignerOperatorTest extends WatermarkAssignerOperatorTestBase {
+
+ @Test
+ public void testMiniBatchAssignerOperator() throws Exception {
+ final MiniBatchAssignerOperator operator = new MiniBatchAssignerOperator(100);
+
+ OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness =
+ new OneInputStreamOperatorTestHarness<>(operator);
+
+ long currentTime = 0;
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(1L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(2L)));
+ testHarness.processWatermark(new Watermark(2)); // this watermark should be ignored
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(3L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(4L)));
+
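+ // poll the output until the first mini-batch marker (a watermark of 100) appears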
+ {
+ ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+ long currentElement = 1L;
+ long lastWatermark = 0L;
+
+ while (true) {
+ if (output.size() > 0) {
+ Object next = output.poll();
+ assertNotNull(next);
+ Tuple2<Long, Long> update = validateElement(next, currentElement, lastWatermark);
+ long nextElementValue = update.f0;
+ lastWatermark = update.f1;
+ if (next instanceof Watermark) {
+ assertEquals(100, lastWatermark);
+ break;
+ } else {
+ assertEquals(currentElement, nextElementValue - 1);
+ currentElement += 1;
+ assertEquals(0, lastWatermark);
+ }
+ } else {
+ currentTime = currentTime + 10;
+ testHarness.setProcessingTime(currentTime);
+ }
+ }
+
+ output.clear();
+ }
+
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(4L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(5L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(6L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(7L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(8L)));
+
+ {
+ ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+ long currentElement = 4L;
+ long lastWatermark = 100L;
+
+ while (true) {
+ if (output.size() > 0) {
+ Object next = output.poll();
+ assertNotNull(next);
+ Tuple2<Long, Long> update = validateElement(next, currentElement, lastWatermark);
+ long nextElementValue = update.f0;
+ lastWatermark = update.f1;
+ if (next instanceof Watermark) {
+ assertEquals(200, lastWatermark);
+ break;
+ } else {
+ assertEquals(currentElement, nextElementValue - 1);
+ currentElement += 1;
+ assertEquals(100, lastWatermark);
+ }
+ } else {
+ currentTime = currentTime + 10;
+ testHarness.setProcessingTime(currentTime);
+ }
+ }
+
+ output.clear();
+ }
+
+ testHarness.processWatermark(new Watermark(Long.MAX_VALUE));
+ assertEquals(Long.MAX_VALUE, ((Watermark) testHarness.getOutput().poll()).getTimestamp());
+ }
+
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchedWatermarkAssignerOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchedWatermarkAssignerOperatorTest.java
new file mode 100644
index 0000000..4965fe3
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchedWatermarkAssignerOperatorTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.watermarkassigner;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests of {@link MiniBatchedWatermarkAssignerOperator}.
+ */
+public class MiniBatchedWatermarkAssignerOperatorTest extends WatermarkAssignerOperatorTestBase {
+
+ @Test
+ public void testMiniBatchedWatermarkAssignerWithIdleSource() throws Exception {
+ // with timeout 1000 ms
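+ // constructor args: rowtimeFieldIndex = 0, watermarkDelay = 1, tzOffset = 0, idleTimeout = 1000, watermarkInterval = 50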
+ final MiniBatchedWatermarkAssignerOperator operator = new MiniBatchedWatermarkAssignerOperator(
+ 0, 1, 0, 1000, 50);
+ OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness =
+ new OneInputStreamOperatorTestHarness<>(operator);
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(1L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(2L)));
+ testHarness.processWatermark(new Watermark(2)); // this watermark should be ignored
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(3L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(4L)));
+ // this rowtime exceeds the expected watermark, so a watermark of 49 should be emitted
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(50L)));
+
+ ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+ List<Watermark> watermarks = extractWatermarks(output);
+ assertEquals(1, watermarks.size());
+ assertEquals(new Watermark(49), watermarks.get(0));
+ assertEquals(StreamStatus.ACTIVE, testHarness.getStreamStatus());
+ output.clear();
+
+ testHarness.setProcessingTime(1001);
+ assertEquals(StreamStatus.IDLE, testHarness.getStreamStatus());
+
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(51L)));
+ assertEquals(StreamStatus.ACTIVE, testHarness.getStreamStatus());
+
+ // advancing processing time alone does not trigger a watermark emission
+ testHarness.setProcessingTime(1060);
+ output = testHarness.getOutput();
+ watermarks = extractWatermarks(output);
+ assertTrue(watermarks.isEmpty());
+ output.clear();
+
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(100L)));
+ output = testHarness.getOutput();
+ watermarks = extractWatermarks(output);
+ assertEquals(1, watermarks.size());
+ assertEquals(new Watermark(99), watermarks.get(0));
+ }
+
+ @Test
+ public void testMiniBatchedWatermarkAssignerOperator() throws Exception {
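+ // constructor args: rowtimeFieldIndex = 0, watermarkDelay = 1, tzOffset = 0, idleTimeout = -1 (disabled), watermarkInterval = 50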
+ final MiniBatchedWatermarkAssignerOperator operator = new MiniBatchedWatermarkAssignerOperator(0, 1, 0, -1, 50);
+
+ OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness =
+ new OneInputStreamOperatorTestHarness<>(operator);
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(1L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(2L)));
+ testHarness.processWatermark(new Watermark(2)); // this watermark should be ignored
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(3L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(4L)));
+ // this rowtime exceeds the expected watermark, so a watermark of 49 should be emitted
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(50L)));
+
+ ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+ List<Watermark> watermarks = extractWatermarks(output);
+ assertEquals(1, watermarks.size());
+ assertEquals(new Watermark(49), watermarks.get(0));
+ output.clear();
+
+ testHarness.setProcessingTime(1001);
+ output = testHarness.getOutput();
+ watermarks = extractWatermarks(output);
+ assertTrue(watermarks.isEmpty());
+ output.clear();
+
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(99L)));
+ output = testHarness.getOutput();
+ watermarks = extractWatermarks(output);
+ assertTrue(watermarks.isEmpty());
+ output.clear();
+
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(100L)));
+ output = testHarness.getOutput();
+ watermarks = extractWatermarks(output);
+ assertEquals(1, watermarks.size());
+ assertEquals(new Watermark(99), watermarks.get(0));
+ output.clear();
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/WatermarkAssignerOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/WatermarkAssignerOperatorTest.java
new file mode 100644
index 0000000..9e5347b
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/WatermarkAssignerOperatorTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.watermarkassigner;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests of {@link WatermarkAssignerOperator}.
+ */
+public class WatermarkAssignerOperatorTest extends WatermarkAssignerOperatorTestBase {
+
+ @Test
+ public void testWatermarkAssignerWithIdleSource() throws Exception {
+ // with timeout 1000 ms
+ final WatermarkAssignerOperator operator = new WatermarkAssignerOperator(0, 1, 1000);
+ OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness =
+ new OneInputStreamOperatorTestHarness<>(operator);
+ testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
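+ // the operator reads its periodic emit interval from ExecutionConfig's auto watermark interval (50 ms here)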
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(1L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(2L)));
+ testHarness.processWatermark(new Watermark(2)); // this watermark should be ignored
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(3L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(4L)));
+
+ // trigger watermark emit
+ testHarness.setProcessingTime(51);
+ ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+ List<Watermark> watermarks = extractWatermarks(output);
+ assertEquals(1, watermarks.size());
+ assertEquals(new Watermark(3), watermarks.get(0));
+ assertEquals(StreamStatus.ACTIVE, testHarness.getStreamStatus());
+ output.clear();
+
+ testHarness.setProcessingTime(1001);
+ assertEquals(StreamStatus.IDLE, testHarness.getStreamStatus());
+
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(4L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(5L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(6L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(7L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(8L)));
+
+ assertEquals(StreamStatus.ACTIVE, testHarness.getStreamStatus());
+ testHarness.setProcessingTime(1060);
+ output = testHarness.getOutput();
+ watermarks = extractWatermarks(output);
+ assertEquals(1, watermarks.size());
+ assertEquals(new Watermark(7), watermarks.get(0));
+ }
+
+ @Test
+ public void testWatermarkAssignerOperator() throws Exception {
+ final WatermarkAssignerOperator operator = new WatermarkAssignerOperator(0, 1, -1);
+
+ OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness =
+ new OneInputStreamOperatorTestHarness<>(operator);
+
+ testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
+
+ long currentTime = 0;
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(1L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(2L)));
+ testHarness.processWatermark(new Watermark(2)); // this watermark should be ignored
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(3L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(4L)));
+
+ // validate first part of the sequence. we poll elements until our
+ // watermark updates to "3", which must be the result of the "4" element.
+ {
+ ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+ long nextElementValue = 1L;
+ long lastWatermark = -1L;
+
+ while (lastWatermark < 3) {
+ if (output.size() > 0) {
+ Object next = output.poll();
+ assertNotNull(next);
+ Tuple2<Long, Long> update = validateElement(next, nextElementValue, lastWatermark);
+ nextElementValue = update.f0;
+ lastWatermark = update.f1;
+
+ // check the invariant
+ assertTrue(lastWatermark < nextElementValue);
+ } else {
+ currentTime = currentTime + 10;
+ testHarness.setProcessingTime(currentTime);
+ }
+ }
+
+ output.clear();
+ }
+
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(4L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(5L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(6L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(7L)));
+ testHarness.processElement(new StreamRecord<>(GenericRow.of(8L)));
+
+ // validate the next part of the sequence. we poll elements until our
+ // watermark updates to "7", which must be the result of the "8" element.
+ {
+ ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+ long nextElementValue = 4L;
+ long lastWatermark = 2L;
+
+ while (lastWatermark < 7) {
+ if (output.size() > 0) {
+ Object next = output.poll();
+ assertNotNull(next);
+ Tuple2<Long, Long> update = validateElement(next, nextElementValue, lastWatermark);
+ nextElementValue = update.f0;
+ lastWatermark = update.f1;
+
+ // check the invariant
+ assertTrue(lastWatermark < nextElementValue);
+ } else {
+ currentTime = currentTime + 10;
+ testHarness.setProcessingTime(currentTime);
+ }
+ }
+
+ output.clear();
+ }
+
+ testHarness.processWatermark(new Watermark(Long.MAX_VALUE));
+ assertEquals(Long.MAX_VALUE, ((Watermark) testHarness.getOutput().poll()).getTimestamp());
+ }
+
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/WatermarkAssignerOperatorTestBase.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/WatermarkAssignerOperatorTestBase.java
new file mode 100644
index 0000000..f29b4af
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/WatermarkAssignerOperatorTestBase.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.watermarkassigner;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.dataformat.BaseRow;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base class for watermark assigner operator test.
+ */
+public class WatermarkAssignerOperatorTestBase {
+
+ protected Tuple2<Long, Long> validateElement(Object element, long nextElementValue, long currentWatermark) {
+ if (element instanceof StreamRecord) {
+ @SuppressWarnings("unchecked")
+ StreamRecord<BaseRow> record = (StreamRecord<BaseRow>) element;
+ assertEquals(nextElementValue, record.getValue().getLong(0));
+ return new Tuple2<>(nextElementValue + 1, currentWatermark);
+ }
+ else if (element instanceof Watermark) {
+ long wt = ((Watermark) element).getTimestamp();
+ assertTrue(wt > currentWatermark);
+ return new Tuple2<>(nextElementValue, wt);
+ }
+ else {
+ throw new IllegalArgumentException("unrecognized element: " + element);
+ }
+ }
+
+ protected List<Watermark> extractWatermarks(Collection<Object> collection) {
+ List<Watermark> watermarks = new ArrayList<>();
+ for (Object obj : collection) {
+ if (obj instanceof Watermark) {
+ watermarks.add((Watermark) obj);
+ }
+ }
+ return watermarks;
+ }
+}