You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/29 13:29:36 UTC

[flink] branch release-1.9 updated: [FLINK-13447][table] Change default planner to legacy planner instead of any one

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new c763cef  [FLINK-13447][table] Change default planner to legacy planner instead of any one
c763cef is described below

commit c763cef2f0d244b305fc9e3c70be0d6c89d5eba5
Author: Jark Wu <im...@gmail.com>
AuthorDate: Sat Jul 27 23:08:18 2019 +0800

    [FLINK-13447][table] Change default planner to legacy planner instead of any one
    
    This closes #9249
---
 .../flink/table/api/EnvironmentSettings.java       | 26 +++++++++-------
 .../flink/table/api/TableEnvironmentTest.scala     |  2 +-
 .../planner/match/PatternTranslatorTestBase.scala  |  3 +-
 .../validation/SetOperatorsValidationTest.scala    | 11 ++++---
 .../table/validation/TableSinkValidationTest.scala |  9 +++---
 .../validation/UnsupportedOpsValidationTest.scala  | 20 ++++++-------
 .../planner/plan/utils/FlinkRelOptUtilTest.scala   |  3 +-
 .../runtime/stream/sql/MatchRecognizeITCase.scala  | 35 +++++++++++-----------
 .../runtime/stream/sql/TemporalJoinITCase.scala    |  6 ++--
 .../runtime/stream/table/TableSinkITCase.scala     | 29 +++++++++---------
 .../runtime/utils/StreamingWithStateTestBase.scala |  4 +--
 .../flink/table/planner/utils/TableTestBase.scala  |  5 ++++
 12 files changed, 79 insertions(+), 74 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index 29c9227..8289bbc 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -149,29 +149,35 @@ public class EnvironmentSettings {
 	 * A builder for {@link EnvironmentSettings}.
 	 */
 	public static class Builder {
-		private String plannerClass = null;
-		private String executorClass = null;
+		private static final String OLD_PLANNER_FACTORY = "org.apache.flink.table.planner.StreamPlannerFactory";
+		private static final String OLD_EXECUTOR_FACTORY = "org.apache.flink.table.executor.StreamExecutorFactory";
+		private static final String BLINK_PLANNER_FACTORY = "org.apache.flink.table.planner.delegation.BlinkPlannerFactory";
+		private static final String BLINK_EXECUTOR_FACTORY = "org.apache.flink.table.planner.delegation.BlinkExecutorFactory";
+
+		private String plannerClass = OLD_PLANNER_FACTORY;
+		private String executorClass = OLD_EXECUTOR_FACTORY;
 		private String builtInCatalogName = "default_catalog";
 		private String builtInDatabaseName = "default_database";
 		private boolean isStreamingMode = true;
 
 		/**
-		 * Sets the old Flink planner as the required module. By default, {@link #useAnyPlanner()} is
-		 * enabled.
+		 * Sets the old Flink planner as the required module.
+		 *
+		 * <p>This is the default behavior.
 		 */
 		public Builder useOldPlanner() {
-			this.plannerClass = "org.apache.flink.table.planner.StreamPlannerFactory";
-			this.executorClass = "org.apache.flink.table.executor.StreamExecutorFactory";
+			this.plannerClass = OLD_PLANNER_FACTORY;
+			this.executorClass = OLD_EXECUTOR_FACTORY;
 			return this;
 		}
 
 		/**
-		 * Sets the Blink planner as the required module. By default, {@link #useAnyPlanner()} is
+		 * Sets the Blink planner as the required module. By default, {@link #useOldPlanner()} is
 		 * enabled.
 		 */
 		public Builder useBlinkPlanner() {
-			this.plannerClass = "org.apache.flink.table.planner.delegation.BlinkPlannerFactory";
-			this.executorClass = "org.apache.flink.table.planner.delegation.BlinkExecutorFactory";
+			this.plannerClass = BLINK_PLANNER_FACTORY;
+			this.executorClass = BLINK_EXECUTOR_FACTORY;
 			return this;
 		}
 
@@ -180,7 +186,7 @@ public class EnvironmentSettings {
 		 *
 		 * <p>A planner will be discovered automatically, if there is only one planner available.
 		 *
-		 * <p>This is the default behavior.
+		 * <p>By default, {@link #useOldPlanner()} is enabled.
 		 */
 		public Builder useAnyPlanner() {
 			this.plannerClass = null;
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 733246d..1a0996b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -39,7 +39,7 @@ class TableEnvironmentTest {
   def thrown: ExpectedException = expectedException
 
   val env = new StreamExecutionEnvironment(new LocalStreamEnvironment())
-  val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
+  val tableEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
   @Test
   def testScanNonExistTable(): Unit = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
index 9301470..254e60f 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
@@ -35,7 +35,6 @@ import org.apache.flink.table.planner.utils.TableTestUtil
 import org.apache.flink.table.types.logical.{IntType, RowType}
 import org.apache.flink.types.Row
 import org.apache.flink.util.TestLogger
-
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.tools.RelBuilder
 import org.junit.Assert._
@@ -67,7 +66,7 @@ abstract class PatternTranslatorTestBase extends TestLogger {
     when(jDataStreamMock.getId).thenReturn(0)
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     TableTestUtil.registerDataStream(
       tEnv, tableName, dataStreamMock.javaStream, Some(Array[Expression]('f0, 'proctime.proctime)))
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/SetOperatorsValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/SetOperatorsValidationTest.scala
index da04e34..cb16186 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/SetOperatorsValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/SetOperatorsValidationTest.scala
@@ -23,9 +23,8 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.planner.runtime.utils.{TestData, TestingAppendSink}
-import org.apache.flink.table.planner.utils.TableTestBase
+import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil}
 import org.apache.flink.types.Row
-
 import org.junit.Assert.assertEquals
 import org.junit.Test
 
@@ -34,7 +33,7 @@ class SetOperatorsValidationTest extends TableTestBase {
   @Test(expected = classOf[ValidationException])
   def testUnionFieldsNameNotOverlap1(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val ds1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = env.fromCollection(TestData.tupleData5).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
@@ -51,7 +50,7 @@ class SetOperatorsValidationTest extends TableTestBase {
   @Test(expected = classOf[ValidationException])
   def testUnionFieldsNameNotOverlap2(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val ds1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = env.fromCollection(TestData.tupleData5).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
@@ -68,8 +67,8 @@ class SetOperatorsValidationTest extends TableTestBase {
   @Test(expected = classOf[ValidationException])
   def testUnionTablesFromDifferentEnv(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = StreamTableEnvironment.create(env)
-    val tEnv2 = StreamTableEnvironment.create(env)
+    val tEnv1 = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
+    val tEnv2 = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val ds1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv1, 'a, 'b, 'c)
     val ds2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv2, 'a, 'b, 'c)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TableSinkValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TableSinkValidationTest.scala
index 418c034..16d7ff1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TableSinkValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TableSinkValidationTest.scala
@@ -24,9 +24,8 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.planner.runtime.utils.{TestData, TestingAppendSink, TestingUpsertTableSink}
-import org.apache.flink.table.planner.utils.TableTestBase
+import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil}
 import org.apache.flink.types.Row
-
 import org.junit.Test
 
 class TableSinkValidationTest extends TableTestBase {
@@ -34,7 +33,7 @@ class TableSinkValidationTest extends TableTestBase {
   @Test(expected = classOf[ValidationException])
   def testAppendSinkOnUpdatingTable(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val t = env.fromCollection(TestData.smallTupleData3).toTable(tEnv, 'a, 'b, 'c)
 
@@ -50,7 +49,7 @@ class TableSinkValidationTest extends TableTestBase {
   def testUpsertSinkOnUpdatingTableWithoutFullKey(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val t = env.fromCollection(TestData.tupleData3)
       .assignAscendingTimestamps(_._1.toLong)
@@ -72,7 +71,7 @@ class TableSinkValidationTest extends TableTestBase {
   @Test(expected = classOf[TableException])
   def testAppendSinkOnLeftJoin(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val ds1 = env.fromCollection(TestData.tupleData3).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = env.fromCollection(TestData.tupleData5).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/UnsupportedOpsValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/UnsupportedOpsValidationTest.scala
index 38eabf8..91a7eb6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/UnsupportedOpsValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/UnsupportedOpsValidationTest.scala
@@ -23,8 +23,8 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.planner.runtime.utils.TestData
+import org.apache.flink.table.planner.utils.TableTestUtil
 import org.apache.flink.test.util.AbstractTestBase
-
 import org.junit.Test
 
 class UnsupportedOpsValidationTest extends AbstractTestBase {
@@ -32,14 +32,14 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
   @Test(expected = classOf[ValidationException])
   def testSort(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     env.fromCollection(TestData.smallTupleData3).toTable(tEnv).orderBy('_1.desc)
   }
 
   @Test(expected = classOf[ValidationException])
   def testJoin(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
     val t2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
     t1.join(t2)
@@ -48,7 +48,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
   @Test(expected = classOf[ValidationException])
   def testUnion(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
     val t2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
     t1.union(t2)
@@ -57,7 +57,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
   @Test(expected = classOf[ValidationException])
   def testIntersect(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
     val t2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
     t1.intersect(t2)
@@ -66,7 +66,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
   @Test(expected = classOf[ValidationException])
   def testIntersectAll(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
     val t2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
     t1.intersectAll(t2)
@@ -75,7 +75,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
   @Test(expected = classOf[ValidationException])
   def testMinus(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
     val t2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
     t1.minus(t2)
@@ -84,7 +84,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
   @Test(expected = classOf[ValidationException])
   def testMinusAll(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
     val t2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
     t1.minusAll(t2)
@@ -93,7 +93,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
   @Test(expected = classOf[ValidationException])
   def testOffset(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
     t1.offset(5)
   }
@@ -101,7 +101,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
   @Test(expected = classOf[ValidationException])
   def testFetch(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
     t1.fetch(5)
   }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtilTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtilTest.scala
index 1726aa0..3b885ec 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtilTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtilTest.scala
@@ -19,7 +19,6 @@ package org.apache.flink.table.planner.plan.utils
 
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
 import org.apache.flink.table.planner.plan.`trait`.{MiniBatchInterval, MiniBatchMode}
 import org.apache.flink.table.planner.utils.TableTestUtil
@@ -33,7 +32,7 @@ class FlinkRelOptUtilTest {
   @Test
   def testToString(): Unit = {
     val env  = StreamExecutionEnvironment.createLocalEnvironment()
-    val tableEnv = StreamTableEnvironment.create(env, new TableConfig())
+    val tableEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val table = env.fromElements[(Int, Long, String)]().toTable(tableEnv, 'a, 'b, 'c)
     tableEnv.registerTable("MyTable", table)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala
index 68f0897..228b644 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala
@@ -24,14 +24,14 @@ import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableConfig, Types}
+import org.apache.flink.table.api.Types
 import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, ScalarFunction}
 import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.EventTimeSourceFunction
 import org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink, UserDefinedFunctionTestUtils}
+import org.apache.flink.table.planner.utils.TableTestUtil
 import org.apache.flink.types.Row
-
 import org.junit.Assert.assertEquals
 import org.junit.Test
 import org.junit.runner.RunWith
@@ -48,7 +48,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
   @Test
   def testSimplePattern(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val data = new mutable.MutableList[(Int, String)]
     data.+=((1, "a"))
@@ -94,7 +94,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
   @Test
   def testSimplePatternWithNulls(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val data = new mutable.MutableList[(Int, String, String)]
     data.+=((1, "a", null))
@@ -141,11 +141,10 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
   @Test
   def testCodeSplitsAreProperlyGenerated(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tableConfig = new TableConfig
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     // TODO: this code is ported from flink-planner,
-    //  However code split is not supported in blink-planner.
-    tableConfig.setMaxGeneratedCodeLength(1)
-    val tEnv = StreamTableEnvironment.create(env, tableConfig)
+    //  However code split is not supported in blink-planner yet.
+    tEnv.getConfig.setMaxGeneratedCodeLength(1)
 
     val data = new mutable.MutableList[(Int, String, String, String)]
     data.+=((1, "a", "key1", "second_key3"))
@@ -198,7 +197,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
   def testEventsAreProperlyOrdered(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val data = Seq(
       Left(2L, (12, 1, "a", 1)),
@@ -256,7 +255,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
   def testMatchRecognizeAppliedToWindowedGrouping(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val data = new mutable.MutableList[(String, Long, Int, Int)]
     //first window
@@ -317,7 +316,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
   def testWindowedGroupingAppliedToMatchRecognize(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val data = new mutable.MutableList[(String, Long, Int, Int)]
     //first window
@@ -371,7 +370,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
   @Test
   def testLogicalOffsets(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val data = new mutable.MutableList[(String, Long, Int, Int)]
     data.+=(("ACME", 1L, 19, 1))
@@ -420,7 +419,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
   @Test
   def testLogicalOffsetsWithStarVariable(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val data = new mutable.MutableList[(Int, String, Long, Int)]
     data.+=((1, "ACME", 1L, 20))
@@ -480,7 +479,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
   @Test
   def testLogicalOffsetOutsideOfRangeInMeasures(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val data = new mutable.MutableList[(String, Long, Int, Int)]
     data.+=(("ACME", 1L, 19, 1))
@@ -531,7 +530,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
   @Test
   def testAggregates(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     tEnv.getConfig.setMaxGeneratedCodeLength(1)
 
     val data = new mutable.MutableList[(Int, String, Long, Double, Int)]
@@ -592,7 +591,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
   @Test
   def testAggregatesWithNullInputs(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     tEnv.getConfig.setMaxGeneratedCodeLength(1)
 
     val data = new mutable.MutableList[Row]
@@ -647,7 +646,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
   @Test
   def testAccessingCurrentTime(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val data = new mutable.MutableList[(Int, String)]
     data.+=((1, "a"))
@@ -686,7 +685,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
   @Test
   def testUserDefinedFunctions(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     tEnv.getConfig.setMaxGeneratedCodeLength(1)
 
     val data = new mutable.MutableList[(Int, String, Long)]
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
index 7edd4f8..3e7fcec 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
@@ -26,8 +26,8 @@ import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink}
+import org.apache.flink.table.planner.utils.TableTestUtil
 import org.apache.flink.types.Row
-
 import org.junit.Assert.assertEquals
 import org.junit._
 import org.junit.runner.RunWith
@@ -49,7 +49,7 @@ class TemporalJoinITCase(state: StateBackendMode)
   @Test
   def testProcessTimeInnerJoin(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     env.setParallelism(1)
     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
 
@@ -98,7 +98,7 @@ class TemporalJoinITCase(state: StateBackendMode)
   @Test
   def testEventTimeInnerJoin(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     env.setParallelism(1)
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index aa6b17d..0ab3480 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -26,11 +26,10 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableException, Tumble, Types}
 import org.apache.flink.table.planner.runtime.utils.TestData.{smallTupleData3, tupleData3, tupleData5}
 import org.apache.flink.table.planner.runtime.utils.{TestingAppendTableSink, TestingRetractTableSink, TestingUpsertTableSink}
-import org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.table.planner.utils.{MemoryTableSourceSinkUtil, TableTestUtil}
 import org.apache.flink.table.sinks._
 import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils}
 import org.apache.flink.types.Row
-
 import org.junit.Assert._
 import org.junit.Test
 
@@ -47,7 +46,7 @@ class TableSinkITCase extends AbstractTestBase {
     env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     MemoryTableSourceSinkUtil.clear()
 
     val input = env.fromCollection(tupleData3)
@@ -83,7 +82,7 @@ class TableSinkITCase extends AbstractTestBase {
     env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     env.setParallelism(4)
 
     tEnv.registerTableSink(
@@ -121,7 +120,7 @@ class TableSinkITCase extends AbstractTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val t = env.fromCollection(tupleData3)
         .assignAscendingTimestamps(_._1.toLong)
@@ -157,7 +156,7 @@ class TableSinkITCase extends AbstractTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val ds1 = env.fromCollection(smallTupleData3).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = env.fromCollection(tupleData5).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -185,7 +184,7 @@ class TableSinkITCase extends AbstractTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val t = env.fromCollection(tupleData3)
       .assignAscendingTimestamps(_._1.toLong)
@@ -223,7 +222,7 @@ class TableSinkITCase extends AbstractTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val t = env.fromCollection(tupleData3)
       .assignAscendingTimestamps(_._1.toLong)
@@ -264,7 +263,7 @@ class TableSinkITCase extends AbstractTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val t = env.fromCollection(tupleData3)
       .assignAscendingTimestamps(_._1.toLong)
@@ -306,7 +305,7 @@ class TableSinkITCase extends AbstractTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val t = env.fromCollection(tupleData3)
       .assignAscendingTimestamps(_._1.toLong)
@@ -352,7 +351,7 @@ class TableSinkITCase extends AbstractTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val t = env.fromCollection(tupleData3)
       .assignAscendingTimestamps(_._1.toLong)
@@ -399,7 +398,7 @@ class TableSinkITCase extends AbstractTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val t = env.fromCollection(tupleData3)
       .assignAscendingTimestamps(_._1.toLong)
@@ -444,7 +443,7 @@ class TableSinkITCase extends AbstractTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val t = env.fromCollection(tupleData3)
       .assignAscendingTimestamps(_._1.toLong)
@@ -489,7 +488,7 @@ class TableSinkITCase extends AbstractTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val t = env.fromCollection(tupleData3)
       .assignAscendingTimestamps(_._1.toLong)
@@ -508,7 +507,7 @@ class TableSinkITCase extends AbstractTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = StreamTableEnvironment.create(env)
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val t = env.fromCollection(tupleData3)
       .assignAscendingTimestamps(_._1.toLong)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala
index 4bcd58b..7f7b65d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala
@@ -30,10 +30,10 @@ import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.table.api.scala.StreamTableEnvironment
 import org.apache.flink.table.dataformat.{BaseRow, BinaryRow, BinaryRowWriter, BinaryString}
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
+import org.apache.flink.table.planner.utils.TableTestUtil
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
 import org.apache.flink.table.types.logical.RowType
-
 import org.junit.runners.Parameterized
 import org.junit.{After, Assert, Before}
 
@@ -72,7 +72,7 @@ class StreamingWithStateTestBase(state: StateBackendMode) extends StreamingTestB
         env.setStateBackend(new RocksDBStateBackend(
           "file://" + baseCheckpointPath).configure(conf, classLoader))
     }
-    this.tEnv = StreamTableEnvironment.create(env)
+    this.tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     FailingCollectionSource.failedBefore = true
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 672a71e..7fb59e1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -1005,6 +1005,11 @@ object TestingTableEnvironment {
 
 object TableTestUtil {
 
+  val STREAM_SETTING: EnvironmentSettings = EnvironmentSettings.newInstance()
+    .useBlinkPlanner().inStreamingMode().build()
+  val BATCH_SETTING: EnvironmentSettings = EnvironmentSettings.newInstance()
+    .useBlinkPlanner().inBatchMode().build()
+
   /**
     * Converts operation tree in the given table to a RelNode tree.
     */