You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/31 11:54:42 UTC
[flink] 01/06: [table-api-java] Postpone check for Blink planner in StreamTableEnvironment
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f1a2312f62f701cfd0a8f29d43e3e96f054c254d
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 29 11:40:35 2019 +0200
[table-api-java] Postpone check for Blink planner in StreamTableEnvironment
---
.../table/api/java/internal/StreamTableEnvironmentImpl.java | 12 +++++-------
1 file changed, 5 insertions(+), 7 deletions(-)
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 2e35bb9..8987940 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.api.java.internal;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
@@ -79,7 +78,6 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
private final StreamExecutionEnvironment executionEnvironment;
- @VisibleForTesting
public StreamTableEnvironmentImpl(
CatalogManager catalogManager,
FunctionCatalog functionCatalog,
@@ -90,11 +88,6 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
boolean isStreamingMode) {
super(catalogManager, tableConfig, executor, functionCatalog, planner, isStreamingMode);
this.executionEnvironment = executionEnvironment;
-
- if (!isStreamingMode) {
- throw new TableException(
- "StreamTableEnvironment is not supported in batch mode now, please use TableEnvironment.");
- }
}
public static StreamTableEnvironment create(
@@ -102,6 +95,11 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
EnvironmentSettings settings,
TableConfig tableConfig) {
+ if (!settings.isStreamingMode()) {
+ throw new TableException(
+ "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
+ }
+
CatalogManager catalogManager = new CatalogManager(
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));