Posted to commits@flink.apache.org by ja...@apache.org on 2020/08/05 04:31:50 UTC
[flink] branch master updated: [FLINK-18769][table-planner-blink] Fix MiniBatch doesn't work with FLIP-95 source (#13038)
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 94b2388 [FLINK-18769][table-planner-blink] Fix MiniBatch doesn't work with FLIP-95 source (#13038)
94b2388 is described below
commit 94b23885ca34927e37334fce51b930933cfd79dd
Author: Jark Wu <ja...@apache.org>
AuthorDate: Wed Aug 5 12:30:38 2020 +0800
[FLINK-18769][table-planner-blink] Fix MiniBatch doesn't work with FLIP-95 source (#13038)
This closes #13038
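
Background: before this change, MiniBatchIntervalInferRule only treated StreamExecDataStreamScan and StreamExecLegacyTableSourceScan as source leaf nodes, so no MiniBatchAssigner was inserted above a FLIP-95 TableSourceScan and MiniBatch silently stayed off for such sources. The new Kafka IT case enables MiniBatch explicitly; a minimal sketch of those settings (option keys exactly as used in the diff below, values illustrative, tEnv assumed to be a StreamTableEnvironment):

    // Sketch: enable MiniBatch for a table program
    Configuration conf = tEnv.getConfig().getConfiguration();
    conf.setString("table.exec.mini-batch.enabled", "true");           // buffer input records
    conf.setString("table.exec.mini-batch.allow-latency", "1s");       // flush at least every second
    conf.setString("table.exec.mini-batch.size", "5000");              // or after 5000 buffered rows
    conf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // split into local/global aggregates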
---
.../kafka/table/KafkaChangelogTableITCase.java | 63 ++--
.../stream/MiniBatchIntervalInferRule.scala | 5 +-
.../factories/TestValuesRuntimeFunctions.java | 48 +--
.../plan/stream/sql/MiniBatchIntervalInferTest.xml | 374 +++++++++++----------
.../stream/sql/MiniBatchIntervalInferTest.scala | 117 +++++--
5 files changed, 352 insertions(+), 255 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
index 7b9b069..0ac07ae 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
@@ -28,9 +29,9 @@ import org.apache.flink.streaming.connectors.kafka.KafkaTestBaseWithFlink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.apache.flink.table.planner.runtime.utils.TableEnvUtil;
import org.junit.Before;
import org.junit.Test;
@@ -40,12 +41,13 @@ import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Properties;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestBase.isCausedByJobFinished;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
/**
* IT cases for Kafka with changelog format for Table API & SQL.
@@ -57,6 +59,7 @@ public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink {
@Before
public void setup() {
+ TestValuesTableFactory.clearAllData();
env = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(
env,
@@ -77,6 +80,13 @@ public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink {
final String topic = "changelog_topic";
createTestTopic(topic, 1, 1);
+ // enables MiniBatch processing to verify MiniBatch + FLIP-95, see FLINK-18769
+ Configuration tableConf = tEnv.getConfig().getConfiguration();
+ tableConf.setString("table.exec.mini-batch.enabled", "true");
+ tableConf.setString("table.exec.mini-batch.allow-latency", "1s");
+ tableConf.setString("table.exec.mini-batch.size", "5000");
+ tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
+
// ---------- Write the Debezium json into Kafka -------------------
List<String> lines = readLines("debezium-data-schema-exclude.txt");
DataStreamSource<String> stream = env.fromCollection(lines);
@@ -116,24 +126,12 @@ public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink {
" PRIMARY KEY (name) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'values'," +
- " 'sink-insert-only' = 'false'," +
- " 'sink-expected-messages-num' = '20'" +
+ " 'sink-insert-only' = 'false'" +
")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
-
- try {
- TableEnvUtil.execInsertSqlAndWaitResult(
- tEnv,
- "INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name");
- } catch (Throwable t) {
- // we have to use a specific exception to indicate the job is finished,
- // because the registered Kafka source is infinite.
- if (!isCausedByJobFinished(t)) {
- // re-throw
- throw t;
- }
- }
+ TableResult tableResult = tEnv.executeSql(
+ "INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name");
// Debezium captures change data on the `products` table:
//
@@ -179,15 +177,15 @@ public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink {
// | 110 | jacket | new water resistent white wind breaker | 0.5 |
// +-----+--------------------+---------------------------------------------------------+--------+
- String[] expected = new String[]{
+ List<String> expected = Arrays.asList(
"scooter,3.140", "car battery,8.100", "12-pack drill bits,0.800",
- "hammer,2.625", "rocks,5.100", "jacket,0.600", "spare tire,22.200"};
+ "hammer,2.625", "rocks,5.100", "jacket,0.600", "spare tire,22.200");
- List<String> actual = TestValuesTableFactory.getResults("sink");
- assertThat(actual, containsInAnyOrder(expected));
+ waitingExpectedResults("sink", expected, Duration.ofSeconds(10));
// ------------- cleanup -------------------
+ tableResult.getJobClient().get().cancel().get(); // stop the job
deleteTestTopic(topic);
}
@@ -201,4 +199,23 @@ public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink {
Path path = new File(url.getFile()).toPath();
return Files.readAllLines(path);
}
+
+ private static void waitingExpectedResults(String sinkName, List<String> expected, Duration timeout) throws InterruptedException {
+ long now = System.currentTimeMillis();
+ long stop = now + timeout.toMillis();
+ Collections.sort(expected);
+ while (System.currentTimeMillis() < stop) {
+ List<String> actual = TestValuesTableFactory.getResults(sinkName);
+ Collections.sort(actual);
+ if (expected.equals(actual)) {
+ return;
+ }
+ Thread.sleep(100);
+ }
+
+ // timeout, assert again
+ List<String> actual = TestValuesTableFactory.getResults(sinkName);
+ Collections.sort(actual);
+ assertEquals(expected, actual);
+ }
}
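
The job lifecycle in this test changed as well: instead of the removed TableEnvUtil.execInsertSqlAndWaitResult helper, which could only terminate the infinite Kafka pipeline through a job-finished exception, the test now submits the INSERT with executeSql, polls the sink until the expected rows arrive, and cancels the job through the returned TableResult. Condensed from the hunks above (tEnv, expected, and waitingExpectedResults as defined in this file):

    // Sketch: submit, wait for the sink to converge, then cancel the streaming job
    TableResult tableResult = tEnv.executeSql(
        "INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name");
    waitingExpectedResults("sink", expected, Duration.ofSeconds(10)); // poll with timeout
    tableResult.getJobClient().get().cancel().get();                  // stop the infinite job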
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala
index 3971744..714b249 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.stream
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.planner.plan.`trait`.{MiniBatchInterval, MiniBatchIntervalTrait, MiniBatchIntervalTraitDef, MiniBatchMode}
-import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecDataStreamScan, StreamExecGroupWindowAggregate, StreamExecMiniBatchAssigner, StreamExecLegacyTableSourceScan, StreamExecWatermarkAssigner, StreamPhysicalRel}
+import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecDataStreamScan, StreamExecGroupWindowAggregate, StreamExecLegacyTableSourceScan, StreamExecMiniBatchAssigner, StreamExecTableSourceScan, StreamExecWatermarkAssigner, StreamPhysicalRel}
import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
import org.apache.calcite.plan.RelOptRule._
import org.apache.calcite.plan.hep.HepRelVertex
@@ -122,7 +122,8 @@ class MiniBatchIntervalInferRule extends RelOptRule(
.getMiniBatchInterval
.mode
node match {
- case _: StreamExecDataStreamScan | _: StreamExecLegacyTableSourceScan =>
+ case _: StreamExecDataStreamScan | _: StreamExecLegacyTableSourceScan |
+ _: StreamExecTableSourceScan =>
// append minibatch node if the mode is not NONE and reach a source leaf node
mode == MiniBatchMode.RowTime || mode == MiniBatchMode.ProcTime
case _: StreamExecWatermarkAssigner =>
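
This one-line pattern-match extension is the core of the fix: the rule now also recognizes the FLIP-95 StreamExecTableSourceScan as a source leaf node, so a MiniBatchAssigner is appended above it whenever the inferred mode is RowTime or ProcTime, exactly as was already done for StreamExecDataStreamScan and StreamExecLegacyTableSourceScan.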
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
index b2cc256..e4ede00 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
@@ -65,40 +65,42 @@ final class TestValuesRuntimeFunctions {
static List<String> getRawResults(String tableName) {
List<String> result = new ArrayList<>();
- if (globalRawResult.containsKey(tableName)) {
- globalRawResult.get(tableName)
- .values()
- .forEach(result::addAll);
- } else {
- throw new IllegalArgumentException("Can't find result for the table '" + tableName + "'.");
+ synchronized (TestValuesTableFactory.class) {
+ if (globalRawResult.containsKey(tableName)) {
+ globalRawResult.get(tableName)
+ .values()
+ .forEach(result::addAll);
+ }
}
return result;
}
static List<String> getResults(String tableName) {
List<String> result = new ArrayList<>();
- if (globalUpsertResult.containsKey(tableName)) {
- globalUpsertResult.get(tableName)
- .values()
- .forEach(map -> result.addAll(map.values()));
- } else if (globalRetractResult.containsKey(tableName)) {
- globalRetractResult.get(tableName)
- .values()
- .forEach(result::addAll);
- } else if (globalRawResult.containsKey(tableName)) {
- getRawResults(tableName).stream()
- .map(s -> s.substring(3, s.length() - 1)) // removes the +I(...) wrapper
- .forEach(result::add);
- } else {
- throw new IllegalArgumentException("Can't find result for the table '" + tableName + "'.");
+ synchronized (TestValuesTableFactory.class) {
+ if (globalUpsertResult.containsKey(tableName)) {
+ globalUpsertResult.get(tableName)
+ .values()
+ .forEach(map -> result.addAll(map.values()));
+ } else if (globalRetractResult.containsKey(tableName)) {
+ globalRetractResult.get(tableName)
+ .values()
+ .forEach(result::addAll);
+ } else if (globalRawResult.containsKey(tableName)) {
+ getRawResults(tableName).stream()
+ .map(s -> s.substring(3, s.length() - 1)) // removes the +I(...) wrapper
+ .forEach(result::add);
+ }
}
return result;
}
static void clearResults() {
- globalRawResult.clear();
- globalUpsertResult.clear();
- globalRetractResult.clear();
+ synchronized (TestValuesTableFactory.class) {
+ globalRawResult.clear();
+ globalUpsertResult.clear();
+ globalRetractResult.clear();
+ }
}
// ------------------------------------------------------------------------------------------
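
The accessors above are now synchronized because the new polling helper in KafkaChangelogTableITCase reads these static result maps from the test thread while sink tasks append to them concurrently; that is only safe if writers take the same monitor. A hypothetical writer-side sketch (the sink internals are outside this diff, so tableName, taskId, and row are illustrative):

    // Sketch: the sink side must append under the same lock the readers use
    synchronized (TestValuesTableFactory.class) {
        globalRawResult
            .computeIfAbsent(tableName, k -> new HashMap<>())
            .computeIfAbsent(taskId, k -> new ArrayList<>())
            .add(row);
    }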
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
index 124fad3..8456d01 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
@@ -16,6 +16,56 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
<Root>
+ <TestCase name="testIntervalJoinWithMiniBatch">
+ <Resource name="sql">
+ <![CDATA[
+ SELECT b, COUNT(a)
+ FROM (
+ SELECT t1.a as a, t1.b as b
+ FROM
+ wmTable1 as t1 JOIN wmTable2 as t2
+ ON
+ t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
+ t2.rowtime + INTERVAL '10' SECOND
+ )
+ GROUP BY b
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
++- LogicalProject(b=[$1], a=[$0])
+ +- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 5000:INTERVAL SECOND)), <=($4, +($9, 10000:INTERVAL SECOND)))], joinType=[inner])
+ :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+ +- LogicalTableScan(table=[[default_catalog, default_database, wmTable2]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count$0) AS EXPR$1])
++- Exchange(distribution=[hash[b]])
+ +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(a) AS count$0])
+ +- Calc(select=[b, a])
+ +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=1], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 10000:INTERVAL SECOND)))], select=[a, b, rowtime, a0, rowtime0])
+ :- Exchange(distribution=[hash[a]])
+ : +- Calc(select=[a, b, rowtime])
+ : +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
+ : +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+ : +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+ : +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, rowtime])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+ +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, wmTable2]], fields=[a, b, c, rowtime])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testMiniBatchOnDataStreamWithRowTime">
<Resource name="sql">
<![CDATA[
@@ -60,9 +110,30 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXP
GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
+- Exchange(distribution=[hash[b]])
+- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, project=[b, a, c]]], fields=[b, a, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMiniBatchOnlyForDataStream">
+ <Resource name="sql">
+ <![CDATA[SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyDataStream1 GROUP BY b]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXPR$3=[SUM($2)])
++- LogicalProject(b=[$1], a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyDataStream1]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
++- Exchange(distribution=[hash[b]])
+ +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
+- Calc(select=[b, a, c])
+- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
- +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyDataStream1]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
@@ -76,7 +147,7 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1
FROM (
SELECT t1.a as a, t1.b as b, t1.rowtime as rt
FROM
- LeftT as t1 JOIN RightT as t2
+ wmTable1 as t1 JOIN wmTable2 as t2
ON
t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
t2.rowtime + INTERVAL '10' SECOND
@@ -90,10 +161,12 @@ LogicalProject(b=[$0], EXPR$1=[$2], EXPR$2=[TUMBLE_START($1)], EXPR$3=[TUMBLE_EN
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[COUNT($2)])
+- LogicalProject(b=[$1], $f1=[$TUMBLE($4, 5000:INTERVAL SECOND)], a=[$0])
+- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 5000:INTERVAL SECOND)), <=($4, +($9, 10000:INTERVAL SECOND)))], joinType=[inner])
- :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
- : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
- +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+ :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+ +- LogicalTableScan(table=[[default_catalog, default_database, wmTable2]])
]]>
</Resource>
<Resource name="planAfter">
@@ -105,12 +178,14 @@ Calc(select=[b, EXPR$1, w$start AS EXPR$2, w$end AS EXPR$3])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=1], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 10000:INTERVAL SECOND)))], select=[a, b, rowtime, a0, rowtime0])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a, b, rowtime])
- : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
- : +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+ : +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+ : +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+ : +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, rowtime])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
- +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+ +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, wmTable2]], fields=[a, b, c, rowtime])
]]>
</Resource>
</TestCase>
@@ -126,7 +201,7 @@ Calc(select=[b, EXPR$1, w$start AS EXPR$2, w$end AS EXPR$3])
SELECT b,
COUNT(a) as a,
TUMBLE_ROWTIME(rowtime, INTERVAL '5' SECOND) as rt
- FROM LeftT
+ FROM wmTable1
GROUP BY b, TUMBLE(rowtime, INTERVAL '5' SECOND)
) as t1
JOIN
@@ -134,7 +209,7 @@ Calc(select=[b, EXPR$1, w$start AS EXPR$2, w$end AS EXPR$3])
SELECT b,
COUNT(a) as a,
HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt
- FROM RightT
+ FROM wmTable2
GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
) as t2
ON
@@ -150,13 +225,15 @@ LogicalProject(b=[$0], EXPR$1=[COUNT($1) OVER (PARTITION BY $0 ORDER BY $2 NULLS
:- LogicalProject(b=[$0], a=[$2], rt=[TUMBLE_ROWTIME($1)])
: +- LogicalAggregate(group=[{0, 1}], a=[COUNT($2)])
: +- LogicalProject(b=[$1], $f1=[$TUMBLE($4, 5000:INTERVAL SECOND)], a=[$0])
- : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
- : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+ : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
+- LogicalProject(b=[$0], a=[$2], rt=[HOP_ROWTIME($1)])
+- LogicalAggregate(group=[{0, 1}], a=[COUNT($2)])
+- LogicalProject(b=[$1], $f1=[HOP($4, 5000:INTERVAL SECOND, 6000:INTERVAL SECOND)], a=[$0])
- +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+ +- LogicalTableScan(table=[[default_catalog, default_database, wmTable2]])
]]>
</Resource>
<Resource name="planAfter">
@@ -171,15 +248,17 @@ Calc(select=[b, w0$o0 AS $1])
: +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
: +- Exchange(distribution=[hash[b]])
: +- Calc(select=[b, rowtime, a])
- : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
- : +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+ : +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+ : +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+ : +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[b, a, w$rowtime AS rt])
+- GroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 6000, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b, rowtime, a])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
- +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+ +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, wmTable2]], fields=[a, b, c, rowtime])
]]>
</Resource>
</TestCase>
@@ -195,7 +274,7 @@ Calc(select=[b, w0$o0 AS $1])
FROM (
SELECT b, COUNT(a) as a,
HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt
- FROM RightT
+ FROM wmTable1
GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
)
GROUP BY b
@@ -215,8 +294,9 @@ LogicalProject(a=[$0], b=[$1])
+- LogicalProject(b=[$0], a=[$2])
+- LogicalAggregate(group=[{0, 1}], a=[COUNT($2)])
+- LogicalProject(b=[$1], $f1=[HOP($4, 5000:INTERVAL SECOND, 6000:INTERVAL SECOND)], a=[$0])
- +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+ +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
]]>
</Resource>
<Resource name="planAfter">
@@ -228,9 +308,8 @@ Calc(select=[a, b])
: +- GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(count$0) AS b])
: +- Exchange(distribution=[hash[a]])
: +- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(b) AS count$0])
- : +- Calc(select=[a, b])
- : +- MiniBatchAssigner(interval=[6000ms], mode=[ProcTime])
- : +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+ : +- MiniBatchAssigner(interval=[6000ms], mode=[ProcTime])
+ : +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, project=[a, b]]], fields=[a, b])
+- Exchange(distribution=[hash[a]])
+- GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count1$0) AS a])
+- Exchange(distribution=[hash[b]])
@@ -238,8 +317,9 @@ Calc(select=[a, b])
+- GroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 6000, 5000)], select=[b, COUNT(a) AS a])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b, rowtime, a])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
- +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+ +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
]]>
</Resource>
</TestCase>
@@ -254,9 +334,9 @@ LogicalLegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fie
+- LogicalProject($f0=[$TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
+- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
+- LogicalJoin(condition=[true], joinType=[inner])
- :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)])
+ :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
: +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
- +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]])
LogicalLegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b])
@@ -265,9 +345,9 @@ LogicalLegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fie
+- LogicalProject($f0=[$TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'-'])
+- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
+- LogicalJoin(condition=[true], joinType=[inner])
- :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)])
+ :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
: +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
- +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]])
LogicalLegacySink(name=[`default_catalog`.`default_database`.`appendSink3`], fields=[a, b])
@@ -277,19 +357,19 @@ LogicalLegacySink(name=[`default_catalog`.`default_database`.`appendSink3`], fie
+- LogicalProject($f0=[$TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
+- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
+- LogicalJoin(condition=[true], joinType=[inner])
- :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)])
+ :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
: +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
- +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]])
== Optimized Logical Plan ==
IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, rightTimeIndex=1], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0), 300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods], reuse_id=[1])
:- Exchange(distribution=[hash[id1]])
-: +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-: +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[id1, rowtime, text])
+: +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+: +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[id1, rowtime, text])
+- Exchange(distribution=[hash[id2]])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
- +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods])
Exchange(distribution=[hash[id1]], reuse_id=[2])
+- Calc(select=[rowtime, id1, text, _UTF-16LE'#' AS $f3])
@@ -317,45 +397,49 @@ LegacySink(name=[`default_catalog`.`default_database`.`appendSink3`], fields=[a,
== Physical Execution Plan ==
: Data Source
- content : Source: Collection Source
+ content : Source: TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[id1, rowtime, text])
+
+ : Operator
+ content : WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+ ship_strategy : FORWARD
: Data Source
- content : Source: Collection Source
+ content : Source: TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods])
: Operator
- content : SourceConversion(table=[default_catalog.default_database.T1], fields=[id1, rowtime, text])
+ content : WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
ship_strategy : FORWARD
: Operator
- content : WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 0:INTERVAL MILLISECOND)])
- ship_strategy : FORWARD
+ content : IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, rightTimeIndex=1], where=[((id1 = id2) AND (CAST(rowtime) > (CAST(rowtime0) - 300000:INTERVAL MINUTE)) AND (CAST(rowtime) < (CAST(rowtime0) + 180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods])
+ ship_strategy : HASH
: Operator
- content : SourceConversion(table=[default_catalog.default_database.T2], fields=[id2, rowtime, cnt, name, goods])
+ content : Calc(select=[rowtime, id1, text, _UTF-16LE'#' AS $f3])
ship_strategy : FORWARD
: Operator
- content : WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 0:INTERVAL MILLISECOND)])
- ship_strategy : FORWARD
+ content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 6000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+ ship_strategy : HASH
: Operator
- content : IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, rightTimeIndex=1], where=[((id1 = id2) AND (CAST(rowtime) > (CAST(rowtime0) - 300000:INTERVAL MINUTE)) AND (CAST(rowtime) < (CAST(rowtime0) + 180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods])
- ship_strategy : HASH
+ content : Calc(select=[w$rowtime AS $f0, id1, text, _UTF-16LE'*' AS $f3])
+ ship_strategy : FORWARD
: Operator
- content : Calc(select=[rowtime, id1, text, _UTF-16LE'#' AS $f3])
- ship_strategy : FORWARD
+ content : GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, $f0, 4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+ ship_strategy : HASH
: Operator
- content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 6000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
- ship_strategy : HASH
+ content : SinkConversionToRow
+ ship_strategy : FORWARD
: Operator
- content : Calc(select=[w$rowtime AS $f0, id1, text, _UTF-16LE'*' AS $f3])
+ content : Calc(select=[rowtime, id1, text, _UTF-16LE'-' AS $f3])
ship_strategy : FORWARD
: Operator
- content : GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, $f0, 4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+ content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 9000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
ship_strategy : HASH
: Operator
@@ -363,49 +447,37 @@ LegacySink(name=[`default_catalog`.`default_database`.`appendSink3`], fields=[a,
ship_strategy : FORWARD
: Operator
- content : Calc(select=[rowtime, id1, text, _UTF-16LE'-' AS $f3])
- ship_strategy : FORWARD
+ content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 6000)], select=[id1, LISTAGG(text, $f3) AS text])
+ ship_strategy : HASH
: Operator
- content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 9000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
- ship_strategy : HASH
+ content : LocalGroupAggregate(groupBy=[id1], select=[id1, COUNT(text) AS count$0])
+ ship_strategy : FORWARD
: Operator
- content : SinkConversionToRow
- ship_strategy : FORWARD
+ content : GlobalGroupAggregate(groupBy=[id1], select=[id1, COUNT(count$0) AS EXPR$1])
+ ship_strategy : HASH
: Operator
- content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 6000)], select=[id1, LISTAGG(text, $f3) AS text])
- ship_strategy : HASH
+ content : SinkConversionToTuple2
+ ship_strategy : FORWARD
: Operator
- content : LocalGroupAggregate(groupBy=[id1], select=[id1, COUNT(text) AS count$0])
+ content : Map
ship_strategy : FORWARD
- : Operator
- content : GlobalGroupAggregate(groupBy=[id1], select=[id1, COUNT(count$0) AS EXPR$1])
- ship_strategy : HASH
+ : Data Sink
+ content : Sink: TestingAppendTableSink
+ ship_strategy : FORWARD
- : Operator
- content : SinkConversionToTuple2
+ : Data Sink
+ content : Sink: TestingAppendTableSink
ship_strategy : FORWARD
- : Operator
- content : Map
+ : Data Sink
+ content : Sink: TestingRetractTableSink
ship_strategy : FORWARD
- : Data Sink
- content : Sink: TestingAppendTableSink
- ship_strategy : FORWARD
-
- : Data Sink
- content : Sink: TestingAppendTableSink
- ship_strategy : FORWARD
-
- : Data Sink
- content : Sink: TestingRetractTableSink
- ship_strategy : FORWARD
-
]]>
</Resource>
</TestCase>
@@ -442,30 +514,6 @@ GlobalGroupAggregate(groupBy=[cnt], select=[cnt, COUNT(count$0) AS EXPR$1])
]]>
</Resource>
</TestCase>
- <TestCase name="testRedundantWatermarkDefinition">
- <Resource name="sql">
- <![CDATA[SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable3 GROUP BY b]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXPR$3=[SUM($2)])
-+- LogicalProject(b=[$1], a=[$0], c=[$2])
- +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
-+- Exchange(distribution=[hash[b]])
- +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
- +- Calc(select=[b, a, c])
- +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
- +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
-]]>
- </Resource>
- </TestCase>
<TestCase name="testRowtimeRowsOverWithMiniBatch">
<Resource name="sql">
<![CDATA[
@@ -473,7 +521,7 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1
FROM (
SELECT c, COUNT(a)
OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND CURRENT ROW) as cnt
- FROM MyTable3
+ FROM wmTable1
)
GROUP BY cnt
]]>
@@ -482,8 +530,9 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1
<![CDATA[
LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
+- LogicalProject(cnt=[COUNT($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS BETWEEN 5 PRECEDING AND CURRENT ROW)], c=[$2])
- +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+ +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
]]>
</Resource>
<Resource name="planAfter">
@@ -496,8 +545,35 @@ GlobalGroupAggregate(groupBy=[cnt], select=[cnt, COUNT(count$0) AS EXPR$1])
+- Exchange(distribution=[hash[c]])
+- Calc(select=[a, c, rowtime])
+- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
- +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+ +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRedundantWatermarkDefinition">
+ <Resource name="sql">
+ <![CDATA[SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM wmTable1 GROUP BY b]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXPR$3=[SUM($2)])
++- LogicalProject(b=[$1], a=[$0], c=[$2])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+ +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
++- Exchange(distribution=[hash[b]])
+ +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
+ +- Calc(select=[b, a, c])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+ +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
]]>
</Resource>
</TestCase>
@@ -521,7 +597,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
+- LogicalFilter(condition=[=($1, $6)])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{4}])
:- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
- : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyDataStream1]])
+- LogicalTableFunctionScan(invocation=[Rates($cor0.rowtime)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(PROCTIME) proctime, TIME ATTRIBUTE(ROWTIME) rowtime)], elementType=[class [Ljava.lang.Object;])
]]>
</Resource>
@@ -536,58 +612,12 @@ GlobalGroupAggregate(groupBy=[r_a], select=[r_a, COUNT(count$0) AS EXPR$1])
: +- Calc(select=[a, b, rowtime])
: +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
: +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
- : +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+ : +- DataStreamScan(table=[[default_catalog, default_database, MyDataStream1]], fields=[a, b, c, proctime, rowtime])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[a, b, rowtime])
+- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
- +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testIntervalJoinWithMiniBatch">
- <Resource name="sql">
- <![CDATA[
- SELECT b, COUNT(a)
- FROM (
- SELECT t1.a as a, t1.b as b
- FROM
- LeftT as t1 JOIN RightT as t2
- ON
- t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
- t2.rowtime + INTERVAL '10' SECOND
- )
- GROUP BY b
- ]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
-+- LogicalProject(b=[$1], a=[$0])
- +- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 5000:INTERVAL SECOND)), <=($4, +($9, 10000:INTERVAL SECOND)))], joinType=[inner])
- :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
- : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
- +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count$0) AS EXPR$1])
-+- Exchange(distribution=[hash[b]])
- +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(a) AS count$0])
- +- Calc(select=[b, a])
- +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=1], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 10000:INTERVAL SECOND)))], select=[a, b, rowtime, a0, rowtime0])
- :- Exchange(distribution=[hash[a]])
- : +- Calc(select=[a, b, rowtime])
- : +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
- : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
- : +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
- +- Exchange(distribution=[hash[a]])
- +- Calc(select=[a, rowtime])
- +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
- +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyDataStream2]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
@@ -600,7 +630,7 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count$0) AS EXPR$1])
SELECT b,
COUNT(a) as cnt,
TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) as rt
- FROM MyTable3
+ FROM wmTable1
GROUP BY b, TUMBLE(rowtime, INTERVAL '10' SECOND)
)
GROUP BY b, TUMBLE(rt, INTERVAL '5' SECOND)
@@ -613,8 +643,9 @@ LogicalProject(b=[$0], EXPR$1=[$2])
+- LogicalProject(b=[$0], $f1=[$TUMBLE(TUMBLE_ROWTIME($1), 5000:INTERVAL SECOND)], cnt=[$2])
+- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)])
+- LogicalProject(b=[$1], $f1=[$TUMBLE($4, 10000:INTERVAL SECOND)], a=[$0])
- +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+ +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
]]>
</Resource>
<Resource name="planAfter">
@@ -625,8 +656,9 @@ GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, $f1, 5000)],
+- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b, rowtime, a])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
- +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+ +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
]]>
</Resource>
</TestCase>
@@ -639,7 +671,7 @@ GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, $f1, 5000)],
COUNT(a) as cnt,
HOP_START(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_start,
HOP_END(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_end
- FROM MyTable3
+ FROM wmTable1
GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
)
GROUP BY b
@@ -651,8 +683,9 @@ LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
+- LogicalProject(b=[$0], cnt=[$2])
+- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)])
+- LogicalProject(b=[$1], $f1=[HOP($4, 5000:INTERVAL SECOND, 6000:INTERVAL SECOND)], a=[$0])
- +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+ +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
]]>
</Resource>
<Resource name="planAfter">
@@ -663,8 +696,9 @@ GlobalGroupAggregate(groupBy=[b], select=[b, $SUM0_RETRACT(sum$0) AS EXPR$1])
+- GroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 6000, 5000)], select=[b, COUNT(a) AS cnt], emit=[early delay 500 millisecond])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b, rowtime, a])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
- +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+ +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
]]>
</Resource>
</TestCase>
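
The regenerated plans above all change in the same way: a DataStreamScan with watermark -(rowtime, 0:INTERVAL MILLISECOND) becomes a TableSourceScan of a DDL-defined source with WATERMARK FOR rowtime AS rowtime, and the MiniBatchAssigner now lands directly above that scan. A sketch for reproducing such a plan by hand (assumptions: Flink 1.11-era APIs, and the 'values' connector, which ships only with the planner's test jar):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class MiniBatchPlanSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
            Configuration conf = tEnv.getConfig().getConfiguration();
            conf.setString("table.exec.mini-batch.enabled", "true");
            conf.setString("table.exec.mini-batch.allow-latency", "1 s");
            conf.setString("table.exec.mini-batch.size", "5000");
            // same shape as the wmTable1 DDL registered in the Scala test below
            tEnv.executeSql(
                "CREATE TABLE wmTable1 (" +
                "  a INT, b STRING, c BIGINT," +
                "  rowtime TIMESTAMP(3)," +
                "  WATERMARK FOR rowtime AS rowtime" +
                ") WITH ('connector' = 'values')");
            // the optimized plan should show MiniBatchAssigner above TableSourceScan
            System.out.println(tEnv.explainSql(
                "SELECT b, COUNT(DISTINCT a) FROM wmTable1 GROUP BY b"));
        }
    }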
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
index 26fabc1..fe3d08a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
@@ -39,9 +39,37 @@ class MiniBatchIntervalInferTest extends TableTestBase {
@Before
def setup(): Unit = {
util.addDataStream[(Int, String, Long)](
- "MyTable1", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
+ "MyDataStream1", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
util.addDataStream[(Int, String, Long)](
- "MyTable2", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
+ "MyDataStream2", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+ // register tables using DDL
+ util.addTable(
+ s"""
+ |CREATE TABLE MyTable1 (
+ | `a` INT,
+ | `b` STRING,
+ | `c` BIGINT,
+ | proctime AS PROCTIME(),
+ | rowtime TIMESTAMP(3)
+ |) WITH (
+ | 'connector' = 'values'
+ |)
+ |""".stripMargin)
+ util.addTable(
+ s"""
+ |CREATE TABLE wmTable1 (
+ | WATERMARK FOR rowtime AS rowtime
+ |) LIKE MyTable1 (INCLUDING ALL)
+ |""".stripMargin)
+ util.addTable(
+ s"""
+ |CREATE TABLE wmTable2 (
+ | WATERMARK FOR rowtime AS rowtime
+ |) LIKE MyTable1 (INCLUDING ALL)
+ |""".stripMargin)
+
+ // enable mini-batch
util.tableEnv.getConfig.getConfiguration.setBoolean(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
}
@@ -49,17 +77,24 @@ class MiniBatchIntervalInferTest extends TableTestBase {
@Test
def testMiniBatchOnly(): Unit = {
util.tableEnv.getConfig.getConfiguration
- .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
+ .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable1 GROUP BY b"
util.verifyPlan(sql)
}
@Test
+ def testMiniBatchOnlyForDataStream(): Unit = {
+ util.tableEnv.getConfig.getConfiguration
+ .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
+ val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyDataStream1 GROUP BY b"
+ util.verifyPlan(sql)
+ }
+
+ @Test
def testRedundantWatermarkDefinition(): Unit = {
util.tableEnv.getConfig.getConfiguration
.setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
- util.addTableWithWatermark("MyTable3", util.tableEnv.from("MyTable1"), "rowtime", 0)
- val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable3 GROUP BY b"
+ val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM wmTable1 GROUP BY b"
util.verifyPlan(sql)
}
@@ -69,7 +104,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
tableConfig.getConfiguration
.setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
withEarlyFireDelay(tableConfig, Time.milliseconds(500))
- util.addTableWithWatermark("MyTable3", util.tableEnv.from("MyTable1"), "rowtime", 0)
val sql =
"""
| SELECT b, SUM(cnt)
@@ -78,7 +112,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
| COUNT(a) as cnt,
| HOP_START(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_start,
| HOP_END(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_end
- | FROM MyTable3
+ | FROM wmTable1
| GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
| )
| GROUP BY b
@@ -90,7 +124,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
def testWindowCascade(): Unit = {
util.tableEnv.getConfig.getConfiguration
.setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "3 s")
- util.addTableWithWatermark("MyTable3", util.tableEnv.from("MyTable1"), "rowtime", 0)
val sql =
"""
| SELECT b,
@@ -99,7 +132,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
| SELECT b,
| COUNT(a) as cnt,
| TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) as rt
- | FROM MyTable3
+ | FROM wmTable1
| GROUP BY b, TUMBLE(rowtime, INTERVAL '10' SECOND)
| )
| GROUP BY b, TUMBLE(rt, INTERVAL '5' SECOND)
@@ -109,8 +142,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
@Test
def testIntervalJoinWithMiniBatch(): Unit = {
- util.addTableWithWatermark("LeftT", util.tableEnv.from("MyTable1"), "rowtime", 0)
- util.addTableWithWatermark("RightT", util.tableEnv.from("MyTable2"), "rowtime", 0)
util.tableEnv.getConfig.getConfiguration.setString(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
@@ -120,7 +151,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
| FROM (
| SELECT t1.a as a, t1.b as b
| FROM
- | LeftT as t1 JOIN RightT as t2
+ | wmTable1 as t1 JOIN wmTable2 as t2
| ON
| t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
| t2.rowtime + INTERVAL '10' SECOND
@@ -132,7 +163,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
@Test
def testRowtimeRowsOverWithMiniBatch(): Unit = {
- util.addTableWithWatermark("MyTable3", util.tableEnv.from("MyTable1"), "rowtime", 0)
util.tableEnv.getConfig.getConfiguration.setString(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
@@ -142,7 +172,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
| FROM (
| SELECT c, COUNT(a)
| OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND CURRENT ROW) as cnt
- | FROM MyTable3
+ | FROM wmTable1
| )
| GROUP BY cnt
""".stripMargin
@@ -152,8 +182,8 @@ class MiniBatchIntervalInferTest extends TableTestBase {
@Test
def testTemporalTableFunctionJoinWithMiniBatch(): Unit = {
- util.addTableWithWatermark("Orders", util.tableEnv.from("MyTable1"), "rowtime", 0)
- util.addTableWithWatermark("RatesHistory", util.tableEnv.from("MyTable2"), "rowtime", 0)
+ util.addTableWithWatermark("Orders", util.tableEnv.from("MyDataStream1"), "rowtime", 0)
+ util.addTableWithWatermark("RatesHistory", util.tableEnv.from("MyDataStream2"), "rowtime", 0)
util.tableEnv.getConfig.getConfiguration.setString(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
@@ -180,8 +210,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
@Test
def testMultiOperatorNeedsWatermark1(): Unit = {
// infer result: miniBatchInterval=[Rowtime, 0ms]
- util.addTableWithWatermark("LeftT", util.tableEnv.from("MyTable1"), "rowtime", 0)
- util.addTableWithWatermark("RightT", util.tableEnv.from("MyTable2"), "rowtime", 0)
util.tableEnv.getConfig.getConfiguration.setString(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
@@ -194,7 +222,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
| FROM (
| SELECT t1.a as a, t1.b as b, t1.rowtime as rt
| FROM
- | LeftT as t1 JOIN RightT as t2
+ | wmTable1 as t1 JOIN wmTable2 as t2
| ON
| t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
| t2.rowtime + INTERVAL '10' SECOND
@@ -206,8 +234,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
@Test
def testMultiOperatorNeedsWatermark2(): Unit = {
- util.addTableWithWatermark("LeftT", util.tableEnv.from("MyTable1"), "rowtime", 0)
- util.addTableWithWatermark("RightT", util.tableEnv.from("MyTable2"), "rowtime", 0)
util.tableEnv.getConfig.getConfiguration.setString(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "6 s")
@@ -222,7 +248,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
| SELECT b,
| COUNT(a) as a,
| TUMBLE_ROWTIME(rowtime, INTERVAL '5' SECOND) as rt
- | FROM LeftT
+ | FROM wmTable1
| GROUP BY b, TUMBLE(rowtime, INTERVAL '5' SECOND)
| ) as t1
| JOIN
@@ -230,7 +256,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
| SELECT b,
| COUNT(a) as a,
| HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt
- | FROM RightT
+ | FROM wmTable2
| GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
| ) as t2
| ON
@@ -243,7 +269,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
@Test
def testMultiOperatorNeedsWatermark3(): Unit = {
- util.addTableWithWatermark("RightT", util.tableEnv.from("MyTable2"), "rowtime", 0)
util.tableEnv.getConfig.getConfiguration.setString(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "6 s")
@@ -258,7 +283,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
| FROM (
| SELECT b, COUNT(a) as a,
| HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt
- | FROM RightT
+ | FROM wmTable1
| GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
| )
| GROUP BY b
@@ -274,12 +299,30 @@ class MiniBatchIntervalInferTest extends TableTestBase {
@Test
def testMultipleWindowAggregates(): Unit = {
val stmtSet = util.tableEnv.createStatementSet()
- util.addDataStream[(Int, Long, String)]("T1", 'id1, 'rowtime.rowtime, 'text)
- util.addDataStream[(Int, Long, Int, String, String)](
- "T2",
- 'id2, 'rowtime.rowtime, 'cnt, 'name, 'goods)
- util.addTableWithWatermark("T3", util.tableEnv.from("T1"), "rowtime", 0)
- util.addTableWithWatermark("T4", util.tableEnv.from("T2"), "rowtime", 0)
+ util.addTable(
+ s"""
+ |CREATE TABLE T1 (
+ | id1 INT,
+ | rowtime TIMESTAMP(3),
+ | `text` STRING,
+ | WATERMARK FOR rowtime AS rowtime
+ |) WITH (
+ | 'connector' = 'values'
+ |)
+ |""".stripMargin)
+ util.addTable(
+ s"""
+ |CREATE TABLE T2 (
+ | id2 INT,
+ | rowtime TIMESTAMP(3),
+ | cnt INT,
+ | name STRING,
+ | goods STRING,
+ | WATERMARK FOR rowtime AS rowtime
+ |) WITH (
+ | 'connector' = 'values'
+ |)
+ |""".stripMargin)
util.tableEnv.getConfig.getConfiguration.setString(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "500 ms")
@@ -288,13 +331,13 @@ class MiniBatchIntervalInferTest extends TableTestBase {
val table1 = util.tableEnv.sqlQuery(
"""
- |SELECT id1, T3.rowtime AS ts, text
- | FROM T3, T4
+ |SELECT id1, T1.rowtime AS ts, text
+ | FROM T1, T2
|WHERE id1 = id2
- | AND T3.rowtime > T4.rowtime - INTERVAL '5' MINUTE
- | AND T3.rowtime < T4.rowtime + INTERVAL '3' MINUTE
+ | AND T1.rowtime > T2.rowtime - INTERVAL '5' MINUTE
+ | AND T1.rowtime < T2.rowtime + INTERVAL '3' MINUTE
""".stripMargin)
- util.tableEnv.registerTable("TempTable1", table1)
+ util.tableEnv.createTemporaryView("TempTable1", table1)
val table2 = util.tableEnv.sqlQuery(
"""
@@ -304,7 +347,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
|FROM TempTable1
|GROUP BY TUMBLE(ts, INTERVAL '6' SECOND), id1
""".stripMargin)
- util.tableEnv.registerTable("TempTable2", table2)
+ util.tableEnv.createTemporaryView("TempTable2", table2)
val table3 = util.tableEnv.sqlQuery(
"""