You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/29 13:29:36 UTC
[flink] branch release-1.9 updated: [FLINK-13447][table] Change
default planner to legacy planner instead of any one
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new c763cef [FLINK-13447][table] Change default planner to legacy planner instead of any one
c763cef is described below
commit c763cef2f0d244b305fc9e3c70be0d6c89d5eba5
Author: Jark Wu <im...@gmail.com>
AuthorDate: Sat Jul 27 23:08:18 2019 +0800
[FLINK-13447][table] Change default planner to legacy planner instead of any one
This closes #9249
---
.../flink/table/api/EnvironmentSettings.java | 26 +++++++++-------
.../flink/table/api/TableEnvironmentTest.scala | 2 +-
.../planner/match/PatternTranslatorTestBase.scala | 3 +-
.../validation/SetOperatorsValidationTest.scala | 11 ++++---
.../table/validation/TableSinkValidationTest.scala | 9 +++---
.../validation/UnsupportedOpsValidationTest.scala | 20 ++++++-------
.../planner/plan/utils/FlinkRelOptUtilTest.scala | 3 +-
.../runtime/stream/sql/MatchRecognizeITCase.scala | 35 +++++++++++-----------
.../runtime/stream/sql/TemporalJoinITCase.scala | 6 ++--
.../runtime/stream/table/TableSinkITCase.scala | 29 +++++++++---------
.../runtime/utils/StreamingWithStateTestBase.scala | 4 +--
.../flink/table/planner/utils/TableTestBase.scala | 5 ++++
12 files changed, 79 insertions(+), 74 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index 29c9227..8289bbc 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -149,29 +149,35 @@ public class EnvironmentSettings {
* A builder for {@link EnvironmentSettings}.
*/
public static class Builder {
- private String plannerClass = null;
- private String executorClass = null;
+ private static final String OLD_PLANNER_FACTORY = "org.apache.flink.table.planner.StreamPlannerFactory";
+ private static final String OLD_EXECUTOR_FACTORY = "org.apache.flink.table.executor.StreamExecutorFactory";
+ private static final String BLINK_PLANNER_FACTORY = "org.apache.flink.table.planner.delegation.BlinkPlannerFactory";
+ private static final String BLINK_EXECUTOR_FACTORY = "org.apache.flink.table.planner.delegation.BlinkExecutorFactory";
+
+ private String plannerClass = OLD_PLANNER_FACTORY;
+ private String executorClass = OLD_EXECUTOR_FACTORY;
private String builtInCatalogName = "default_catalog";
private String builtInDatabaseName = "default_database";
private boolean isStreamingMode = true;
/**
- * Sets the old Flink planner as the required module. By default, {@link #useAnyPlanner()} is
- * enabled.
+ * Sets the old Flink planner as the required module.
+ *
+ * <p>This is the default behavior.
*/
public Builder useOldPlanner() {
- this.plannerClass = "org.apache.flink.table.planner.StreamPlannerFactory";
- this.executorClass = "org.apache.flink.table.executor.StreamExecutorFactory";
+ this.plannerClass = OLD_PLANNER_FACTORY;
+ this.executorClass = OLD_EXECUTOR_FACTORY;
return this;
}
/**
- * Sets the Blink planner as the required module. By default, {@link #useAnyPlanner()} is
+ * Sets the Blink planner as the required module. By default, {@link #useOldPlanner()} is
* enabled.
*/
public Builder useBlinkPlanner() {
- this.plannerClass = "org.apache.flink.table.planner.delegation.BlinkPlannerFactory";
- this.executorClass = "org.apache.flink.table.planner.delegation.BlinkExecutorFactory";
+ this.plannerClass = BLINK_PLANNER_FACTORY;
+ this.executorClass = BLINK_EXECUTOR_FACTORY;
return this;
}
@@ -180,7 +186,7 @@ public class EnvironmentSettings {
*
* <p>A planner will be discovered automatically, if there is only one planner available.
*
- * <p>This is the default behavior.
+ * <p>By default, {@link #useOldPlanner()} is enabled.
*/
public Builder useAnyPlanner() {
this.plannerClass = null;
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 733246d..1a0996b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -39,7 +39,7 @@ class TableEnvironmentTest {
def thrown: ExpectedException = expectedException
val env = new StreamExecutionEnvironment(new LocalStreamEnvironment())
- val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
+ val tableEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
@Test
def testScanNonExistTable(): Unit = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
index 9301470..254e60f 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
@@ -35,7 +35,6 @@ import org.apache.flink.table.planner.utils.TableTestUtil
import org.apache.flink.table.types.logical.{IntType, RowType}
import org.apache.flink.types.Row
import org.apache.flink.util.TestLogger
-
import org.apache.calcite.rel.RelNode
import org.apache.calcite.tools.RelBuilder
import org.junit.Assert._
@@ -67,7 +66,7 @@ abstract class PatternTranslatorTestBase extends TestLogger {
when(jDataStreamMock.getId).thenReturn(0)
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
TableTestUtil.registerDataStream(
tEnv, tableName, dataStreamMock.javaStream, Some(Array[Expression]('f0, 'proctime.proctime)))
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/SetOperatorsValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/SetOperatorsValidationTest.scala
index da04e34..cb16186 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/SetOperatorsValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/SetOperatorsValidationTest.scala
@@ -23,9 +23,8 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.scala._
import org.apache.flink.table.planner.runtime.utils.{TestData, TestingAppendSink}
-import org.apache.flink.table.planner.utils.TableTestBase
+import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil}
import org.apache.flink.types.Row
-
import org.junit.Assert.assertEquals
import org.junit.Test
@@ -34,7 +33,7 @@ class SetOperatorsValidationTest extends TableTestBase {
@Test(expected = classOf[ValidationException])
def testUnionFieldsNameNotOverlap1(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val ds1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv, 'a, 'b, 'c)
val ds2 = env.fromCollection(TestData.tupleData5).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
@@ -51,7 +50,7 @@ class SetOperatorsValidationTest extends TableTestBase {
@Test(expected = classOf[ValidationException])
def testUnionFieldsNameNotOverlap2(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val ds1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv, 'a, 'b, 'c)
val ds2 = env.fromCollection(TestData.tupleData5).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
@@ -68,8 +67,8 @@ class SetOperatorsValidationTest extends TableTestBase {
@Test(expected = classOf[ValidationException])
def testUnionTablesFromDifferentEnv(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv1 = StreamTableEnvironment.create(env)
- val tEnv2 = StreamTableEnvironment.create(env)
+ val tEnv1 = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
+ val tEnv2 = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val ds1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv1, 'a, 'b, 'c)
val ds2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv2, 'a, 'b, 'c)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TableSinkValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TableSinkValidationTest.scala
index 418c034..16d7ff1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TableSinkValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TableSinkValidationTest.scala
@@ -24,9 +24,8 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.planner.runtime.utils.{TestData, TestingAppendSink, TestingUpsertTableSink}
-import org.apache.flink.table.planner.utils.TableTestBase
+import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil}
import org.apache.flink.types.Row
-
import org.junit.Test
class TableSinkValidationTest extends TableTestBase {
@@ -34,7 +33,7 @@ class TableSinkValidationTest extends TableTestBase {
@Test(expected = classOf[ValidationException])
def testAppendSinkOnUpdatingTable(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val t = env.fromCollection(TestData.smallTupleData3).toTable(tEnv, 'a, 'b, 'c)
@@ -50,7 +49,7 @@ class TableSinkValidationTest extends TableTestBase {
def testUpsertSinkOnUpdatingTableWithoutFullKey(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val t = env.fromCollection(TestData.tupleData3)
.assignAscendingTimestamps(_._1.toLong)
@@ -72,7 +71,7 @@ class TableSinkValidationTest extends TableTestBase {
@Test(expected = classOf[TableException])
def testAppendSinkOnLeftJoin(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val ds1 = env.fromCollection(TestData.tupleData3).toTable(tEnv, 'a, 'b, 'c)
val ds2 = env.fromCollection(TestData.tupleData5).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/UnsupportedOpsValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/UnsupportedOpsValidationTest.scala
index 38eabf8..91a7eb6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/UnsupportedOpsValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/UnsupportedOpsValidationTest.scala
@@ -23,8 +23,8 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.scala._
import org.apache.flink.table.planner.runtime.utils.TestData
+import org.apache.flink.table.planner.utils.TableTestUtil
import org.apache.flink.test.util.AbstractTestBase
-
import org.junit.Test
class UnsupportedOpsValidationTest extends AbstractTestBase {
@@ -32,14 +32,14 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
@Test(expected = classOf[ValidationException])
def testSort(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
env.fromCollection(TestData.smallTupleData3).toTable(tEnv).orderBy('_1.desc)
}
@Test(expected = classOf[ValidationException])
def testJoin(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
val t2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
t1.join(t2)
@@ -48,7 +48,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
@Test(expected = classOf[ValidationException])
def testUnion(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
val t2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
t1.union(t2)
@@ -57,7 +57,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
@Test(expected = classOf[ValidationException])
def testIntersect(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
val t2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
t1.intersect(t2)
@@ -66,7 +66,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
@Test(expected = classOf[ValidationException])
def testIntersectAll(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
val t2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
t1.intersectAll(t2)
@@ -75,7 +75,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
@Test(expected = classOf[ValidationException])
def testMinus(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
val t2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
t1.minus(t2)
@@ -84,7 +84,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
@Test(expected = classOf[ValidationException])
def testMinusAll(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
val t2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
t1.minusAll(t2)
@@ -93,7 +93,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
@Test(expected = classOf[ValidationException])
def testOffset(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
t1.offset(5)
}
@@ -101,7 +101,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
@Test(expected = classOf[ValidationException])
def testFetch(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
t1.fetch(5)
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtilTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtilTest.scala
index 1726aa0..3b885ec 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtilTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtilTest.scala
@@ -19,7 +19,6 @@ package org.apache.flink.table.planner.plan.utils
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.table.planner.plan.`trait`.{MiniBatchInterval, MiniBatchMode}
import org.apache.flink.table.planner.utils.TableTestUtil
@@ -33,7 +32,7 @@ class FlinkRelOptUtilTest {
@Test
def testToString(): Unit = {
val env = StreamExecutionEnvironment.createLocalEnvironment()
- val tableEnv = StreamTableEnvironment.create(env, new TableConfig())
+ val tableEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val table = env.fromElements[(Int, Long, String)]().toTable(tableEnv, 'a, 'b, 'c)
tableEnv.registerTable("MyTable", table)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala
index 68f0897..228b644 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala
@@ -24,14 +24,14 @@ import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableConfig, Types}
+import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, ScalarFunction}
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.EventTimeSourceFunction
import org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink, UserDefinedFunctionTestUtils}
+import org.apache.flink.table.planner.utils.TableTestUtil
import org.apache.flink.types.Row
-
import org.junit.Assert.assertEquals
import org.junit.Test
import org.junit.runner.RunWith
@@ -48,7 +48,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
@Test
def testSimplePattern(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val data = new mutable.MutableList[(Int, String)]
data.+=((1, "a"))
@@ -94,7 +94,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
@Test
def testSimplePatternWithNulls(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val data = new mutable.MutableList[(Int, String, String)]
data.+=((1, "a", null))
@@ -141,11 +141,10 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
@Test
def testCodeSplitsAreProperlyGenerated(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tableConfig = new TableConfig
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
// TODO: this code is ported from flink-planner,
- // However code split is not supported in blink-planner.
- tableConfig.setMaxGeneratedCodeLength(1)
- val tEnv = StreamTableEnvironment.create(env, tableConfig)
+ // However code split is not supported in blink-planner yet.
+ tEnv.getConfig.setMaxGeneratedCodeLength(1)
val data = new mutable.MutableList[(Int, String, String, String)]
data.+=((1, "a", "key1", "second_key3"))
@@ -198,7 +197,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
def testEventsAreProperlyOrdered(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val data = Seq(
Left(2L, (12, 1, "a", 1)),
@@ -256,7 +255,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
def testMatchRecognizeAppliedToWindowedGrouping(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val data = new mutable.MutableList[(String, Long, Int, Int)]
//first window
@@ -317,7 +316,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
def testWindowedGroupingAppliedToMatchRecognize(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val data = new mutable.MutableList[(String, Long, Int, Int)]
//first window
@@ -371,7 +370,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
@Test
def testLogicalOffsets(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val data = new mutable.MutableList[(String, Long, Int, Int)]
data.+=(("ACME", 1L, 19, 1))
@@ -420,7 +419,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
@Test
def testLogicalOffsetsWithStarVariable(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val data = new mutable.MutableList[(Int, String, Long, Int)]
data.+=((1, "ACME", 1L, 20))
@@ -480,7 +479,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
@Test
def testLogicalOffsetOutsideOfRangeInMeasures(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val data = new mutable.MutableList[(String, Long, Int, Int)]
data.+=(("ACME", 1L, 19, 1))
@@ -531,7 +530,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
@Test
def testAggregates(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
tEnv.getConfig.setMaxGeneratedCodeLength(1)
val data = new mutable.MutableList[(Int, String, Long, Double, Int)]
@@ -592,7 +591,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
@Test
def testAggregatesWithNullInputs(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
tEnv.getConfig.setMaxGeneratedCodeLength(1)
val data = new mutable.MutableList[Row]
@@ -647,7 +646,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
@Test
def testAccessingCurrentTime(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val data = new mutable.MutableList[(Int, String)]
data.+=((1, "a"))
@@ -686,7 +685,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
@Test
def testUserDefinedFunctions(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
tEnv.getConfig.setMaxGeneratedCodeLength(1)
val data = new mutable.MutableList[(Int, String, Long)]
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
index 7edd4f8..3e7fcec 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
@@ -26,8 +26,8 @@ import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
import org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink}
+import org.apache.flink.table.planner.utils.TableTestUtil
import org.apache.flink.types.Row
-
import org.junit.Assert.assertEquals
import org.junit._
import org.junit.runner.RunWith
@@ -49,7 +49,7 @@ class TemporalJoinITCase(state: StateBackendMode)
@Test
def testProcessTimeInnerJoin(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
@@ -98,7 +98,7 @@ class TemporalJoinITCase(state: StateBackendMode)
@Test
def testEventTimeInnerJoin(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index aa6b17d..0ab3480 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -26,11 +26,10 @@ import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableException, Tumble, Types}
import org.apache.flink.table.planner.runtime.utils.TestData.{smallTupleData3, tupleData3, tupleData5}
import org.apache.flink.table.planner.runtime.utils.{TestingAppendTableSink, TestingRetractTableSink, TestingUpsertTableSink}
-import org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.table.planner.utils.{MemoryTableSourceSinkUtil, TableTestUtil}
import org.apache.flink.table.sinks._
import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils}
import org.apache.flink.types.Row
-
import org.junit.Assert._
import org.junit.Test
@@ -47,7 +46,7 @@ class TableSinkITCase extends AbstractTestBase {
env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
MemoryTableSourceSinkUtil.clear()
val input = env.fromCollection(tupleData3)
@@ -83,7 +82,7 @@ class TableSinkITCase extends AbstractTestBase {
env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
env.setParallelism(4)
tEnv.registerTableSink(
@@ -121,7 +120,7 @@ class TableSinkITCase extends AbstractTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val t = env.fromCollection(tupleData3)
.assignAscendingTimestamps(_._1.toLong)
@@ -157,7 +156,7 @@ class TableSinkITCase extends AbstractTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val ds1 = env.fromCollection(smallTupleData3).toTable(tEnv, 'a, 'b, 'c)
val ds2 = env.fromCollection(tupleData5).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -185,7 +184,7 @@ class TableSinkITCase extends AbstractTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val t = env.fromCollection(tupleData3)
.assignAscendingTimestamps(_._1.toLong)
@@ -223,7 +222,7 @@ class TableSinkITCase extends AbstractTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val t = env.fromCollection(tupleData3)
.assignAscendingTimestamps(_._1.toLong)
@@ -264,7 +263,7 @@ class TableSinkITCase extends AbstractTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val t = env.fromCollection(tupleData3)
.assignAscendingTimestamps(_._1.toLong)
@@ -306,7 +305,7 @@ class TableSinkITCase extends AbstractTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val t = env.fromCollection(tupleData3)
.assignAscendingTimestamps(_._1.toLong)
@@ -352,7 +351,7 @@ class TableSinkITCase extends AbstractTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val t = env.fromCollection(tupleData3)
.assignAscendingTimestamps(_._1.toLong)
@@ -399,7 +398,7 @@ class TableSinkITCase extends AbstractTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val t = env.fromCollection(tupleData3)
.assignAscendingTimestamps(_._1.toLong)
@@ -444,7 +443,7 @@ class TableSinkITCase extends AbstractTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val t = env.fromCollection(tupleData3)
.assignAscendingTimestamps(_._1.toLong)
@@ -489,7 +488,7 @@ class TableSinkITCase extends AbstractTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val t = env.fromCollection(tupleData3)
.assignAscendingTimestamps(_._1.toLong)
@@ -508,7 +507,7 @@ class TableSinkITCase extends AbstractTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
val t = env.fromCollection(tupleData3)
.assignAscendingTimestamps(_._1.toLong)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala
index 4bcd58b..7f7b65d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala
@@ -30,10 +30,10 @@ import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.dataformat.{BaseRow, BinaryRow, BinaryRowWriter, BinaryString}
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
+import org.apache.flink.table.planner.utils.TableTestUtil
import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
import org.apache.flink.table.types.logical.RowType
-
import org.junit.runners.Parameterized
import org.junit.{After, Assert, Before}
@@ -72,7 +72,7 @@ class StreamingWithStateTestBase(state: StateBackendMode) extends StreamingTestB
env.setStateBackend(new RocksDBStateBackend(
"file://" + baseCheckpointPath).configure(conf, classLoader))
}
- this.tEnv = StreamTableEnvironment.create(env)
+ this.tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
FailingCollectionSource.failedBefore = true
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 672a71e..7fb59e1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -1005,6 +1005,11 @@ object TestingTableEnvironment {
object TableTestUtil {
+ val STREAM_SETTING: EnvironmentSettings = EnvironmentSettings.newInstance()
+ .useBlinkPlanner().inStreamingMode().build()
+ val BATCH_SETTING: EnvironmentSettings = EnvironmentSettings.newInstance()
+ .useBlinkPlanner().inBatchMode().build()
+
/**
* Converts operation tree in the given table to a RelNode tree.
*/