Posted to commits@flink.apache.org by dw...@apache.org on 2019/07/17 11:51:08 UTC

[flink] branch release-1.9 updated: [FLINK-13168][table] Clarify isBatch/isStreaming/isBounded flag in flink planner and blink planner

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 8312dfa  [FLINK-13168][table] Clarify isBatch/isStreaming/isBounded flag in flink planner and blink planner
8312dfa is described below

commit 8312dfa86d0b9670ec8b6caf0574cc0bc01c44e1
Author: godfrey he <go...@163.com>
AuthorDate: Wed Jul 17 19:49:35 2019 +0800

    [FLINK-13168][table] Clarify isBatch/isStreaming/isBounded flag in flink planner and blink planner
---
 .../java/internal/StreamTableEnvironmentImpl.java  | 10 ++--
 .../flink/table/api/EnvironmentSettings.java       | 26 +++++-----
 .../table/api/internal/TableEnvironmentImpl.java   | 24 +++++----
 .../flink/table/catalog/ConnectorCatalogTable.java |  2 +
 .../operations/TableSourceQueryOperation.java      |  1 +
 .../operations/utils/OperationTreeBuilder.java     |  8 +--
 .../utils/factories/AggregateOperationFactory.java |  8 +--
 .../utils/factories/SetOperationFactory.java       |  8 +--
 .../utils/factories/SortOperationFactory.java      |  8 +--
 .../factories/ComponentFactoryServiceTest.java     |  4 +-
 .../table/factories/utils/TestPlannerFactory.java  |  2 +-
 .../internal/StreamTableEnvironmentImpl.scala      |  2 +-
 .../flink/table/catalog/CatalogCalciteSchema.java  |  7 ++-
 .../table/catalog/CatalogManagerCalciteSchema.java |  7 ++-
 .../flink/table/catalog/DatabaseCalciteSchema.java | 34 +++++++++----
 .../flink/table/executor/BlinkExecutorFactory.java |  8 +--
 .../table/operations/DataStreamQueryOperation.java |  2 +
 .../operations/RichTableSourceQueryOperation.java  | 13 +++--
 .../flink/table/planner/BlinkPlannerFactory.java   |  8 +--
 .../PushFilterIntoTableSourceScanRule.scala        |  2 +-
 .../PushPartitionIntoTableSourceScanRule.scala     |  2 +-
 .../PushProjectIntoTableSourceScanRule.scala       |  2 +-
 .../table/plan/schema/TableSourceSinkTable.scala   | 12 -----
 .../flink/table/plan/schema/TableSourceTable.scala | 12 +++--
 .../apache/flink/table/planner/BatchPlanner.scala  |  2 +-
 .../apache/flink/table/planner/PlannerBase.scala   |  7 ++-
 .../apache/flink/table/planner/StreamPlanner.scala |  2 +-
 .../plan/nodes/resource/MockNodeTestBase.java      | 26 +++++-----
 .../parallelism/FinalParallelismSetterTest.java    |  6 +--
 .../parallelism/ShuffleStageGeneratorTest.java     |  6 +--
 .../flink/table/plan/batch/sql/TableSourceTest.xml | 16 ++++++
 .../plan/nodes/resource/ExecNodeResourceTest.xml   | 16 +++---
 .../table/plan/stream/sql/TableSourceTest.xml      | 32 ++++++++++++
 .../flink/table/catalog/CatalogTableITCase.scala   |  8 +--
 .../flink/table/codegen/agg/AggTestBase.scala      |  4 +-
 .../codegen/agg/AggsHandlerCodeGeneratorTest.scala |  2 +-
 .../table/codegen/agg/batch/BatchAggTestBase.scala |  2 +-
 .../table/plan/batch/sql/TableSourceTest.scala     | 42 ++++++++++++++--
 .../plan/nodes/resource/ExecNodeResourceTest.scala | 18 +++----
 .../table/plan/stream/sql/TableSourceTest.scala    | 37 +++++++++++++-
 .../apache/flink/table/util/TableTestBase.scala    | 57 +++++++++++-----------
 .../apache/flink/table/util/testTableSources.scala | 34 ++++++-------
 .../flink/table/catalog/CatalogCalciteSchema.java  |  8 +--
 .../table/catalog/CatalogManagerCalciteSchema.java | 10 ++--
 .../flink/table/catalog/DatabaseCalciteSchema.java |  8 +--
 .../table/executor/StreamExecutorFactory.java      |  2 +-
 .../flink/table/planner/StreamPlannerFactory.java  |  2 +-
 .../flink/table/api/internal/TableEnvImpl.scala    | 22 +++++----
 .../table/catalog/ExternalCatalogSchema.scala      | 11 +++--
 .../flink/table/catalog/ExternalTableUtil.scala    | 14 +++---
 .../logical/FlinkLogicalTableSourceScan.scala      |  2 +-
 .../rules/dataSet/BatchTableSourceScanRule.scala   |  2 +-
 .../datastream/StreamTableSourceScanRule.scala     |  2 +-
 .../flink/table/plan/schema/TableSourceTable.scala |  8 ++-
 .../apache/flink/table/planner/StreamPlanner.scala |  2 +-
 .../table/catalog/DatabaseCalciteSchemaTest.java   |  4 +-
 .../table/catalog/ExternalCatalogSchemaTest.scala  |  2 +-
 57 files changed, 386 insertions(+), 242 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 4d0586c..2e35bb9 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -87,13 +87,13 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 			StreamExecutionEnvironment executionEnvironment,
 			Planner planner,
 			Executor executor,
-			boolean isStreaming) {
-		super(catalogManager, tableConfig, executor, functionCatalog, planner, isStreaming);
+			boolean isStreamingMode) {
+		super(catalogManager, tableConfig, executor, functionCatalog, planner, isStreamingMode);
 		this.executionEnvironment = executionEnvironment;
 
-		if (!isStreaming) {
+		if (!isStreamingMode) {
 			throw new TableException(
-				"StreamTableEnvironment is not supported on batch mode now, please use TableEnvironment.");
+				"StreamTableEnvironment is not supported in batch mode now, please use TableEnvironment.");
 		}
 	}
 
@@ -122,7 +122,7 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 			executionEnvironment,
 			planner,
 			executor,
-			!settings.isBatchMode()
+			settings.isStreamingMode()
 		);
 	}
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index 4367254..e3fc7cc 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -44,7 +44,7 @@ import java.util.Map;
  */
 @PublicEvolving
 public class EnvironmentSettings {
-	public static final String BATCH_MODE = "batch-mode";
+	public static final String STREAMING_MODE = "streaming-mode";
 	public static final String CLASS_NAME = "class-name";
 
 	/**
@@ -70,22 +70,22 @@ public class EnvironmentSettings {
 	private final String builtInDatabaseName;
 
 	/**
-	 * Determines if the table environment should work in a batch ({@code true}) or
-	 * streaming ({@code false}) mode.
+	 * Determines if the table environment should work in a batch ({@code false}) or
+	 * streaming ({@code true}) mode.
 	 */
-	private final boolean isBatchMode;
+	private final boolean isStreamingMode;
 
 	private EnvironmentSettings(
 			@Nullable String plannerClass,
 			@Nullable String executorClass,
 			String builtInCatalogName,
 			String builtInDatabaseName,
-			boolean isBatchMode) {
+			boolean isStreamingMode) {
 		this.plannerClass = plannerClass;
 		this.executorClass = executorClass;
 		this.builtInCatalogName = builtInCatalogName;
 		this.builtInDatabaseName = builtInDatabaseName;
-		this.isBatchMode = isBatchMode;
+		this.isStreamingMode = isStreamingMode;
 	}
 
 	/**
@@ -117,8 +117,8 @@ public class EnvironmentSettings {
 	/**
 	 * Tells if the {@link TableEnvironment} should work in a batch or streaming mode.
 	 */
-	public boolean isBatchMode() {
-		return isBatchMode;
+	public boolean isStreamingMode() {
+		return isStreamingMode;
 	}
 
 	@Internal
@@ -141,7 +141,7 @@ public class EnvironmentSettings {
 
 	private Map<String, String> toCommonProperties() {
 		Map<String, String> properties = new HashMap<>();
-		properties.put(BATCH_MODE, Boolean.toString(isBatchMode));
+		properties.put(STREAMING_MODE, Boolean.toString(isStreamingMode));
 		return properties;
 	}
 
@@ -153,7 +153,7 @@ public class EnvironmentSettings {
 		private String executorClass = null;
 		private String builtInCatalogName = "default_catalog";
 		private String builtInDatabaseName = "default_database";
-		private boolean isBatchMode = false;
+		private boolean isStreamingMode = true;
 
 		/**
 		 * Sets the old Flink planner as the required module. By default, {@link #useAnyPlanner()} is
@@ -192,7 +192,7 @@ public class EnvironmentSettings {
 		 * Sets that the components should work in a batch mode. Streaming mode by default.
 		 */
 		public Builder inBatchMode() {
-			this.isBatchMode = true;
+			this.isStreamingMode = false;
 			return this;
 		}
 
@@ -200,7 +200,7 @@ public class EnvironmentSettings {
 		 * Sets that the components should work in a streaming mode. Enabled by default.
 		 */
 		public Builder inStreamingMode() {
-			this.isBatchMode = false;
+			this.isStreamingMode = true;
 			return this;
 		}
 
@@ -231,7 +231,7 @@ public class EnvironmentSettings {
 				executorClass,
 				builtInCatalogName,
 				builtInDatabaseName,
-				isBatchMode);
+				isStreamingMode);
 		}
 	}
 }
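
The hunks above flip the flag's polarity and its default: isBatchMode (default false) becomes isStreamingMode (default true), so streaming remains the default and batch stays an explicit opt-in. A minimal sketch of how the renamed flag surfaces through the builder; the newInstance() factory is assumed from the surrounding 1.9 API, only the builder methods themselves appear in this diff:

    // Sketch, assuming EnvironmentSettings.newInstance() as the builder entry point.
    EnvironmentSettings batchSettings = EnvironmentSettings.newInstance()
        .inBatchMode()    // sets isStreamingMode = false
        .build();
    // batchSettings.isStreamingMode() == false

    EnvironmentSettings streamSettings = EnvironmentSettings.newInstance().build();
    // streamSettings.isStreamingMode() == true: streaming is the default
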
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index f2aea62..97d27f0 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -80,7 +80,9 @@ import java.util.stream.Collectors;
  */
 @Internal
 public class TableEnvironmentImpl implements TableEnvironment {
-
+	// Flag that tells if the TableSource/TableSink used in this environment is a stream table source/sink,
+	// and this should always be true. This avoids too much hard-coding.
+	private static final boolean IS_STREAM_TABLE = true;
 	private final CatalogManager catalogManager;
 
 	private final String builtinCatalogName;
@@ -99,7 +101,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
 			Executor executor,
 			FunctionCatalog functionCatalog,
 			Planner planner,
-			boolean isStreaming) {
+			boolean isStreamingMode) {
 		this.catalogManager = catalogManager;
 		this.execEnv = executor;
 
@@ -117,7 +119,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
 				Optional<CatalogQueryOperation> catalogTableOperation = scanInternal(path);
 				return catalogTableOperation.map(tableOperation -> new TableReferenceExpression(path, tableOperation));
 			},
-			isStreaming
+			isStreamingMode
 		);
 	}
 
@@ -144,7 +146,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
 			executor,
 			functionCatalog,
 			planner,
-			!settings.isBatchMode()
+			settings.isStreamingMode()
 		);
 	}
 
@@ -155,7 +157,9 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
 	@Override
 	public Table fromTableSource(TableSource<?> source) {
-		return createTable(new TableSourceQueryOperation<>(source, false));
+		// only accept StreamTableSource and LookupableTableSource here
+		// TODO a validation should be added here; it is not possible yet because StreamTableSource is in the flink-table-api-java-bridge module
+		return createTable(new TableSourceQueryOperation<>(source, !IS_STREAM_TABLE));
 	}
 
 	@Override
@@ -198,6 +202,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
 	@Override
 	public void registerTableSource(String name, TableSource<?> tableSource) {
+		// only accept StreamTableSource and LookupableTableSource here
+		// TODO a validation should be added here; it is not possible yet because StreamTableSource is in the flink-table-api-java-bridge module
 		registerTableSourceInternal(name, tableSource);
 	}
 
@@ -528,14 +534,14 @@ public class TableEnvironmentImpl implements TableEnvironment {
 					replaceTableInternal(
 						name,
 						ConnectorCatalogTable
-							.sourceAndSink(tableSource, sourceSinkTable.getTableSink().get(), false));
+							.sourceAndSink(tableSource, sourceSinkTable.getTableSink().get(), !IS_STREAM_TABLE));
 				}
 			} else {
 				throw new ValidationException(String.format(
 					"Table '%s' already exists. Please choose a different name.", name));
 			}
 		} else {
-			registerTableInternal(name, ConnectorCatalogTable.source(tableSource, false));
+			registerTableInternal(name, ConnectorCatalogTable.source(tableSource, !IS_STREAM_TABLE));
 		}
 	}
 
@@ -553,14 +559,14 @@ public class TableEnvironmentImpl implements TableEnvironment {
 					replaceTableInternal(
 						name,
 						ConnectorCatalogTable
-							.sourceAndSink(sourceSinkTable.getTableSource().get(), tableSink, false));
+							.sourceAndSink(sourceSinkTable.getTableSource().get(), tableSink, !IS_STREAM_TABLE));
 				}
 			} else {
 				throw new ValidationException(String.format(
 					"Table '%s' already exists. Please choose a different name.", name));
 			}
 		} else {
-			registerTableInternal(name, ConnectorCatalogTable.sink(tableSink, false));
+			registerTableInternal(name, ConnectorCatalogTable.sink(tableSink, !IS_STREAM_TABLE));
 		}
 	}
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
index 84b6e0d..a3f7bda 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
@@ -49,6 +49,8 @@ import java.util.stream.Collectors;
 public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
 	private final TableSource<T1> tableSource;
 	private final TableSink<T2> tableSink;
+	// Flag that tells if the tableSource/tableSink is a BatchTableSource/BatchTableSink.
+	// NOTE: this should be false in the Blink planner, because the Blink planner always uses StreamTableSource.
 	private final boolean isBatch;
 
 	public static <T1> ConnectorCatalogTable source(TableSource<T1> source, boolean isBatch) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TableSourceQueryOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TableSourceQueryOperation.java
index 097e92e..1644d0b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TableSourceQueryOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TableSourceQueryOperation.java
@@ -35,6 +35,7 @@ import java.util.Map;
 public class TableSourceQueryOperation<T> implements QueryOperation {
 
 	private final TableSource<T> tableSource;
+	// Flag that tells if the tableSource is a BatchTableSource.
 	private final boolean isBatch;
 
 	public TableSourceQueryOperation(TableSource<T> tableSource, boolean isBatch) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
index 7510cb9..a27aeba 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
@@ -122,15 +122,15 @@ public final class OperationTreeBuilder {
 	public static OperationTreeBuilder create(
 			FunctionLookup functionCatalog,
 			TableReferenceLookup tableReferenceLookup,
-			boolean isStreaming) {
+			boolean isStreamingMode) {
 		return new OperationTreeBuilder(
 			functionCatalog,
 			tableReferenceLookup,
 			new ProjectionOperationFactory(),
-			new SortOperationFactory(isStreaming),
+			new SortOperationFactory(isStreamingMode),
 			new CalculatedTableFactory(),
-			new SetOperationFactory(isStreaming),
-			new AggregateOperationFactory(isStreaming),
+			new SetOperationFactory(isStreamingMode),
+			new AggregateOperationFactory(isStreamingMode),
 			new JoinOperationFactory()
 		);
 	}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java
index 42c0f03..899bce5 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java
@@ -89,14 +89,14 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isTim
 @Internal
 public final class AggregateOperationFactory {
 
-	private final boolean isStreaming;
+	private final boolean isStreamingMode;
 	private final NoNestedAggregates noNestedAggregates = new NoNestedAggregates();
 	private final ValidateDistinct validateDistinct = new ValidateDistinct();
 	private final AggregationExpressionValidator aggregationsValidator = new AggregationExpressionValidator();
 	private final IsKeyTypeChecker isKeyTypeChecker = new IsKeyTypeChecker();
 
-	public AggregateOperationFactory(boolean isStreaming) {
-		this.isStreaming = isStreaming;
+	public AggregateOperationFactory(boolean isStreamingMode) {
+		this.isStreamingMode = isStreamingMode;
 	}
 
 	/**
@@ -274,7 +274,7 @@ public final class AggregateOperationFactory {
 	}
 
 	private void validateTimeAttributeType(LogicalType timeFieldType) {
-		if (isStreaming) {
+		if (isStreamingMode) {
 			validateStreamTimeAttribute(timeFieldType);
 		} else {
 			validateBatchTimeAttribute(timeFieldType);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SetOperationFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SetOperationFactory.java
index 9b54b78..0d206f9 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SetOperationFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SetOperationFactory.java
@@ -37,10 +37,10 @@ import static org.apache.flink.table.operations.SetQueryOperation.SetQueryOperat
 @Internal
 public class SetOperationFactory {
 
-	private final boolean isStreaming;
+	private final boolean isStreamingMode;
 
-	public SetOperationFactory(boolean isStreaming) {
-		this.isStreaming = isStreaming;
+	public SetOperationFactory(boolean isStreamingMode) {
+		this.isStreamingMode = isStreamingMode;
 	}
 
 	/**
@@ -98,7 +98,7 @@ public class SetOperationFactory {
 	private void failIfStreaming(SetQueryOperationType type, boolean all) {
 		boolean shouldFailInCaseOfStreaming = !all || type != UNION;
 
-		if (isStreaming && shouldFailInCaseOfStreaming) {
+		if (isStreamingMode && shouldFailInCaseOfStreaming) {
 			throw new ValidationException(
 				format(
 					"The %s operation on two unbounded tables is currently not supported.",
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SortOperationFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SortOperationFactory.java
index 765fe4f..a58db14 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SortOperationFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SortOperationFactory.java
@@ -40,10 +40,10 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ORDER_
 @Internal
 public class SortOperationFactory {
 
-	private final boolean isStreaming;
+	private final boolean isStreamingMode;
 
-	public SortOperationFactory(boolean isStreaming) {
-		this.isStreaming = isStreaming;
+	public SortOperationFactory(boolean isStreamingMode) {
+		this.isStreamingMode = isStreamingMode;
 	}
 
 	/**
@@ -127,7 +127,7 @@ public class SortOperationFactory {
 	}
 
 	private void failIfStreaming() {
-		if (isStreaming) {
+		if (isStreamingMode) {
 			throw new ValidationException("A limit operation on unbounded tables is currently not supported.");
 		}
 	}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java
index 13e821d..e6befb5 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java
@@ -45,7 +45,7 @@ public class ComponentFactoryServiceTest {
 	public void testLookingUpAmbiguousPlanners() {
 		Map<String, String> properties = new HashMap<>();
 		properties.put(EnvironmentSettings.CLASS_NAME, TestPlannerFactory.class.getCanonicalName());
-		properties.put(EnvironmentSettings.BATCH_MODE, Boolean.toString(true));
+		properties.put(EnvironmentSettings.STREAMING_MODE, Boolean.toString(false));
 		properties.put(TestPlannerFactory.PLANNER_TYPE_KEY, TestPlannerFactory.PLANNER_TYPE_VALUE);
 
 		PlannerFactory plannerFactory = ComponentFactoryService.find(PlannerFactory.class, properties);
@@ -60,7 +60,7 @@ public class ComponentFactoryServiceTest {
 
 		Map<String, String> properties = new HashMap<>();
 		properties.put(EnvironmentSettings.CLASS_NAME, "NoSuchClass");
-		properties.put(EnvironmentSettings.BATCH_MODE, Boolean.toString(true));
+		properties.put(EnvironmentSettings.STREAMING_MODE, Boolean.toString(false));
 		properties.put(TestPlannerFactory.PLANNER_TYPE_KEY, TestPlannerFactory.PLANNER_TYPE_VALUE);
 
 		ComponentFactoryService.find(PlannerFactory.class, properties);
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java
index 1a0f05f..6ace0df 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java
@@ -64,6 +64,6 @@ public class TestPlannerFactory implements PlannerFactory {
 
 	@Override
 	public List<String> supportedProperties() {
-		return Arrays.asList(EnvironmentSettings.CLASS_NAME, EnvironmentSettings.BATCH_MODE);
+		return Arrays.asList(EnvironmentSettings.CLASS_NAME, EnvironmentSettings.STREAMING_MODE);
 	}
 }
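
The two test files above show the discovery contract: planners and executors are looked up from a flat property map, now keyed by STREAMING_MODE instead of BATCH_MODE. A sketch of that lookup, mirroring ComponentFactoryServiceTest (TestPlannerFactory is a test fixture; imports as in the test file plus java.util.HashMap):

    // Batch mode is now expressed as STREAMING_MODE=false in the property map.
    Map<String, String> properties = new HashMap<>();
    properties.put(EnvironmentSettings.CLASS_NAME, TestPlannerFactory.class.getCanonicalName());
    properties.put(EnvironmentSettings.STREAMING_MODE, Boolean.toString(false));
    properties.put(TestPlannerFactory.PLANNER_TYPE_KEY, TestPlannerFactory.PLANNER_TYPE_VALUE);

    // Resolves to TestPlannerFactory, which lists STREAMING_MODE in supportedProperties().
    PlannerFactory plannerFactory = ComponentFactoryService.find(PlannerFactory.class, properties);
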
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index 1304c8c..67021cb 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -300,7 +300,7 @@ object StreamTableEnvironmentImpl {
       executionEnvironment,
       planner,
       executor,
-      !settings.isBatchMode
+      settings.isStreamingMode
     )
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
index e816c8f..678eca3 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
@@ -39,10 +39,13 @@ public class CatalogCalciteSchema extends FlinkSchema {
 
 	private final String catalogName;
 	private final Catalog catalog;
+	// Flag that tells if the current planner should work in a batch or streaming mode.
+	private final boolean isStreamingMode;
 
-	public CatalogCalciteSchema(String catalogName, Catalog catalog) {
+	public CatalogCalciteSchema(String catalogName, Catalog catalog, boolean isStreamingMode) {
 		this.catalogName = catalogName;
 		this.catalog = catalog;
+		this.isStreamingMode = isStreamingMode;
 	}
 
 	/**
@@ -54,7 +57,7 @@ public class CatalogCalciteSchema extends FlinkSchema {
 	@Override
 	public Schema getSubSchema(String schemaName) {
 		if (catalog.databaseExists(schemaName)) {
-			return new DatabaseCalciteSchema(schemaName, catalogName, catalog);
+			return new DatabaseCalciteSchema(schemaName, catalogName, catalog, isStreamingMode);
 		} else {
 			return null;
 		}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
index c7238eb..7ca0a41 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
@@ -43,9 +43,12 @@ import java.util.Set;
 public class CatalogManagerCalciteSchema extends FlinkSchema {
 
 	private final CatalogManager catalogManager;
+	// Flag that tells if the current planner should work in a batch or streaming mode.
+	private final boolean isStreamingMode;
 
-	public CatalogManagerCalciteSchema(CatalogManager catalogManager) {
+	public CatalogManagerCalciteSchema(CatalogManager catalogManager, boolean isStreamingMode) {
 		this.catalogManager = catalogManager;
+		this.isStreamingMode = isStreamingMode;
 	}
 
 	@Override
@@ -61,7 +64,7 @@ public class CatalogManagerCalciteSchema extends FlinkSchema {
 	@Override
 	public Schema getSubSchema(String name) {
 		Schema schema = catalogManager.getCatalog(name)
-			.map(catalog -> new CatalogCalciteSchema(name, catalog))
+			.map(catalog -> new CatalogCalciteSchema(name, catalog, isStreamingMode))
 			.orElse(null);
 
 		if (schema == null && catalogManager.getExternalCatalog(name).isPresent()) {
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index 0ed45ba..cc90916 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.operations.RichTableSourceQueryOperation;
 import org.apache.flink.table.plan.schema.TableSourceTable;
 import org.apache.flink.table.plan.stats.FlinkStatistic;
+import org.apache.flink.table.sources.LookupableTableSource;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.sources.TableSource;
 
@@ -55,11 +56,14 @@ class DatabaseCalciteSchema extends FlinkSchema {
 	private final String databaseName;
 	private final String catalogName;
 	private final Catalog catalog;
+	// Flag that tells if the current planner should work in a batch or streaming mode.
+	private final boolean isStreamingMode;
 
-	public DatabaseCalciteSchema(String databaseName, String catalogName, Catalog catalog) {
+	public DatabaseCalciteSchema(String databaseName, String catalogName, Catalog catalog, boolean isStreamingMode) {
 		this.databaseName = databaseName;
 		this.catalogName = catalogName;
 		this.catalog = catalog;
+		this.isStreamingMode = isStreamingMode;
 	}
 
 	@Override
@@ -96,20 +100,30 @@ class DatabaseCalciteSchema extends FlinkSchema {
 			// TableNotExistException should never happen, because we are checking it exists
 			// via catalog.tableExists
 			throw new TableException(format(
-				"A failure occurred when accessing table. Table path [%s, %s, %s]",
-				catalogName,
-				databaseName,
-				tableName), e);
+					"A failure occurred when accessing table. Table path [%s, %s, %s]",
+					catalogName,
+					databaseName,
+					tableName), e);
 		}
 	}
 
 	private Table convertConnectorTable(ConnectorCatalogTable<?, ?> table) {
 		return table.getTableSource()
-			.map(tableSource -> new TableSourceTable<>(
-				tableSource,
-				!table.isBatch(),
-				FlinkStatistic.UNKNOWN()))
-			.orElseThrow(() -> new TableException("Cannot query a sink only table."));
+				.map(tableSource -> {
+					if (!(tableSource instanceof StreamTableSource ||
+							tableSource instanceof LookupableTableSource)) {
+						throw new TableException(
+								"Only StreamTableSource and LookupableTableSource can be used in Blink planner.");
+					}
+					if (!isStreamingMode && tableSource instanceof StreamTableSource &&
+							!((StreamTableSource<?>) tableSource).isBounded()) {
+						throw new TableException("Only bounded StreamTableSource can be used in batch mode.");
+					}
+					return new TableSourceTable<>(
+							tableSource,
+							isStreamingMode,
+							FlinkStatistic.UNKNOWN());
+				}).orElseThrow(() -> new TableException("Cannot query a sink only table."));
 	}
 
 	private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) {
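
With this check in place, a source used under the Blink planner in batch mode must be a StreamTableSource that reports isBounded() == true. A minimal sketch of a source that satisfies the check; the class name and data are illustrative only, not part of this commit:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.sources.StreamTableSource;
    import org.apache.flink.types.Row;

    // Hypothetical bounded source, for illustration of the batch-mode check above.
    public class BoundedRowSource implements StreamTableSource<Row> {

        @Override
        public boolean isBounded() {
            // Declaring bounded-ness is exactly what the batch-mode check requires.
            return true;
        }

        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
            return execEnv.fromElements(Row.of("a", 1), Row.of("b", 2))
                    .returns(getReturnType());
        }

        @Override
        public TypeInformation<Row> getReturnType() {
            return Types.ROW(Types.STRING, Types.INT);
        }

        @Override
        public TableSchema getTableSchema() {
            return TableSchema.builder()
                    .field("name", DataTypes.STRING())
                    .field("score", DataTypes.INT())
                    .build();
        }
    }
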
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BlinkExecutorFactory.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BlinkExecutorFactory.java
index ac16cc8..855aa4c 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BlinkExecutorFactory.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BlinkExecutorFactory.java
@@ -49,10 +49,10 @@ public class BlinkExecutorFactory implements ExecutorFactory {
 	 * @return instance of a {@link Executor}
 	 */
 	public Executor create(Map<String, String> properties, StreamExecutionEnvironment executionEnvironment) {
-		if (Boolean.valueOf(properties.getOrDefault(EnvironmentSettings.BATCH_MODE, "false"))) {
-			return new BatchExecutor(executionEnvironment);
-		} else {
+		if (Boolean.valueOf(properties.getOrDefault(EnvironmentSettings.STREAMING_MODE, "true"))) {
 			return new StreamExecutor(executionEnvironment);
+		} else {
+			return new BatchExecutor(executionEnvironment);
 		}
 	}
 
@@ -69,7 +69,7 @@ public class BlinkExecutorFactory implements ExecutorFactory {
 
 	@Override
 	public List<String> supportedProperties() {
-		return Arrays.asList(EnvironmentSettings.BATCH_MODE, EnvironmentSettings.CLASS_NAME);
+		return Arrays.asList(EnvironmentSettings.STREAMING_MODE, EnvironmentSettings.CLASS_NAME);
 	}
 
 	@Override
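
Note the changed default in the dispatch above: getOrDefault(STREAMING_MODE, "true") means an empty property map still yields the streaming executor, and only an explicit "false" selects the batch executor. A sketch using just the create(...) method shown (imports as in the factory plus java.util.HashMap):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // No STREAMING_MODE entry: the "true" default picks the StreamExecutor.
    Executor streaming = new BlinkExecutorFactory().create(new HashMap<>(), env);

    // Explicit opt-in to batch.
    Map<String, String> props = new HashMap<>();
    props.put(EnvironmentSettings.STREAMING_MODE, "false");
    Executor batch = new BlinkExecutorFactory().create(props, env);
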
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/operations/DataStreamQueryOperation.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/operations/DataStreamQueryOperation.java
index 7b1d7d2..01c0400 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/operations/DataStreamQueryOperation.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/operations/DataStreamQueryOperation.java
@@ -31,6 +31,8 @@ import java.util.Map;
 /**
  * Describes a relational operation that reads from a {@link DataStream}.
  *
+ * <p>This is only used for testing.
+ *
  * <p>This operation may expose only part, or change the order of the fields available in a
  * {@link org.apache.flink.api.common.typeutils.CompositeType} of the underlying {@link DataStream}.
  * The {@link DataStreamQueryOperation#getFieldIndices()} describes the mapping between fields of the
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/operations/RichTableSourceQueryOperation.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/operations/RichTableSourceQueryOperation.java
index 62e2a42..d572d73 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/operations/RichTableSourceQueryOperation.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/operations/RichTableSourceQueryOperation.java
@@ -20,7 +20,9 @@ package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.plan.stats.FlinkStatistic;
+import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.util.Preconditions;
 
 import java.util.HashMap;
 import java.util.List;
@@ -30,6 +32,8 @@ import java.util.Map;
  * A {@link TableSourceQueryOperation} with {@link FlinkStatistic} and qualifiedName.
  * TODO this class should be deleted after unique key in TableSchema is ready
  * and setting catalog statistic to TableSourceTable in DatabaseCalciteSchema is ready
+ *
+ * <p>This is only used for testing.
  */
 @Internal
 public class RichTableSourceQueryOperation<T> extends TableSourceQueryOperation<T> {
@@ -37,10 +41,11 @@ public class RichTableSourceQueryOperation<T> extends TableSourceQueryOperation<
 	private List<String> qualifiedName;
 
 	public RichTableSourceQueryOperation(
-		TableSource<T> tableSource,
-		boolean isBatch,
-		FlinkStatistic statistic) {
-		super(tableSource, isBatch);
+			TableSource<T> tableSource,
+			FlinkStatistic statistic) {
+		super(tableSource, false);
+		Preconditions.checkArgument(tableSource instanceof StreamTableSource,
+				"Blink planner should always use StreamTableSource.");
 		this.statistic = statistic;
 	}
 
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/BlinkPlannerFactory.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/BlinkPlannerFactory.java
index 8b967d7..2d353df 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/BlinkPlannerFactory.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/BlinkPlannerFactory.java
@@ -46,10 +46,10 @@ public final class BlinkPlannerFactory implements PlannerFactory {
 		TableConfig tableConfig,
 		FunctionCatalog functionCatalog,
 		CatalogManager catalogManager) {
-		if (Boolean.valueOf(properties.getOrDefault(EnvironmentSettings.BATCH_MODE, "false"))) {
-			return new BatchPlanner(executor, tableConfig, functionCatalog, catalogManager);
-		} else {
+		if (Boolean.valueOf(properties.getOrDefault(EnvironmentSettings.STREAMING_MODE, "true"))) {
 			return new StreamPlanner(executor, tableConfig, functionCatalog, catalogManager);
+		} else {
+			return new BatchPlanner(executor, tableConfig, functionCatalog, catalogManager);
 		}
 	}
 
@@ -67,6 +67,6 @@ public final class BlinkPlannerFactory implements PlannerFactory {
 
 	@Override
 	public List<String> supportedProperties() {
-		return Arrays.asList(EnvironmentSettings.BATCH_MODE, EnvironmentSettings.CLASS_NAME);
+		return Arrays.asList(EnvironmentSettings.STREAMING_MODE, EnvironmentSettings.CLASS_NAME);
 	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
index 90d35d6..14f173d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
@@ -139,7 +139,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
       FlinkStatistic.builder().statistic(statistic).tableStats(null).build()
     }
     val newTableSourceTable = new TableSourceTable(
-      newTableSource, tableSourceTable.isStreaming, newStatistic)
+      newTableSource, tableSourceTable.isStreamingMode, newStatistic)
     relOptTable.copy(newTableSourceTable, tableSourceTable.getRowType(typeFactory))
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushPartitionIntoTableSourceScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushPartitionIntoTableSourceScanRule.scala
index cd7fd7b..9c9c400 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushPartitionIntoTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushPartitionIntoTableSourceScanRule.scala
@@ -125,7 +125,7 @@ class PushPartitionIntoTableSourceScanRule extends RelOptRule(
       FlinkStatistic.builder().statistic(statistic).tableStats(null).build()
     }
     val newTableSourceTable = new TableSourceTable(
-      newTableSource, tableSourceTable.isStreaming, newStatistic)
+      newTableSource, tableSourceTable.isStreamingMode, newStatistic)
    val newRelOptTable = relOptTable.copy(newTableSourceTable, relOptTable.getRowType)
 
     val newScan = new LogicalTableScan(scan.getCluster, scan.getTraitSet, newRelOptTable)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
index 6176872..7518844 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
@@ -76,7 +76,7 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
     }
     // project push down does not change the statistic, we can reuse origin statistic
     val newTableSourceTable = new TableSourceTable(
-      newTableSource, tableSourceTable.isStreaming, tableSourceTable.statistic)
+      newTableSource, tableSourceTable.isStreamingMode, tableSourceTable.statistic)
     // row type is changed after project push down
     val newRowType = newTableSourceTable.getRowType(scan.getCluster.getTypeFactory)
     val newRelOptTable = relOptTable.copy(newTableSourceTable, newRowType)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala
index c421d78..90cb825 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala
@@ -53,18 +53,6 @@ class TableSourceSinkTable[T1, T2](
       .getOrElse(throw new TableException("Unable to get statistics of table source sink table."))
   }
 
-  def isSourceTable: Boolean = tableSourceTable.isDefined
-
-  def isStreamSourceTable: Boolean = tableSourceTable match {
-    case Some(tst) => tst.isStreaming
-    case _ => false
-  }
-
-  def isBatchSourceTable: Boolean = tableSourceTable match {
-    case Some(tst) => !tst.isStreaming
-    case _ => false
-  }
-
   override def copy(statistic: FlinkStatistic): FlinkTable = {
     new TableSourceSinkTable[T1, T2](
       tableSourceTable.map(source => source.copy(statistic)),
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index ce4e722..16da7c3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -26,10 +26,14 @@ import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
 /**
   * Abstract class which define the interfaces required to convert a [[TableSource]] to
   * a Calcite Table
+  *
+  * @param tableSource The [[TableSource]] which is converted to a Calcite Table.
+  * @param isStreamingMode A flag that tells if the current table is in streaming mode.
+  * @param statistic The table statistics.
   */
 class TableSourceTable[T](
     val tableSource: TableSource[T],
-    val isStreaming: Boolean,
+    val isStreamingMode: Boolean,
     val statistic: FlinkStatistic)
   extends FlinkTable {
 
@@ -40,7 +44,7 @@ class TableSourceTable[T](
     TableSourceUtil.getRelDataType(
       tableSource,
       None,
-      streaming = isStreaming,
+      streaming = isStreamingMode,
       typeFactory.asInstanceOf[FlinkTypeFactory])
   }
 
@@ -51,7 +55,7 @@ class TableSourceTable[T](
     * @return Copy of this table, substituting statistic.
     */
   override def copy(statistic: FlinkStatistic): TableSourceTable[T] = {
-    new TableSourceTable(tableSource, isStreaming, statistic)
+    new TableSourceTable(tableSource, isStreamingMode, statistic)
   }
 
   /**
@@ -66,6 +70,6 @@ class TableSourceTable[T](
     * @return new TableSourceTable
     */
   def replaceTableSource(tableSource: TableSource[T]): TableSourceTable[T] = {
-    new TableSourceTable[T](tableSource, isStreaming, statistic)
+    new TableSourceTable[T](tableSource, isStreamingMode, statistic)
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/BatchPlanner.scala
index 9b4e4f2..b2dedbd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/BatchPlanner.scala
@@ -46,7 +46,7 @@ class BatchPlanner(
     config: TableConfig,
     functionCatalog: FunctionCatalog,
     catalogManager: CatalogManager)
-  extends PlannerBase(executor, config, functionCatalog, catalogManager) {
+  extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = false) {
 
   override protected def getTraitDefs: Array[RelTraitDef[_ <: RelTrait]] = {
     Array(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
index 021a650..ddb759d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
@@ -62,12 +62,15 @@ import _root_.scala.collection.JavaConversions._
   * @param config          mutable configuration passed from corresponding [[TableEnvironment]]
   * @param functionCatalog catalog of functions
   * @param catalogManager  manager of catalog meta objects such as tables, views, databases etc.
+  * @param isStreamingMode Determines if the planner should work in a batch (false) or
+  *                        streaming (true) mode.
   */
 abstract class PlannerBase(
     executor: Executor,
     config: TableConfig,
     val functionCatalog: FunctionCatalog,
-    catalogManager: CatalogManager)
+    catalogManager: CatalogManager,
+    isStreamingMode: Boolean)
   extends Planner {
 
   // temporary utility until we don't use planner expressions anymore
@@ -79,7 +82,7 @@ abstract class PlannerBase(
     new PlannerContext(
       config,
       functionCatalog,
-      asRootSchema(new CatalogManagerCalciteSchema(catalogManager)),
+      asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
       getTraitDefs.toList
     )
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 3007d92..b64416d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -45,7 +45,7 @@ class StreamPlanner(
     config: TableConfig,
     functionCatalog: FunctionCatalog,
     catalogManager: CatalogManager)
-  extends PlannerBase(executor, config, functionCatalog, catalogManager) {
+  extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = true) {
 
   override protected def getTraitDefs: Array[RelTraitDef[_ <: RelTrait]] = {
     Array(
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/MockNodeTestBase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/MockNodeTestBase.java
index 8637ed6..4bfbc12 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/MockNodeTestBase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/MockNodeTestBase.java
@@ -53,10 +53,10 @@ import static org.mockito.Mockito.when;
 public class MockNodeTestBase {
 
 	protected List<ExecNode> nodeList;
-	private final boolean isBatch;
+	private final boolean isBatchMode;
 
-	public MockNodeTestBase(boolean isBatch) {
-		this.isBatch = isBatch;
+	public MockNodeTestBase(boolean isBatchMode) {
+		this.isBatchMode = isBatchMode;
 	}
 
 	private void updateNode(int index, ExecNode<?, ?> node) {
@@ -90,25 +90,25 @@ public class MockNodeTestBase {
 	}
 
 	protected ExecNode<?, ?> updateCalc(int index) {
-		ExecNode<?, ?> node = isBatch ? mock(BatchExecCalc.class) : mock(StreamExecCalc.class);
+		ExecNode<?, ?> node = isBatchMode ? mock(BatchExecCalc.class) : mock(StreamExecCalc.class);
 		updateNode(index, node);
 		return node;
 	}
 
 	protected ExecNode<?, ?> updateValues(int index) {
-		ExecNode<?, ?> node = isBatch ? mock(BatchExecValues.class) : mock(StreamExecValues.class);
+		ExecNode<?, ?> node = isBatchMode ? mock(BatchExecValues.class) : mock(StreamExecValues.class);
 		updateNode(index, node);
 		return node;
 	}
 
 	protected ExecNode<?, ?> updateUnion(int index) {
-		ExecNode<?, ?> node = isBatch ? mock(BatchExecUnion.class) : mock(StreamExecUnion.class);
+		ExecNode<?, ?> node = isBatchMode ? mock(BatchExecUnion.class) : mock(StreamExecUnion.class);
 		updateNode(index, node);
 		return node;
 	}
 
 	protected ExecNode<?, ?> updateExchange(int index) {
-		ExecNode<?, ?> node = isBatch ? mock(BatchExecExchange.class, RETURNS_DEEP_STUBS) :
+		ExecNode<?, ?> node = isBatchMode ? mock(BatchExecExchange.class, RETURNS_DEEP_STUBS) :
 				mock(StreamExecExchange.class, RETURNS_DEEP_STUBS);
 		updateNode(index, node);
 		return node;
@@ -116,7 +116,7 @@ public class MockNodeTestBase {
 
 	protected ExecNode<?, ?> updateExchange(int index, RelDistribution.Type type) {
 		ExecNode<?, ?> node = updateExchange(index);
-		if (isBatch) {
+		if (isBatchMode) {
 			when(((BatchExecExchange) node).getDistribution().getType()).thenReturn(type);
 		} else {
 			when(((StreamExecExchange) node).getDistribution().getType()).thenReturn(type);
@@ -125,14 +125,14 @@ public class MockNodeTestBase {
 	}
 
 	protected ExecNode<?, ?> updateTableSource(int index) {
-		ExecNode<?, ?> node = isBatch ? mock(BatchExecTableSourceScan.class) : mock(StreamExecTableSourceScan.class);
+		ExecNode<?, ?> node = isBatchMode ? mock(BatchExecTableSourceScan.class) : mock(StreamExecTableSourceScan.class);
 		updateNode(index, node);
 		return node;
 	}
 
 	protected ExecNode<?, ?> updateTableSource(int index, int maxParallelism) {
 		ExecNode<?, ?> node = updateTableSource(index);
-		if (isBatch) {
+		if (isBatchMode) {
 			when(((BatchExecTableSourceScan) node).getSourceTransformation(any()).getMaxParallelism()).thenReturn(maxParallelism);
 		} else {
 			when(((StreamExecTableSourceScan) node).getSourceTransformation(any()).getMaxParallelism()).thenReturn(maxParallelism);
@@ -141,14 +141,14 @@ public class MockNodeTestBase {
 	}
 
 	protected ExecNode<?, ?> updateStreamScan(int index) {
-		ExecNode<?, ?> node = isBatch ? mock(BatchExecBoundedStreamScan.class) : mock(StreamExecDataStreamScan.class);
+		ExecNode<?, ?> node = isBatchMode ? mock(BatchExecBoundedStreamScan.class) : mock(StreamExecDataStreamScan.class);
 		updateNode(index, node);
 		return node;
 	}
 
 	protected ExecNode<?, ?> updateStreamScan(int index, int parallelism) {
 		ExecNode<?, ?> node = updateStreamScan(index);
-		if (isBatch) {
+		if (isBatchMode) {
 			when(((BatchExecBoundedStreamScan) nodeList.get(4)).getSourceTransformation().getParallelism()).thenReturn(parallelism);
 		} else {
 			when(((StreamExecDataStreamScan) nodeList.get(4)).getSourceTransformation().getParallelism()).thenReturn(parallelism);
@@ -159,7 +159,7 @@ public class MockNodeTestBase {
 	protected void createNodeList(int num) {
 		nodeList = new LinkedList<>();
 		for (int i = 0; i < num; i++) {
-			ExecNode<?, ?>  node = isBatch ? mock(BatchExecCalc.class) : mock(StreamExecCalc.class);
+			ExecNode<?, ?>  node = isBatchMode ? mock(BatchExecCalc.class) : mock(StreamExecCalc.class);
 			when(node.getInputNodes()).thenReturn(new ArrayList<>());
 			when(node.getResource()).thenReturn(new NodeResource());
 			when(node.toString()).thenReturn("id: " + i);
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/parallelism/FinalParallelismSetterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/parallelism/FinalParallelismSetterTest.java
index 1254a5c..e9f2234 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/parallelism/FinalParallelismSetterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/parallelism/FinalParallelismSetterTest.java
@@ -44,8 +44,8 @@ public class FinalParallelismSetterTest extends MockNodeTestBase {
 
 	private StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
 
-	public FinalParallelismSetterTest(boolean isBatch) {
-		super(isBatch);
+	public FinalParallelismSetterTest(boolean isBatchMode) {
+		super(isBatchMode);
 	}
 
 	@Before
@@ -108,7 +108,7 @@ public class FinalParallelismSetterTest extends MockNodeTestBase {
 		assertEquals(1, finalParallelismNodeMap.get(nodeList.get(7)).intValue());
 	}
 
-	@Parameterized.Parameters(name = "isBatch = {0}")
+	@Parameterized.Parameters(name = "isBatchMode = {0}")
 	public static Collection<Object[]> runMode() {
 		return Arrays.asList(
 				new Object[] { false, },
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/parallelism/ShuffleStageGeneratorTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/parallelism/ShuffleStageGeneratorTest.java
index 17c7ad1..e0697bc 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/parallelism/ShuffleStageGeneratorTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/parallelism/ShuffleStageGeneratorTest.java
@@ -45,8 +45,8 @@ public class ShuffleStageGeneratorTest extends MockNodeTestBase {
 
 	private Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap;
 
-	public ShuffleStageGeneratorTest(boolean isBatch) {
-		super(isBatch);
+	public ShuffleStageGeneratorTest(boolean isBatchMode) {
+		super(isBatchMode);
 	}
 
 	@Before
@@ -313,7 +313,7 @@ public class ShuffleStageGeneratorTest extends MockNodeTestBase {
 		}
 	}
 
-	@Parameterized.Parameters(name = "isBatch = {0}")
+	@Parameterized.Parameters(name = "isBatchMode = {0}")
 	public static Collection<Object[]> runMode() {
 		return Arrays.asList(
 				new Object[] { false, },
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/TableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/TableSourceTest.xml
index 4d9c790..190f1c6 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/TableSourceTest.xml
@@ -16,6 +16,22 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testBoundedStreamTableSource">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testFilterCannotPushDown">
     <Resource name="sql">
       <![CDATA[SELECT * FROM FilterableTable WHERE price > 10]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.xml
index 8c0a337..c692020 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.xml
@@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 <Root>
-  <TestCase name="testConfigSourceParallelism[isBatch=false]">
+  <TestCase name="testConfigSourceParallelism[isBatchMode=false]">
     <Resource name="sql">
       <![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2]]>
     </Resource>
@@ -32,7 +32,7 @@ Calc(select=[sum_a, c], resource=[{parallelism=1}])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testConfigSourceParallelism[isBatch=true]">
+  <TestCase name="testConfigSourceParallelism[isBatchMode=true]">
     <Resource name="sql">
       <![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2]]>
     </Resource>
@@ -50,7 +50,7 @@ Calc(select=[sum_a, c], resource=[{parallelism=1}])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSortLimit[isBatch=false]">
+  <TestCase name="testSortLimit[isBatchMode=false]">
     <Resource name="sql">
       <![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2]]>
     </Resource>
@@ -66,7 +66,7 @@ Calc(select=[sum_a, c], resource=[{parallelism=1}])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSourcePartitionMaxNum[isBatch=true]">
+  <TestCase name="testSourcePartitionMaxNum[isBatchMode=true]">
     <Resource name="sql">
       <![CDATA[SELECT * FROM table3]]>
     </Resource>
@@ -76,7 +76,7 @@ TableSourceScan(table=[[default_catalog, default_database, table3, source: [Mock
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSortLimit[isBatch=true]">
+  <TestCase name="testSortLimit[isBatchMode=true]">
     <Resource name="sql">
       <![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2]]>
     </Resource>
@@ -94,7 +94,7 @@ Calc(select=[sum_a, c], resource=[{parallelism=1}])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testUnionQuery[isBatch=true]">
+  <TestCase name="testUnionQuery[isBatchMode=true]">
     <Resource name="sql">
       <![CDATA[SELECT sum(a) as sum_a, g FROM (SELECT a, b, c FROM table3 UNION ALL SELECT a, b, c FROM table4), table5 WHERE b = e group by g]]>
     </Resource>
@@ -118,7 +118,7 @@ Calc(select=[sum_a, g], resource=[{parallelism=18}])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSourcePartitionMaxNum[isBatch=false]">
+  <TestCase name="testSourcePartitionMaxNum[isBatchMode=false]">
     <Resource name="sql">
       <![CDATA[SELECT * FROM table3]]>
     </Resource>
@@ -128,7 +128,7 @@ TableSourceScan(table=[[default_catalog, default_database, table3, source: [Mock
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testUnionQuery[isBatch=false]">
+  <TestCase name="testUnionQuery[isBatchMode=false]">
     <Resource name="sql">
       <![CDATA[SELECT sum(a) as sum_a, g FROM (SELECT a, b, c FROM table3 UNION ALL SELECT a, b, c FROM table4), table5 WHERE b = e group by g]]>
     </Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/TableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/TableSourceTest.xml
index 8fa7a3f..df5adca 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/TableSourceTest.xml
@@ -16,6 +16,22 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testBoundedStreamTableSource">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testFilterCannotPushDown">
     <Resource name="sql">
       <![CDATA[SELECT * FROM FilterableTable WHERE price > 10]]>
@@ -394,4 +410,20 @@ Calc(select=[rowtime, id, name, val])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testUnboundedStreamTableSource">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
index e06ccb2..b33ebfb 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
@@ -34,9 +34,9 @@ import scala.collection.JavaConversions._
 
 /** Test cases for catalog table. */
 @RunWith(classOf[Parameterized])
-class CatalogTableITCase(isStreaming: Boolean) {
+class CatalogTableITCase(isStreamingMode: Boolean) {
 
-  private val settings = if (isStreaming) {
+  private val settings = if (isStreamingMode) {
     EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
   } else {
     EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
@@ -68,7 +68,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
       .getConfiguration
       .setInteger(ExecutionConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 1)
     TestCollectionTableFactory.reset()
-    TestCollectionTableFactory.isStreaming = isStreaming
+    TestCollectionTableFactory.isStreaming = isStreamingMode
   }
 
   def toRow(args: Any*):Row = {
@@ -216,7 +216,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
 
   @Test
   def testInsertWithAggregateSource(): Unit = {
-    if (isStreaming) {
+    if (isStreamingMode) {
       return
     }
     val sourceData = List(
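
The parameterized setup above reduces to choosing the blink planner's environment settings per run mode. A minimal, self-contained sketch of the same pattern (TableEnvironment.create is the standard factory of the unified API; everything else appears verbatim in the test):

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    // One table environment per Parameterized run, as CatalogTableITCase does.
    def createEnv(isStreamingMode: Boolean): TableEnvironment = {
      val settings = if (isStreamingMode) {
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
      } else {
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
      }
      TableEnvironment.create(settings)
    }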
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggTestBase.scala
index 0237a62..eac535b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggTestBase.scala
@@ -41,11 +41,11 @@ import org.powermock.api.mockito.PowerMockito.{mock, when}
 /**
   * Agg test base to mock agg information, etc.
   */
-abstract class AggTestBase(isBatch: Boolean) {
+abstract class AggTestBase(isBatchMode: Boolean) {
 
   val typeFactory: FlinkTypeFactory = new FlinkTypeFactory(new FlinkTypeSystem())
   val env = new ScalaStreamExecEnv(new LocalStreamEnvironment)
-  private val tEnv = if (isBatch) {
+  private val tEnv = if (isBatchMode) {
     val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
     // use impl class instead of interface class to avoid
     // "Static methods in interface require -target:jvm-1.8"
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGeneratorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGeneratorTest.scala
index 2f2ef16..4561147 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGeneratorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGeneratorTest.scala
@@ -28,7 +28,7 @@ import org.junit.{Assert, Test}
 
 import java.lang
 
-class AggsHandlerCodeGeneratorTest extends AggTestBase(isBatch = false) {
+class AggsHandlerCodeGeneratorTest extends AggTestBase(isBatchMode = false) {
 
   @Test
   def testAvg(): Unit = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/batch/BatchAggTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/batch/BatchAggTestBase.scala
index edcab6b..e351d95 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/batch/BatchAggTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/batch/BatchAggTestBase.scala
@@ -39,7 +39,7 @@ import scala.collection.JavaConverters._
 /**
   * Base agg test.
   */
-abstract class BatchAggTestBase extends AggTestBase(isBatch = true) {
+abstract class BatchAggTestBase extends AggTestBase(isBatchMode = true) {
 
   val globalOutputType = RowType.of(
     Array[LogicalType](
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/TableSourceTest.scala
index 78fd39d..41b2f68 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/TableSourceTest.scala
@@ -20,8 +20,11 @@ package org.apache.flink.table.plan.batch.sql
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, LocalTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.{DataTypes, TableSchema, Types}
+import org.apache.flink.table.api.{DataTypes, TableSchema, Types, ValidationException}
 import org.apache.flink.table.expressions.utils.Func1
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.types.{DataType, TypeInfoDataTypeConverter}
+import org.apache.flink.table.util._
 import org.apache.flink.table.types.TypeInfoDataTypeConverter
 import org.apache.flink.table.util.{TestPartitionableTableSource, _}
 import org.apache.flink.types.Row
@@ -31,12 +34,12 @@ import org.junit.{Before, Test}
 class TableSourceTest extends TableTestBase {
 
   private val util = batchTestUtil()
+  private val tableSchema = TableSchema.builder().fields(
+    Array("a", "b", "c"),
+    Array(DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING())).build()
 
   @Before
   def setup(): Unit = {
-    val tableSchema = TableSchema.builder().fields(
-      Array("a", "b", "c"),
-      Array(DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING())).build()
     util.tableEnv.registerTableSource("ProjectableTable", new TestProjectableTableSource(
       true,
       tableSchema,
@@ -50,6 +53,35 @@ class TableSourceTest extends TableTestBase {
   }
 
   @Test
+  def testBoundedStreamTableSource(): Unit = {
+    util.tableEnv.registerTableSource("MyTable", new TestTableSource(true, tableSchema))
+    util.verifyPlan("SELECT * FROM MyTable")
+  }
+
+  @Test
+  def testUnboundedStreamTableSource(): Unit = {
+    util.tableEnv.registerTableSource("MyTable", new TestTableSource(false, tableSchema))
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage("Only bounded StreamTableSource can be used in batch mode.")
+    util.verifyPlan("SELECT * FROM MyTable")
+  }
+
+  @Test
+  def testNonStreamTableSource(): Unit = {
+    val tableSource = new TableSource[Row]() {
+
+      override def getProducedDataType: DataType = tableSchema.toRowDataType
+
+      override def getTableSchema: TableSchema = tableSchema
+    }
+    util.tableEnv.registerTableSource("MyTable", tableSource)
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage(
+      "Only StreamTableSource and LookupableTableSource can be used in Blink planner.")
+    util.verifyPlan("SELECT * FROM MyTable")
+  }
+
+  @Test
   def testSimpleProject(): Unit = {
     util.verifyPlan("SELECT a, c FROM ProjectableTable")
   }
@@ -177,7 +209,7 @@ class TableSourceTest extends TableTestBase {
     row.setField(3, DateTimeTestUtil.localDateTime("2017-01-24 12:45:01.234"))
 
     val tableSource = TestFilterableTableSource(
-      isBatch = true, rowTypeInfo, Seq(row), Set("dv", "tv", "tsv"))
+      isBounded = true, rowTypeInfo, Seq(row), Set("dv", "tv", "tsv"))
     util.tableEnv.registerTableSource("FilterableTable1", tableSource)
 
     val sqlQuery =
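
The three new tests pin down the blink planner's source validation in batch mode: a bounded StreamTableSource is planned normally, an unbounded one is rejected with "Only bounded StreamTableSource can be used in batch mode.", and a bare TableSource that is neither a StreamTableSource nor a LookupableTableSource is rejected outright. The deciding bit is the source's own isBounded flag. A minimal bounded source, sketched with a hypothetical class name but mirroring the TestTableSource refactored later in this patch:

    import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.table.api.TableSchema
    import org.apache.flink.table.sources.StreamTableSource
    import org.apache.flink.types.Row

    class BoundedRowSource extends StreamTableSource[Row] {
      // Declaring the source bounded is what makes it legal in batch mode.
      override def isBounded: Boolean = true

      override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] =
        execEnv.fromCollection(java.util.Collections.emptyList[Row](), getReturnType)

      override def getReturnType: TypeInformation[Row] =
        new RowTypeInfo(
          Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING),
          Array("a", "b", "c"))

      override def getTableSchema: TableSchema =
        new TableSchema(Array("a", "b", "c"),
          Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING))
    }

Returning false from isBounded would make the same registration fail in this batch suite, while the streaming variant of the suite (further below) accepts both sources.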
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala
index 6aff288..5b1d050 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala
@@ -42,21 +42,21 @@ import org.mockito.Mockito.{mock, when}
 import java.util
 
 @RunWith(classOf[Parameterized])
-class ExecNodeResourceTest(isBatch: Boolean) extends TableTestBase {
+class ExecNodeResourceTest(isBatchMode: Boolean) extends TableTestBase {
 
   private var testUtil: TableTestUtil = _
 
   @Before
   def before(): Unit = {
-    testUtil = if(isBatch) batchTestUtil() else streamTestUtil()
+    testUtil = if(isBatchMode) batchTestUtil() else streamTestUtil()
     val table3Stats = new TableStats(5000000)
-    val table3Source = new MockTableSource(isBatch,
+    val table3Source = new MockTableSource(isBatchMode,
       new TableSchema(Array("a", "b", "c"),
       Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING)))
     testUtil.addTableSource(
       "table3", table3Source, FlinkStatistic.builder().tableStats(table3Stats).build())
     val table5Stats = new TableStats(8000000)
-    val table5Source = new MockTableSource(isBatch,
+    val table5Source = new MockTableSource(isBatchMode,
       new TableSchema(Array("d", "e", "f", "g", "h"),
       Array[TypeInformation[_]](Types.INT, Types.LONG, Types.INT, Types.STRING, Types.LONG)))
     testUtil.addTableSource(
@@ -87,7 +87,7 @@ class ExecNodeResourceTest(isBatch: Boolean) extends TableTestBase {
   @Test
   def testUnionQuery(): Unit = {
     val statsOfTable4 = new TableStats(100L)
-    val table4Source = new MockTableSource(isBatch,
+    val table4Source = new MockTableSource(isBatchMode,
       new TableSchema(Array("a", "b", "c"),
         Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING)))
     testUtil.addTableSource(
@@ -162,7 +162,7 @@ class ExecNodeResourceTest(isBatch: Boolean) extends TableTestBase {
 
 object ExecNodeResourceTest {
 
-  @Parameterized.Parameters(name = "isBatch={0}")
+  @Parameterized.Parameters(name = "isBatchMode={0}")
   def parameters(): util.Collection[Array[Any]] = {
     util.Arrays.asList(
       Array(true),
@@ -180,11 +180,9 @@ object ExecNodeResourceTest {
 /**
   * Batch/Stream [[org.apache.flink.table.sources.TableSource]] for resource testing.
   */
-class MockTableSource(isBatch: Boolean, schema: TableSchema)
+class MockTableSource(val isBounded: Boolean, schema: TableSchema)
     extends StreamTableSource[BaseRow] {
 
-  override def isBounded: Boolean = isBatch
-
   override def getDataStream(
       execEnv: environment.StreamExecutionEnvironment): DataStream[BaseRow] = {
     val transformation = mock(classOf[SourceTransformation[BaseRow]])
@@ -193,7 +191,7 @@ class MockTableSource(isBatch: Boolean, schema: TableSchema)
     when(bs.getTransformation).thenReturn(transformation)
     when(transformation.getOutputType).thenReturn(getReturnType)
     val factory = mock(classOf[StreamOperatorFactory[BaseRow]])
-    when(factory.isStreamSource).thenReturn(!isBatch)
+    when(factory.isStreamSource).thenReturn(!isBounded)
     when(transformation.getOperatorFactory).thenReturn(factory)
     bs
   }
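
The MockTableSource change shows the mechanical pattern applied throughout the patch: the constructor parameter is promoted to a val, and that val's generated accessor directly implements the interface's parameterless isBounded method, replacing the explicit override. The same idiom in isolation:

    trait Source {
      def isBounded: Boolean
    }

    // The constructor val supplies the accessor that implements Source.isBounded,
    // so no `override def isBounded: Boolean = isBatch` body is needed.
    class MySource(val isBounded: Boolean) extends Source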
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/TableSourceTest.scala
index 08b72ec..67724d8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/TableSourceTest.scala
@@ -20,8 +20,10 @@ package org.apache.flink.table.plan.stream.sql
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.{TableSchema, Types}
+import org.apache.flink.table.api.{DataTypes, TableSchema, Types, ValidationException}
 import org.apache.flink.table.expressions.utils.Func1
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.types.DataType
 import org.apache.flink.table.util._
 import org.apache.flink.types.Row
 
@@ -31,6 +33,10 @@ class TableSourceTest extends TableTestBase {
 
   private val util = streamTestUtil()
 
+  private val tableSchema = TableSchema.builder().fields(
+    Array("a", "b", "c"),
+    Array(DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING())).build()
+
   @Before
   def setup(): Unit = {
     util.tableEnv.registerTableSource("FilterableTable", TestFilterableTableSource(false))
@@ -38,6 +44,33 @@ class TableSourceTest extends TableTestBase {
   }
 
   @Test
+  def testBoundedStreamTableSource(): Unit = {
+    util.tableEnv.registerTableSource("MyTable", new TestTableSource(true, tableSchema))
+    util.verifyPlan("SELECT * FROM MyTable")
+  }
+
+  @Test
+  def testUnboundedStreamTableSource(): Unit = {
+    util.tableEnv.registerTableSource("MyTable", new TestTableSource(false, tableSchema))
+    util.verifyPlan("SELECT * FROM MyTable")
+  }
+
+  @Test
+  def testNonStreamTableSource(): Unit = {
+    val tableSource = new TableSource[Row]() {
+
+      override def getProducedDataType: DataType = tableSchema.toRowDataType
+
+      override def getTableSchema: TableSchema = tableSchema
+    }
+    util.tableEnv.registerTableSource("MyTable", tableSource)
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage(
+      "Only StreamTableSource and LookupableTableSource can be used in Blink planner.")
+    util.verifyPlan("SELECT * FROM MyTable")
+  }
+
+  @Test
   def testTableSourceWithLongRowTimeField(): Unit = {
     val tableSchema = new TableSchema(
       Array("id", "rowtime", "val", "name"),
@@ -347,7 +380,7 @@ class TableSourceTest extends TableTestBase {
     row.setField(3, DateTimeTestUtil.localDateTime("2017-01-24 12:45:01.234"))
 
     val tableSource = TestFilterableTableSource(
-      isBatch = false, rowTypeInfo, Seq(row), Set("dv", "tv", "tsv"))
+      isBounded = false, rowTypeInfo, Seq(row), Set("dv", "tv", "tsv"))
     util.tableEnv.registerTableSource("FilterableTable1", tableSource)
 
     val sqlQuery =
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index 38b5d12..7a26fb6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -105,13 +105,13 @@ abstract class TableTestBase {
   }
 }
 
-abstract class TableTestUtilBase(test: TableTestBase, isBatch: Boolean) {
+abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) {
   protected lazy val diffRepository: DiffRepository = DiffRepository.lookup(test.getClass)
 
-  protected val setting: EnvironmentSettings = if (isBatch) {
-    EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
-  } else {
+  protected val setting: EnvironmentSettings = if (isStreamingMode) {
     EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
+  } else {
+    EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
   }
 
   // a counter for unique table names
@@ -124,6 +124,8 @@ abstract class TableTestUtilBase(test: TableTestBase, isBatch: Boolean) {
 
   protected def getTableEnv: TableEnvironment
 
+  protected def isBounded: Boolean = !isStreamingMode
+
   def getPlanner: PlannerBase = {
     getTableEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
   }
@@ -190,7 +192,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isBatch: Boolean) {
       FieldInfoUtils.getFieldsInfo(typeInfo, fields.toArray).toTableSchema
     }
 
-    addTableSource(name, new TestTableSource(isBatch, tableSchema))
+    addTableSource(name, new TestTableSource(isBounded, tableSchema))
   }
 
   /**
@@ -207,7 +209,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isBatch: Boolean) {
       types: Array[TypeInformation[_]],
       fields: Array[String]): Table = {
     val schema = new TableSchema(fields, types)
-    val tableSource = new TestTableSource(isBatch, schema)
+    val tableSource = new TestTableSource(isBounded, schema)
     addTableSource(name, tableSource)
   }
 
@@ -478,9 +480,10 @@ abstract class TableTestUtilBase(test: TableTestBase, isBatch: Boolean) {
 
 abstract class TableTestUtil(
     test: TableTestBase,
-    isBatch: Boolean,
+    // determines whether the table environment works in batch or streaming mode
+    isStreamingMode: Boolean,
     catalogManager: Option[CatalogManager] = None)
-  extends TableTestUtilBase(test, isBatch) {
+  extends TableTestUtilBase(test, isStreamingMode) {
   protected val testingTableEnv: TestingTableEnvironment =
     TestingTableEnvironment.create(setting, catalogManager)
   val tableEnv: TableEnvironment = testingTableEnv
@@ -510,7 +513,7 @@ abstract class TableTestUtil(
       fields: Array[String],
       statistic: FlinkStatistic = FlinkStatistic.UNKNOWN): Table = {
     val schema = new TableSchema(fields, types)
-    val tableSource = new TestTableSource(isBatch, schema)
+    val tableSource = new TestTableSource(isBounded, schema)
     addTableSource(name, tableSource, statistic)
   }
 
@@ -529,7 +532,7 @@ abstract class TableTestUtil(
     // TODO RichTableSourceQueryOperation should be deleted and use registerTableSource method
     //  instead of registerTable method here after unique key in TableSchema is ready
     //  and setting catalog statistic to TableSourceTable in DatabaseCalciteSchema is ready
-    val operation = new RichTableSourceQueryOperation(tableSource, isBatch, statistic)
+    val operation = new RichTableSourceQueryOperation(tableSource, statistic)
     val table = testingTableEnv.createTable(operation)
     testingTableEnv.registerTable(name, table)
     testingTableEnv.scan(name)
@@ -593,8 +596,8 @@ abstract class TableTestUtil(
 
 abstract class ScalaTableTestUtil(
     test: TableTestBase,
-    isBatch: Boolean)
-  extends TableTestUtilBase(test, isBatch) {
+    isStreamingMode: Boolean)
+  extends TableTestUtilBase(test, isStreamingMode) {
   // scala env
   val env = new ScalaStreamExecEnv(new LocalStreamEnvironment())
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
@@ -620,8 +623,8 @@ abstract class ScalaTableTestUtil(
 
 abstract class JavaTableTestUtil(
     test: TableTestBase,
-    isBatch: Boolean)
-  extends TableTestUtilBase(test, isBatch) {
+    isStreamingMode: Boolean)
+  extends TableTestUtilBase(test, isStreamingMode) {
   // java env
   val env = new LocalStreamEnvironment()
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
@@ -653,7 +656,7 @@ abstract class JavaTableTestUtil(
 case class StreamTableTestUtil(
     test: TableTestBase,
     catalogManager: Option[CatalogManager] = None)
-  extends TableTestUtil(test, false, catalogManager) {
+  extends TableTestUtil(test, isStreamingMode = true, catalogManager) {
 
   /**
     * Register a table with specific row time field and offset.
@@ -776,14 +779,13 @@ case class StreamTableTestUtil(
 /**
   * Utility for stream scala table test.
   */
-case class ScalaStreamTableTestUtil(test: TableTestBase) extends ScalaTableTestUtil(test, false) {
+case class ScalaStreamTableTestUtil(test: TableTestBase) extends ScalaTableTestUtil(test, true) {
 }
 
 /**
   * Utility for stream java table test.
   */
-case class JavaStreamTableTestUtil(test: TableTestBase) extends JavaTableTestUtil(test, false) {
-
+case class JavaStreamTableTestUtil(test: TableTestBase) extends JavaTableTestUtil(test, true) {
 }
 
 /**
@@ -792,7 +794,7 @@ case class JavaStreamTableTestUtil(test: TableTestBase) extends JavaTableTestUti
 case class BatchTableTestUtil(
     test: TableTestBase,
     catalogManager: Option[CatalogManager] = None)
-  extends TableTestUtil(test, true, catalogManager) {
+  extends TableTestUtil(test, isStreamingMode = false, catalogManager) {
 
   def buildBatchProgram(firstProgramNameToRemove: String): Unit = {
     val program = FlinkBatchProgram.buildProgram(tableEnv.getConfig.getConfiguration)
@@ -835,25 +837,22 @@ case class BatchTableTestUtil(
 /**
   * Utility for batch scala table test.
   */
-case class ScalaBatchTableTestUtil(test: TableTestBase) extends ScalaTableTestUtil(test, true) {
+case class ScalaBatchTableTestUtil(test: TableTestBase) extends ScalaTableTestUtil(test, false) {
 }
 
 /**
   * Utility for batch java table test.
   */
-case class JavaBatchTableTestUtil(test: TableTestBase) extends JavaTableTestUtil(test, true) {
+case class JavaBatchTableTestUtil(test: TableTestBase) extends JavaTableTestUtil(test, false) {
 }
 
 /**
   * Batch/Stream [[org.apache.flink.table.sources.TableSource]] for testing.
   */
-class TestTableSource(isBatch: Boolean, schema: TableSchema)
+class TestTableSource(val isBounded: Boolean, schema: TableSchema)
   extends StreamTableSource[Row] {
 
-  override def isBounded: Boolean = isBatch
-
-  override def getDataStream(
-      execEnv: environment.StreamExecutionEnvironment): DataStream[Row] = {
+  override def getDataStream(execEnv: environment.StreamExecutionEnvironment): DataStream[Row] = {
     execEnv.fromCollection(List[Row](), getReturnType)
   }
 
@@ -871,14 +870,14 @@ class TestingTableEnvironment private(
     executor: Executor,
     functionCatalog: FunctionCatalog,
     planner: PlannerBase,
-    isStreaming: Boolean)
+    isStreamingMode: Boolean)
   extends TableEnvironmentImpl(
     catalogManager,
     tableConfig,
     executor,
     functionCatalog,
     planner,
-    isStreaming) {
+    isStreamingMode) {
 
   private val bufferedOperations: util.List[ModifyOperation] = new util.ArrayList[ModifyOperation]
 
@@ -1002,7 +1001,7 @@ object TestingTableEnvironment {
       .create(plannerProperties, executor, tableConfig, functionCatalog, catalogMgr)
       .asInstanceOf[PlannerBase]
     new TestingTableEnvironment(
-      catalogMgr, tableConfig, executor, functionCatalog, planner, !settings.isBatchMode)
+      catalogMgr, tableConfig, executor, functionCatalog, planner, settings.isStreamingMode)
   }
 }
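
TestingTableEnvironment.create now reads the positive settings.isStreamingMode accessor instead of negating isBatchMode. A quick sanity sketch against the two settings variants built in these tests:

    import org.apache.flink.table.api.EnvironmentSettings

    val streamSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val batchSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()

    assert(streamSettings.isStreamingMode)   // previously expressed as !settings.isBatchMode
    assert(!batchSettings.isStreamingMode)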
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
index b7c4f47..f51b1e2 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
@@ -132,7 +132,7 @@ object TestTableSources {
 }
 
 class TestTableSourceWithTime[T](
-    isBatch: Boolean,
+    val isBounded: Boolean,
     tableSchema: TableSchema,
     returnType: TypeInformation[T],
     values: Seq[T],
@@ -144,8 +144,6 @@ class TestTableSourceWithTime[T](
   with DefinedProctimeAttribute
   with DefinedFieldMapping {
 
-  override def isBounded: Boolean = isBatch
-
   override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] = {
     val dataStream = execEnv.fromCollection(values, returnType)
     dataStream.getTransformation.setMaxParallelism(1)
@@ -206,7 +204,7 @@ class TestPreserveWMTableSource[T](
 }
 
 class TestProjectableTableSource(
-    isBatch: Boolean,
+    isBounded: Boolean,
     tableSchema: TableSchema,
     returnType: TypeInformation[Row],
     values: Seq[Row],
@@ -214,7 +212,7 @@ class TestProjectableTableSource(
     proctime: String = null,
     fieldMapping: Map[String, String] = null)
   extends TestTableSourceWithTime[Row](
-    isBatch,
+    isBounded,
     tableSchema,
     returnType,
     values,
@@ -256,7 +254,7 @@ class TestProjectableTableSource(
     }
 
     new TestProjectableTableSource(
-      isBatch,
+      isBounded,
       newTableSchema,
       projectedReturnType,
       projectedValues,
@@ -272,14 +270,14 @@ class TestProjectableTableSource(
 }
 
 class TestNestedProjectableTableSource(
-    isBatch: Boolean,
+    isBounded: Boolean,
     tableSchema: TableSchema,
     returnType: TypeInformation[Row],
     values: Seq[Row],
     rowtime: String = null,
     proctime: String = null)
   extends TestTableSourceWithTime[Row](
-    isBatch,
+    isBounded,
     tableSchema,
     returnType,
     values,
@@ -316,7 +314,7 @@ class TestNestedProjectableTableSource(
     }
 
     val copy = new TestNestedProjectableTableSource(
-      isBatch,
+      isBounded,
       newTableSchema,
       projectedReturnType,
       projectedValues,
@@ -336,7 +334,7 @@ class TestNestedProjectableTableSource(
   * A data source that implements some very basic filtering in-memory in order to test
   * expression push-down logic.
   *
-  * @param isBatch whether this is a bounded source
+  * @param isBounded whether this is a bounded source
   * @param rowTypeInfo The type info for the rows.
   * @param data The data that filtering is applied to in order to get the final dataset.
   * @param filterableFields The fields that are allowed to be filtered.
@@ -344,7 +342,7 @@ class TestNestedProjectableTableSource(
   * @param filterPushedDown Whether predicates have been pushed down yet.
   */
 class TestFilterableTableSource(
-    isBatch: Boolean,
+    val isBounded: Boolean,
     rowTypeInfo: RowTypeInfo,
     data: Seq[Row],
     filterableFields: Set[String] = Set(),
@@ -357,8 +355,6 @@ class TestFilterableTableSource(
 
   val fieldTypes: Array[TypeInformation[_]] = rowTypeInfo.getFieldTypes
 
-  override def isBounded: Boolean = isBatch
-
   override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
     execEnv.fromCollection[Row](applyPredicatesToRows(data).asJava, getReturnType)
       .setParallelism(1).setMaxParallelism(1)
@@ -386,7 +382,7 @@ class TestFilterableTableSource(
     }
 
     new TestFilterableTableSource(
-      isBatch,
+      isBounded,
       rowTypeInfo,
       data,
       filterableFields,
@@ -496,14 +492,14 @@ object TestFilterableTableSource {
   /**
     * @return The default filterable table source.
     */
-  def apply(isBatch: Boolean): TestFilterableTableSource = {
-    apply(isBatch, defaultTypeInfo, defaultRows, defaultFilterableFields)
+  def apply(isBounded: Boolean): TestFilterableTableSource = {
+    apply(isBounded, defaultTypeInfo, defaultRows, defaultFilterableFields)
   }
 
   /**
     * A filterable data source with custom data.
     *
-    * @param isBatch whether this is a bounded source
+    * @param isBounded whether this is a bounded source
    * @param rowTypeInfo The type of the data. It's expected that both types and field
     *                    names are provided.
     * @param rows The data as a sequence of rows.
@@ -511,11 +507,11 @@ object TestFilterableTableSource {
     * @return The table source.
     */
   def apply(
-      isBatch: Boolean,
+      isBounded: Boolean,
       rowTypeInfo: RowTypeInfo,
       rows: Seq[Row],
       filterableFields: Set[String]): TestFilterableTableSource = {
-    new TestFilterableTableSource(isBatch, rowTypeInfo, rows, filterableFields)
+    new TestFilterableTableSource(isBounded, rowTypeInfo, rows, filterableFields)
   }
 
   private lazy val defaultFilterableFields = Set("amount")
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
index 5025adf..9b50b91 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
@@ -41,12 +41,12 @@ import java.util.Set;
 @Internal
 public class CatalogCalciteSchema implements Schema {
 
-	private final boolean isBatch;
+	private final boolean isStreamingMode;
 	private final String catalogName;
 	private final Catalog catalog;
 
-	public CatalogCalciteSchema(boolean isBatch, String catalogName, Catalog catalog) {
-		this.isBatch = isBatch;
+	public CatalogCalciteSchema(boolean isStreamingMode, String catalogName, Catalog catalog) {
+		this.isStreamingMode = isStreamingMode;
 		this.catalogName = catalogName;
 		this.catalog = catalog;
 	}
@@ -61,7 +61,7 @@ public class CatalogCalciteSchema implements Schema {
 	public Schema getSubSchema(String schemaName) {
 
 		if (catalog.databaseExists(schemaName)) {
-			return new DatabaseCalciteSchema(isBatch, schemaName, catalogName, catalog);
+			return new DatabaseCalciteSchema(isStreamingMode, schemaName, catalogName, catalog);
 		} else {
 			return null;
 		}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
index ceef249..c017618 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
@@ -49,11 +49,11 @@ import java.util.stream.Stream;
 public class CatalogManagerCalciteSchema implements Schema {
 
 	private final CatalogManager catalogManager;
-	private boolean isBatch;
+	private boolean isStreamingMode;
 
-	public CatalogManagerCalciteSchema(CatalogManager catalogManager, boolean isBatch) {
+	public CatalogManagerCalciteSchema(CatalogManager catalogManager, boolean isStreamingMode) {
 		this.catalogManager = catalogManager;
-		this.isBatch = isBatch;
+		this.isStreamingMode = isStreamingMode;
 	}
 
 	@Override
@@ -89,11 +89,11 @@ public class CatalogManagerCalciteSchema implements Schema {
 	@Override
 	public Schema getSubSchema(String name) {
 		Optional<Schema> externalSchema = catalogManager.getExternalCatalog(name)
-			.map(externalCatalog -> new ExternalCatalogSchema(isBatch, name, externalCatalog));
+			.map(externalCatalog -> new ExternalCatalogSchema(isStreamingMode, name, externalCatalog));
 
 		return externalSchema.orElseGet(() ->
 			catalogManager.getCatalog(name)
-				.map(catalog -> new CatalogCalciteSchema(isBatch, name, catalog))
+				.map(catalog -> new CatalogCalciteSchema(isStreamingMode, name, catalog))
 				.orElse(null)
 		);
 	}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index ac7cdb8..46af332 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -51,17 +51,17 @@ import static java.lang.String.format;
  * Tables are registered as tables in the schema.
  */
 class DatabaseCalciteSchema implements Schema {
-	private final boolean isBatch;
+	private final boolean isStreamingMode;
 	private final String databaseName;
 	private final String catalogName;
 	private final Catalog catalog;
 
 	public DatabaseCalciteSchema(
-			boolean isBatch,
+			boolean isStreamingMode,
 			String databaseName,
 			String catalogName,
 			Catalog catalog) {
-		this.isBatch = isBatch;
+		this.isStreamingMode = isStreamingMode;
 		this.databaseName = databaseName;
 		this.catalogName = catalogName;
 		this.catalog = catalog;
@@ -132,7 +132,7 @@ class DatabaseCalciteSchema implements Schema {
 			// this means the TableSource extends from StreamTableSource, this is needed for the
 			// legacy Planner. Blink Planner should use the information that comes from the TableSource
 			// itself to determine if it is a streaming or batch source.
-			!isBatch,
+			isStreamingMode,
 			FlinkStatistic.UNKNOWN()
 		);
 	}
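
The retained comment makes the division of labor explicit: the legacy planner still needs the schema-level isStreamingMode flag, whereas the blink planner derives boundedness from the TableSource itself. That derivation is what produces the two messages asserted in the new TableSourceTest cases; roughly, as a sketch with an assumed helper name rather than the planner's actual code:

    import org.apache.flink.table.api.ValidationException
    import org.apache.flink.table.sources.{LookupableTableSource, StreamTableSource, TableSource}

    def validateTableSource(source: TableSource[_], isStreamingMode: Boolean): Unit = source match {
      case s: StreamTableSource[_] =>
        if (!isStreamingMode && !s.isBounded) {
          throw new ValidationException("Only bounded StreamTableSource can be used in batch mode.")
        }
      case _: LookupableTableSource[_] =>
        // usable in either mode; boundedness does not apply to lookup sources
      case _ =>
        throw new ValidationException(
          "Only StreamTableSource and LookupableTableSource can be used in Blink planner.")
    }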
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutorFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutorFactory.java
index dff5590..cf9f650 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutorFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutorFactory.java
@@ -60,7 +60,7 @@ public class StreamExecutorFactory implements ExecutorFactory {
 	@Override
 	public Map<String, String> requiredContext() {
 		DescriptorProperties properties = new DescriptorProperties();
-		properties.putBoolean(EnvironmentSettings.BATCH_MODE, false);
+		properties.putBoolean(EnvironmentSettings.STREAMING_MODE, true);
 		return properties.asMap();
 	}
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java
index 4efb850..62e19bd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java
@@ -59,7 +59,7 @@ public final class StreamPlannerFactory implements PlannerFactory {
 	public Map<String, String> requiredContext() {
 		DescriptorProperties properties = new DescriptorProperties();
 
-		properties.putBoolean(EnvironmentSettings.BATCH_MODE, false);
+		properties.putBoolean(EnvironmentSettings.STREAMING_MODE, true);
 		return properties.asMap();
 	}
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 3452af6..06a04b7 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -85,22 +85,24 @@ abstract class TableEnvImpl(
   private[flink] val operationTreeBuilder = OperationTreeBuilder.create(
     functionCatalog,
     tableLookup,
-    !isBatch)
+    isStreamingMode)
 
   protected val planningConfigurationBuilder: PlanningConfigurationBuilder =
     new PlanningConfigurationBuilder(
       config,
       functionCatalog,
-      asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isBatch)),
+      asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
       expressionBridge)
 
   def getConfig: TableConfig = config
 
-  private def isBatch: Boolean = this match {
-    case _: BatchTableEnvImpl => true
-    case _ => false
+  private def isStreamingMode: Boolean = this match {
+    case _: BatchTableEnvImpl => false
+    case _ => true
   }
 
+  private def isBatchTable: Boolean = !isStreamingMode
+
   override def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit = {
     catalogManager.registerExternalCatalog(name, externalCatalog)
   }
@@ -237,7 +239,7 @@ abstract class TableEnvImpl(
   }
 
   override def fromTableSource(source: TableSource[_]): Table = {
-    createTable(new TableSourceQueryOperation(source, isBatch))
+    createTable(new TableSourceQueryOperation(source, isBatchTable))
   }
 
   /**
@@ -276,12 +278,12 @@ abstract class TableEnvImpl(
           replaceTableInternal(
             name,
             ConnectorCatalogTable
-              .sourceAndSink(tableSource, table.getTableSink.get, isBatch))
+              .sourceAndSink(tableSource, table.getTableSink.get, isBatchTable))
         }
 
       // no table is registered
       case _ =>
-        registerTableInternal(name, ConnectorCatalogTable.source(tableSource, isBatch))
+        registerTableInternal(name, ConnectorCatalogTable.source(tableSource, isBatchTable))
     }
   }
 
@@ -303,12 +305,12 @@ abstract class TableEnvImpl(
           replaceTableInternal(
             name,
             ConnectorCatalogTable
-              .sourceAndSink(table.getTableSource.get, tableSink, isBatch))
+              .sourceAndSink(table.getTableSource.get, tableSink, isBatchTable))
         }
 
       // no table is registered
       case _ =>
-        registerTableInternal(name, ConnectorCatalogTable.sink(tableSink, isBatch))
+        registerTableInternal(name, ConnectorCatalogTable.sink(tableSink, isBatchTable))
     }
   }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
index 4b2287f..88c2620 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
@@ -42,7 +42,7 @@ import scala.collection.JavaConverters._
   */
 @deprecated
 class ExternalCatalogSchema(
-    isBatch: Boolean,
+    isStreamingMode: Boolean,
     catalogIdentifier: String,
     catalog: ExternalCatalog) extends Schema with Logging {
 
@@ -56,7 +56,7 @@ class ExternalCatalogSchema(
   override def getSubSchema(name: String): Schema = {
     try {
       val db = catalog.getSubCatalog(name)
-      new ExternalCatalogSchema(isBatch, name, db)
+      new ExternalCatalogSchema(isStreamingMode, name, db)
     } catch {
       case _: CatalogNotExistException =>
         LOG.warn(s"Sub-catalog $name does not exist in externalCatalog $catalogIdentifier")
@@ -81,7 +81,7 @@ class ExternalCatalogSchema(
     */
   override def getTable(name: String): Table = try {
     val externalCatalogTable = catalog.getTable(name)
-    ExternalTableUtil.fromExternalCatalogTable(isBatch, externalCatalogTable).orNull
+    ExternalTableUtil.fromExternalCatalogTable(isStreamingMode, externalCatalogTable).orNull
   } catch {
     case _: TableNotExistException => {
       LOG.warn(s"Table $name does not exist in externalCatalog $catalogIdentifier")
@@ -126,11 +126,12 @@ object ExternalCatalogSchema {
     * @param externalCatalog           The external catalog to register
     */
   def registerCatalog(
-      isBatch: Boolean,
+      isStreamingMode: Boolean,
       parentSchema: SchemaPlus,
       externalCatalogIdentifier: String,
       externalCatalog: ExternalCatalog): Unit = {
-    val newSchema = new ExternalCatalogSchema(isBatch, externalCatalogIdentifier, externalCatalog)
+    val newSchema = new ExternalCatalogSchema(
+      isStreamingMode, externalCatalogIdentifier, externalCatalog)
     val schemaPlusOfNewSchema = parentSchema.add(externalCatalogIdentifier, newSchema)
     newSchema.registerSubSchemas(schemaPlusOfNewSchema)
   }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
index 4ac24dd..b55ae2e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
@@ -39,37 +39,37 @@ object ExternalTableUtil extends Logging {
    * @param externalTable the [[ExternalCatalogTable]] instance to convert
     * @return converted [[TableSourceTable]] instance from the input catalog table
     */
-  def fromExternalCatalogTable[T](isBatch: Boolean, externalTable: ExternalCatalogTable)
+  def fromExternalCatalogTable[T](isStreamingMode: Boolean, externalTable: ExternalCatalogTable)
     : Option[TableSourceTable[T]] = {
 
     val statistics = new FlinkStatistic(toScala(externalTable.getTableStats))
 
     if (externalTable.isTableSource) {
-      Some(createTableSource(isBatch, externalTable, statistics))
+      Some(createTableSource(isStreamingMode, externalTable, statistics))
     } else {
       None
     }
   }
 
   private def createTableSource[T](
-      isBatch: Boolean,
+      isStreamingMode: Boolean,
       externalTable: ExternalCatalogTable,
       statistics: FlinkStatistic)
     : TableSourceTable[T] = {
-    val source = if (isModeCompatibleWithTable(isBatch, externalTable)) {
+    val source = if (isModeCompatibleWithTable(isStreamingMode, externalTable)) {
       TableFactoryUtil.findAndCreateTableSource(externalTable)
     } else {
       throw new ValidationException(
         "External catalog table does not support the current environment for a table source.")
     }
 
-    new TableSourceTable[T](source.asInstanceOf[TableSource[T]], !isBatch, statistics)
+    new TableSourceTable[T](source.asInstanceOf[TableSource[T]], isStreamingMode, statistics)
   }
 
   private def isModeCompatibleWithTable[T](
-      isBatch: Boolean,
+      isStreamingMode: Boolean,
       externalTable: ExternalCatalogTable)
     : Boolean = {
-    isBatch && externalTable.isBatchTable || !isBatch && externalTable.isStreamTable
+    !isStreamingMode && externalTable.isBatchTable || isStreamingMode && externalTable.isStreamTable
   }
 }
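
The rewritten predicate is the old one with the flag inverted; spelled out as a branch on the mode, it is equivalent to:

    import org.apache.flink.table.catalog.ExternalCatalogTable

    // Equivalent to the boolean expression above.
    def isModeCompatibleWithTable(
        isStreamingMode: Boolean,
        externalTable: ExternalCatalogTable): Boolean =
      if (isStreamingMode) externalTable.isStreamTable else externalTable.isBatchTable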
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index 89aff89..fb0a09c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -50,7 +50,7 @@ class FlinkLogicalTableSourceScan(
 
   override def deriveRowType(): RelDataType = {
     val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    val streamingTable = table.unwrap(classOf[TableSourceTable[_]]).isStreaming
+    val streamingTable = table.unwrap(classOf[TableSourceTable[_]]).isStreamingMode
 
     TableSourceUtil.getRelDataType(tableSource, selectedFields, streamingTable, flinkTypeFactory)
   }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
index c51c99a..e45ad85 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
@@ -40,7 +40,7 @@ class BatchTableSourceScanRule
     val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
 
     val sourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
-    sourceTable != null && !sourceTable.isStreaming
+    sourceTable != null && !sourceTable.isStreamingMode
   }
 
   def convert(rel: RelNode): RelNode = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
index a23830c..1ddc1c3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
@@ -41,7 +41,7 @@ class StreamTableSourceScanRule
     val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
 
     val sourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
-    sourceTable != null && sourceTable.isStreaming
+    sourceTable != null && sourceTable.isStreamingMode
   }
 
   def convert(rel: RelNode): RelNode = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index daeda7b..f14fe03 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -28,10 +28,14 @@ import org.apache.flink.table.sources.{TableSource, TableSourceUtil, TableSource
 /**
  * Abstract class which defines the interfaces required to convert a [[TableSource]] to
   * a Calcite Table.
+  *
+  * @param tableSource The [[TableSource]] to be converted to a Calcite Table.
+  * @param isStreamingMode A flag that tells whether the table is in streaming mode.
+  * @param statistic The table statistics.
   */
 class TableSourceTable[T](
     val tableSource: TableSource[T],
-    val isStreaming: Boolean,
+    val isStreamingMode: Boolean,
     val statistic: FlinkStatistic)
   extends AbstractTable {
 
@@ -48,7 +52,7 @@ class TableSourceTable[T](
     TableSourceUtil.getRelDataType(
       tableSource,
       None,
-      isStreaming,
+      isStreamingMode,
       typeFactory.asInstanceOf[FlinkTypeFactory])
   }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index de163a3..057e70a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -75,7 +75,7 @@ class StreamPlanner(
   functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE)
 
   private val internalSchema: CalciteSchema =
-    asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false))
+    asRootSchema(new CatalogManagerCalciteSchema(catalogManager, true))
 
   // temporary bridge between API and planner
   private val expressionBridge: ExpressionBridge[PlannerExpression] =
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
index 312b5de..ea447bb 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
@@ -48,7 +48,7 @@ public class DatabaseCalciteSchemaTest {
 	@Test
 	public void testCatalogTable() throws TableAlreadyExistException, DatabaseNotExistException {
 		GenericInMemoryCatalog catalog = new GenericInMemoryCatalog(catalogName, databaseName);
-		DatabaseCalciteSchema calciteSchema = new DatabaseCalciteSchema(false,
+		DatabaseCalciteSchema calciteSchema = new DatabaseCalciteSchema(true,
 			databaseName,
 			catalogName,
 			catalog);
@@ -59,7 +59,7 @@ public class DatabaseCalciteSchemaTest {
 		assertThat(table, instanceOf(TableSourceTable.class));
 		TableSourceTable tableSourceTable = (TableSourceTable) table;
 		assertThat(tableSourceTable.tableSource(), instanceOf(TestExternalTableSource.class));
-		assertThat(tableSourceTable.isStreaming(), is(true));
+		assertThat(tableSourceTable.isStreamingMode(), is(true));
 	}
 
 	private static final class TestCatalogBaseTable extends CatalogTableImpl {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
index c49cd9a..74d203b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
@@ -48,7 +48,7 @@ class ExternalCatalogSchemaTest extends TableTestBase {
   def setUp(): Unit = {
     val rootSchemaPlus: SchemaPlus = CalciteSchema.createRootSchema(true, false).plus()
     val catalog = CommonTestData.getInMemoryTestCatalog(isStreaming = true)
-    ExternalCatalogSchema.registerCatalog(false, rootSchemaPlus, schemaName, catalog)
+    ExternalCatalogSchema.registerCatalog(true, rootSchemaPlus, schemaName, catalog)
     externalCatalogSchema = rootSchemaPlus.getSubSchema("schemaName")
     val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem())
     val prop = new Properties()