Posted to commits@flink.apache.org by ja...@apache.org on 2019/11/25 02:43:38 UTC

[flink] branch release-1.9 updated: [FLINK-13708] [table-planner-blink] transformations should be cleared after execution in blink planner (#10290)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 2dd3ba8  [FLINK-13708] [table-planner-blink] transformations should be cleared after execution in blink planner (#10290)
2dd3ba8 is described below

commit 2dd3ba8335d35de7ed59073def4f0093326f1464
Author: godfrey he <go...@163.com>
AuthorDate: Mon Nov 25 10:43:25 2019 +0800

    [FLINK-13708] [table-planner-blink] transformations should be cleared after execution in blink planner (#10290)
---
 .../table/tests/test_table_environment_api.py      |  25 +-
 .../client/gateway/local/ExecutionContext.java     |   2 +-
 .../java/internal/StreamTableEnvironmentImpl.java  |   6 +
 .../internal/StreamTableEnvironmentImpl.scala      |   6 +
 .../table/planner/delegation/BatchExecutor.java    |  67 +----
 .../table/planner/delegation/ExecutorBase.java     |  19 +-
 .../table/planner/delegation/StreamExecutor.java   |  14 +-
 .../utils/DummyStreamExecutionEnvironment.java     | 295 +++++++++++++++++++++
 .../ExecutorUtils.java}                            |  63 +++--
 .../table/planner/delegation/BatchPlanner.scala    |  23 +-
 .../table/planner/delegation/PlannerBase.scala     |   5 +-
 .../table/planner/delegation/StreamPlanner.scala   |  18 +-
 .../flink/table/api/TableEnvironmentITCase.scala   | 179 +++++++++++++
 .../flink/table/api/TableEnvironmentTest.scala     |  25 +-
 14 files changed, 615 insertions(+), 132 deletions(-)

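For context, the root cause: ExecutorBase buffered Transformations in its own list and never cleared it, and PlannerBase#explain(...) also translates the pipeline, so an explain() call left stale transformations behind for the next execute(). The following is a toy sketch of that failure shape in plain Java; ToyExecutor and all other names are illustrative stand-ins, not the Flink classes touched by this commit.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Toy model of the pre-fix behavior: one buffer shared by explain() and
    // execute(), never cleared. All names here are made up for illustration.
    class ToyExecutor {
        private final List<String> transformations = new ArrayList<>();

        void apply(List<String> ops) {
            transformations.addAll(ops);
        }

        String explain() {
            // explain() also translates, so it fills the shared buffer
            return "== plan == " + transformations;
        }

        String execute() {
            // BUG: nothing clears the buffer, so transformations added during a
            // prior explain() (or a prior execute()) are silently run again
            return "running " + transformations;
        }
    }

    public class StaleTransformationsSketch {
        public static void main(String[] args) {
            ToyExecutor executor = new ToyExecutor();
            executor.apply(Arrays.asList("insert-into-sink1"));
            System.out.println(executor.explain());
            // a real planner translates again when executing, duplicating the op:
            executor.apply(Arrays.asList("insert-into-sink1"));
            System.out.println(executor.execute()); // runs insert-into-sink1 twice
        }
    }

The commit removes the buffer entirely: explain() translates against a dummy environment (see DummyStreamExecutionEnvironment below), while execute() relies on the real StreamExecutionEnvironment, which clears its transformations after execution.
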
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 8bbd491..8c657a3 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -132,9 +132,8 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
         t_env.sql_update("insert into sink1 select * from %s where a > 100" % source)
         t_env.sql_update("insert into sink2 select * from %s where a < 100" % source)
 
-        actual = t_env.explain(extended=True)
-
-        assert isinstance(actual, str) or isinstance(actual, unicode)
+        with self.assertRaises(TableException):
+            t_env.explain(extended=True)
 
     def test_sql_query(self):
         t_env = self.t_env
@@ -363,3 +362,23 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
                         line = f.readline()
 
         self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
+
+    def test_explain_with_multi_sinks_with_blink_planner(self):
+        t_env = BatchTableEnvironment.create(
+            environment_settings=EnvironmentSettings.new_instance().in_batch_mode()
+            .use_blink_planner().build())
+        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sink1",
+            CsvTableSink(field_names, field_types, "path1"))
+        t_env.register_table_sink(
+            "sink2",
+            CsvTableSink(field_names, field_types, "path2"))
+
+        t_env.sql_update("insert into sink1 select * from %s where a > 100" % source)
+        t_env.sql_update("insert into sink2 select * from %s where a < 100" % source)
+
+        actual = t_env.explain(extended=True)
+        self.assertIsInstance(actual, (str, unicode))
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 27c3ee2..e212f05 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -454,7 +454,7 @@ public class ExecutionContext<T> {
 				// special case for Blink planner to apply batch optimizations
 				// note: it also modifies the ExecutionConfig!
 				if (executor instanceof ExecutorBase) {
-					return ((ExecutorBase) executor).generateStreamGraph(name);
+					return ((ExecutorBase) executor).getStreamGraph(name);
 				}
 				return streamExecEnv.getStreamGraph(name);
 			} else {
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index cf2ace7..f343d4f 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -335,6 +335,12 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 		return true;
 	}
 
+	@Override
+	public String explain(boolean extended) {
+		// throw an exception directly, because the operations to explain are always empty
+		throw new TableException("'explain' method without any tables is unsupported in StreamTableEnvironment.");
+	}
+
 	private <T> TypeInformation<T> extractTypeInformation(Table table, Class<T> clazz) {
 		try {
 			return TypeExtractor.createTypeInfo(clazz);
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index c45d324..052551f 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -193,6 +193,12 @@ class StreamTableEnvironmentImpl (
 
   override protected def isEagerOperationTranslation(): Boolean = true
 
+  override def explain(extended: Boolean): String = {
+    // throw an exception directly, because the operations to explain are always empty
+    throw new TableException(
+      "'explain' method without any tables is unsupported in StreamTableEnvironment.")
+  }
+
   private def toDataStream[T](
       table: Table,
       modifyOperation: OutputConversionModifyOperation)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
index 7bf4367..2334e93 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
@@ -20,21 +20,11 @@ package org.apache.flink.table.planner.delegation;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.operators.ResourceSpec;
-import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.transformations.ShuffleMode;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.delegation.Executor;
-import org.apache.flink.table.planner.plan.nodes.resource.NodeResourceUtil;
-
-import java.util.List;
+import org.apache.flink.table.planner.utils.ExecutorUtils;
 
 /**
  * An implementation of {@link Executor} that is backed by a {@link StreamExecutionEnvironment}.
@@ -50,59 +40,18 @@ public class BatchExecutor extends ExecutorBase {
 
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
-		StreamExecutionEnvironment execEnv = getExecutionEnvironment();
-		StreamGraph streamGraph = generateStreamGraph(jobName);
-		return execEnv.execute(streamGraph);
-	}
-
-	/**
-	 * Sets batch configs.
-	 */
-	private void setBatchProperties(StreamExecutionEnvironment execEnv) {
-		ExecutionConfig executionConfig = execEnv.getConfig();
-		executionConfig.enableObjectReuse();
-		executionConfig.setLatencyTrackingInterval(-1);
-		execEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-		execEnv.setBufferTimeout(-1);
-		if (isShuffleModeAllBatch()) {
-			executionConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL);
-		}
+		StreamGraph streamGraph = getStreamGraph(jobName);
+		return getExecutionEnvironment().execute(streamGraph);
 	}
 
 	@Override
-	public StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName) {
+	public StreamGraph getStreamGraph(String jobName) {
 		StreamExecutionEnvironment execEnv = getExecutionEnvironment();
-		setBatchProperties(execEnv);
-		transformations.forEach(execEnv::addOperator);
-		StreamGraph streamGraph;
-		streamGraph = execEnv.getStreamGraph(getNonEmptyJobName(jobName));
-		// All transformations should set managed memory size.
-		ResourceSpec managedResourceSpec = NodeResourceUtil.fromManagedMem(0);
-		streamGraph.getStreamNodes().forEach(sn -> {
-			if (sn.getMinResources().equals(ResourceSpec.DEFAULT)) {
-				sn.setResources(managedResourceSpec, managedResourceSpec);
-			}
-		});
-		streamGraph.setChaining(true);
-		streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
-		streamGraph.setStateBackend(null);
-		if (streamGraph.getCheckpointConfig().isCheckpointingEnabled()) {
-			throw new IllegalArgumentException("Checkpoint is not supported for batch jobs.");
-		}
-		if (isShuffleModeAllBatch()) {
-			streamGraph.setBlockingConnectionsBetweenChains(true);
-		}
+		ExecutorUtils.setBatchProperties(execEnv, tableConfig);
+		StreamGraph streamGraph = execEnv.getStreamGraph();
+		streamGraph.setJobName(getNonEmptyJobName(jobName));
+		ExecutorUtils.setBatchProperties(streamGraph, tableConfig);
 		return streamGraph;
 	}
 
-	private boolean isShuffleModeAllBatch() {
-		String value = tableConfig.getConfiguration().getString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE);
-		if (value.equalsIgnoreCase(ShuffleMode.BATCH.toString())) {
-			return true;
-		} else if (!value.equalsIgnoreCase(ShuffleMode.PIPELINED.toString())) {
-			throw new IllegalArgumentException(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE.key() +
-					" can only be set to " + ShuffleMode.BATCH.toString() + " or " + ShuffleMode.PIPELINED.toString());
-		}
-		return false;
-	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java
index 10eeafd..af12c22 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.delegation.Executor;
 import org.apache.flink.util.StringUtils;
 
-import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -38,7 +37,6 @@ public abstract class ExecutorBase implements Executor {
 	private static final String DEFAULT_JOB_NAME = "Flink Exec Table Job";
 
 	private final StreamExecutionEnvironment executionEnvironment;
-	protected List<Transformation<?>> transformations = new ArrayList<>();
 	protected TableConfig tableConfig;
 
 	public ExecutorBase(StreamExecutionEnvironment executionEnvironment) {
@@ -49,26 +47,19 @@ public abstract class ExecutorBase implements Executor {
 		this.tableConfig = tableConfig;
 	}
 
-	@Override
-	public void apply(List<Transformation<?>> transformations) {
-		this.transformations.addAll(transformations);
-	}
-
 	public StreamExecutionEnvironment getExecutionEnvironment() {
 		return executionEnvironment;
 	}
 
-	/**
-	 * Translates the applied transformations to a stream graph.
-	 */
-	public StreamGraph generateStreamGraph(String jobName) {
-		return generateStreamGraph(transformations, jobName);
+	@Override
+	public void apply(List<Transformation<?>> transformations) {
+		transformations.forEach(getExecutionEnvironment()::addOperator);
 	}
 
 	/**
-	 * Translates the given transformations to a stream graph.
+	 * Translates the transformations applied to this executor into a stream graph.
 	 */
-	public abstract StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName);
+	public abstract StreamGraph getStreamGraph(String jobName);
 
 	protected String getNonEmptyJobName(String jobName) {
 		if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java
index 8d1e904..034da4d 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java
@@ -21,13 +21,10 @@ package org.apache.flink.table.planner.delegation;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.table.delegation.Executor;
 
-import java.util.List;
-
 /**
  * An implementation of {@link Executor} that is backed by a {@link StreamExecutionEnvironment}.
  * This is the only executor that {@link org.apache.flink.table.planner.delegation.StreamPlanner} supports.
@@ -42,14 +39,13 @@ public class StreamExecutor extends ExecutorBase {
 
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
-		StreamExecutionEnvironment execEnv = getExecutionEnvironment();
-		return execEnv.execute(generateStreamGraph(transformations, jobName));
+		return getExecutionEnvironment().execute(getNonEmptyJobName(jobName));
 	}
 
 	@Override
-	public StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName) {
-		transformations.forEach(getExecutionEnvironment()::addOperator);
-		return getExecutionEnvironment().getStreamGraph(getNonEmptyJobName(jobName));
+	public StreamGraph getStreamGraph(String jobName) {
+		StreamGraph streamGraph = getExecutionEnvironment().getStreamGraph();
+		streamGraph.setJobName(getNonEmptyJobName(jobName));
+		return streamGraph;
 	}
-
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
new file mode 100644
index 0000000..23d70d2
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.TableSource;
+
+import com.esotericsoftware.kryo.Serializer;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * This is a dummy {@link StreamExecutionEnvironment}, used only for the {@link PlannerBase#explain(List, boolean)} method.
+ *
+ * <p>{@link Transformation}s are added to a {@link StreamExecutionEnvironment} when translating ExecNodes to a plan,
+ * and they are cleared only when the {@link StreamExecutionEnvironment#execute()} method is called.
+ *
+ * <p>The {@link PlannerBase#explain(List, boolean)} method prints not only the logical plan but also the execution plan,
+ * so translation also happens during explain. If the explain method is called before the execute method, the
+ * transformations in the StreamExecutionEnvironment are dirty and the execution result may be incorrect.
+ *
+ * <p>All setter methods (e.g. `setXX`, `enableXX`, `disableXX`, etc.) are disabled to prohibit changing the configuration,
+ * while all getter methods (e.g. `getXX`, `isXX`, etc.) are delegated to the real StreamExecutionEnvironment.
+ * The `execute`, `getStreamGraph`, and `getExecutionPlan` methods are also disabled, while the `addOperator` method is
+ * enabled so that the `explain` method can add Transformations to this StreamExecutionEnvironment.
+ *
+ * <p>This class can be removed once the {@link TableSource} and {@link StreamTableSink} interfaces
+ * are reworked.
+ */
+public class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
+
+	private final StreamExecutionEnvironment realExecEnv;
+
+	public DummyStreamExecutionEnvironment(StreamExecutionEnvironment realExecEnv) {
+		this.realExecEnv = realExecEnv;
+	}
+
+	@Override
+	public ExecutionConfig getConfig() {
+		return realExecEnv.getConfig();
+	}
+
+	@Override
+	public List<Tuple2<String, DistributedCache.DistributedCacheEntry>> getCachedFiles() {
+		return realExecEnv.getCachedFiles();
+	}
+
+	@Override
+	public StreamExecutionEnvironment setParallelism(int parallelism) {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, setParallelism method is unsupported.");
+	}
+
+	@Override
+	public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, setMaxParallelism method is unsupported.");
+	}
+
+	@Override
+	public int getParallelism() {
+		return realExecEnv.getParallelism();
+	}
+
+	@Override
+	public int getMaxParallelism() {
+		return realExecEnv.getMaxParallelism();
+	}
+
+	@Override
+	public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, setBufferTimeout method is unsupported.");
+	}
+
+	@Override
+	public long getBufferTimeout() {
+		return realExecEnv.getBufferTimeout();
+	}
+
+	@Override
+	public StreamExecutionEnvironment disableOperatorChaining() {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, disableOperatorChaining method is unsupported.");
+	}
+
+	@Override
+	public boolean isChainingEnabled() {
+		return realExecEnv.isChainingEnabled();
+	}
+
+	@Override
+	public CheckpointConfig getCheckpointConfig() {
+		return realExecEnv.getCheckpointConfig();
+	}
+
+	@Override
+	public StreamExecutionEnvironment enableCheckpointing(long interval) {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, enableCheckpointing method is unsupported.");
+	}
+
+	@Override
+	public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode) {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, enableCheckpointing method is unsupported.");
+	}
+
+	@Override
+	public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force) {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, enableCheckpointing method is unsupported.");
+	}
+
+	@Override
+	public StreamExecutionEnvironment enableCheckpointing() {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, enableCheckpointing method is unsupported.");
+	}
+
+	@Override
+	public long getCheckpointInterval() {
+		return realExecEnv.getCheckpointInterval();
+	}
+
+	@Override
+	public boolean isForceCheckpointing() {
+		return realExecEnv.isForceCheckpointing();
+	}
+
+	@Override
+	public CheckpointingMode getCheckpointingMode() {
+		return realExecEnv.getCheckpointingMode();
+	}
+
+	@Override
+	public StreamExecutionEnvironment setStateBackend(StateBackend backend) {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, setStateBackend method is unsupported.");
+	}
+
+	@Override
+	public StreamExecutionEnvironment setStateBackend(AbstractStateBackend backend) {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, setStateBackend method is unsupported.");
+	}
+
+	@Override
+	public StateBackend getStateBackend() {
+		return realExecEnv.getStateBackend();
+	}
+
+	@Override
+	public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, setRestartStrategy method is unsupported.");
+	}
+
+	@Override
+	public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
+		return realExecEnv.getRestartStrategy();
+	}
+
+	@Override
+	public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, setNumberOfExecutionRetries method is unsupported.");
+	}
+
+	@Override
+	public int getNumberOfExecutionRetries() {
+		return realExecEnv.getNumberOfExecutionRetries();
+	}
+
+	@Override
+	public <T extends Serializer<?> & Serializable> void addDefaultKryoSerializer(Class<?> type, T serializer) {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, addDefaultKryoSerializer method is unsupported.");
+	}
+
+	@Override
+	public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, addDefaultKryoSerializer method is unsupported.");
+	}
+
+	@Override
+	public <T extends Serializer<?> & Serializable> void registerTypeWithKryoSerializer(Class<?> type, T serializer) {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, registerTypeWithKryoSerializer method is unsupported.");
+	}
+
+	@Override
+	public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass) {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, registerTypeWithKryoSerializer method is unsupported.");
+	}
+
+	@Override
+	public void registerType(Class<?> type) {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, registerType method is unsupported.");
+	}
+
+	@Override
+	public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, setStreamTimeCharacteristic method is unsupported.");
+	}
+
+	@Override
+	public TimeCharacteristic getStreamTimeCharacteristic() {
+		return realExecEnv.getStreamTimeCharacteristic();
+	}
+
+	@Override
+	public JobExecutionResult execute() throws Exception {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, execute method is unsupported.");
+	}
+
+	@Override
+	public JobExecutionResult execute(String jobName) throws Exception {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, execute method is unsupported.");
+	}
+
+	@Override
+	public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, execute method is unsupported.");
+	}
+
+	@Override
+	public void registerCachedFile(String filePath, String name) {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, registerCachedFile method is unsupported.");
+	}
+
+	@Override
+	public void registerCachedFile(String filePath, String name, boolean executable) {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, registerCachedFile method is unsupported.");
+	}
+
+	@Override
+	public StreamGraph getStreamGraph() {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, getStreamGraph method is unsupported.");
+	}
+
+	@Override
+	public StreamGraph getStreamGraph(String jobName) {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, getStreamGraph method is unsupported.");
+	}
+
+	@Override
+	public String getExecutionPlan() {
+		throw new UnsupportedOperationException(
+				"This is a dummy StreamExecutionEnvironment, getExecutionPlan method is unsupported.");
+	}
+
+}
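
The class above is a delegate-getters / block-setters wrapper. Below is a condensed, framework-free sketch of the same pattern; Env and ReadOnlyEnv are made-up names for illustration, not Flink APIs.

    // Minimal sketch of the wrapper pattern used above, with made-up types:
    // getters delegate to the real object, mutators fail fast.
    interface Env {
        int getParallelism();
        void setParallelism(int p);
    }

    final class ReadOnlyEnv implements Env {
        private final Env real;

        ReadOnlyEnv(Env real) {
            this.real = real;
        }

        @Override
        public int getParallelism() {
            return real.getParallelism(); // reads see the real configuration
        }

        @Override
        public void setParallelism(int p) {
            // mutations are rejected so explain() cannot change the shared environment
            throw new UnsupportedOperationException("read-only environment");
        }
    }
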
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
similarity index 66%
copy from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
copy to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
index 7bf4367..610472b 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
@@ -16,66 +16,66 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.delegation;
+package org.apache.flink.table.planner.utils;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InputDependencyConstraint;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.delegation.Executor;
 import org.apache.flink.table.planner.plan.nodes.resource.NodeResourceUtil;
 
 import java.util.List;
 
 /**
- * An implementation of {@link Executor} that is backed by a {@link StreamExecutionEnvironment}.
- * This is the only executor that {@link org.apache.flink.table.planner.delegation.BatchPlanner} supports.
+ * Utility class to generate a {@link StreamGraph} and set batch execution properties.
  */
-@Internal
-public class BatchExecutor extends ExecutorBase {
+public class ExecutorUtils {
 
-	@VisibleForTesting
-	public BatchExecutor(StreamExecutionEnvironment executionEnvironment) {
-		super(executionEnvironment);
-	}
-
-	@Override
-	public JobExecutionResult execute(String jobName) throws Exception {
-		StreamExecutionEnvironment execEnv = getExecutionEnvironment();
-		StreamGraph streamGraph = generateStreamGraph(jobName);
-		return execEnv.execute(streamGraph);
+	/**
+	 * Generates a {@link StreamGraph} via the {@link StreamGraphGenerator}.
+	 */
+	public static StreamGraph generateStreamGraph(
+			StreamExecutionEnvironment execEnv,
+			List<Transformation<?>> transformations) {
+		if (transformations.size() <= 0) {
+			throw new IllegalStateException("No operators defined in streaming topology. Cannot generate StreamGraph.");
+		}
+		StreamGraphGenerator generator =
+				new StreamGraphGenerator(transformations, execEnv.getConfig(), execEnv.getCheckpointConfig())
+						.setStateBackend(execEnv.getStateBackend())
+						.setChaining(execEnv.isChainingEnabled())
+						.setUserArtifacts(execEnv.getCachedFiles())
+						.setTimeCharacteristic(execEnv.getStreamTimeCharacteristic())
+						.setDefaultBufferTimeout(execEnv.getBufferTimeout());
+		return generator.generate();
 	}
 
 	/**
-	 * Sets batch configs.
+	 * Sets batch properties for {@link StreamExecutionEnvironment}.
 	 */
-	private void setBatchProperties(StreamExecutionEnvironment execEnv) {
+	public static void setBatchProperties(StreamExecutionEnvironment execEnv, TableConfig tableConfig) {
 		ExecutionConfig executionConfig = execEnv.getConfig();
 		executionConfig.enableObjectReuse();
 		executionConfig.setLatencyTrackingInterval(-1);
 		execEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 		execEnv.setBufferTimeout(-1);
-		if (isShuffleModeAllBatch()) {
+		if (isShuffleModeAllBatch(tableConfig)) {
 			executionConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL);
 		}
 	}
 
-	@Override
-	public StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName) {
-		StreamExecutionEnvironment execEnv = getExecutionEnvironment();
-		setBatchProperties(execEnv);
-		transformations.forEach(execEnv::addOperator);
-		StreamGraph streamGraph;
-		streamGraph = execEnv.getStreamGraph(getNonEmptyJobName(jobName));
+	/**
+	 * Sets batch properties for {@link StreamGraph}.
+	 */
+	public static void setBatchProperties(StreamGraph streamGraph, TableConfig tableConfig) {
 		// All transformations should set managed memory size.
 		ResourceSpec managedResourceSpec = NodeResourceUtil.fromManagedMem(0);
 		streamGraph.getStreamNodes().forEach(sn -> {
@@ -89,13 +89,12 @@ public class BatchExecutor extends ExecutorBase {
 		if (streamGraph.getCheckpointConfig().isCheckpointingEnabled()) {
 			throw new IllegalArgumentException("Checkpoint is not supported for batch jobs.");
 		}
-		if (isShuffleModeAllBatch()) {
+		if (ExecutorUtils.isShuffleModeAllBatch(tableConfig)) {
 			streamGraph.setBlockingConnectionsBetweenChains(true);
 		}
-		return streamGraph;
 	}
 
-	private boolean isShuffleModeAllBatch() {
+	private static boolean isShuffleModeAllBatch(TableConfig tableConfig) {
 		String value = tableConfig.getConfiguration().getString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE);
 		if (value.equalsIgnoreCase(ShuffleMode.BATCH.toString())) {
 			return true;
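
Taken together, the two setBatchProperties overloads and generateStreamGraph are meant to be called in a fixed order, as the BatchExecutor change above and the BatchPlanner change below show. Here is a hedged sketch of that sequence as a standalone helper; the helper itself is not part of the commit, it only composes the utilities introduced in this file.

    import java.util.List;

    import org.apache.flink.api.dag.Transformation;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.graph.StreamGraph;
    import org.apache.flink.table.api.TableConfig;
    import org.apache.flink.table.planner.utils.ExecutorUtils;

    // Hypothetical helper showing the intended call order for batch translation,
    // assuming the transformations come from translating the optimized exec nodes.
    final class BatchGraphSketch {
        static StreamGraph toBatchStreamGraph(
                StreamExecutionEnvironment execEnv,
                TableConfig tableConfig,
                List<Transformation<?>> transformations) {
            // 1. configure the environment for batch (object reuse, no latency tracking, ...)
            ExecutorUtils.setBatchProperties(execEnv, tableConfig);
            // 2. build the graph directly from the given transformations
            StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(execEnv, transformations);
            // 3. apply the graph-level batch settings (schedule mode, resources, ...)
            ExecutorUtils.setBatchProperties(streamGraph, tableConfig);
            return streamGraph;
        }

        private BatchGraphSketch() {}
    }
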
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 56a2fa2..a43d778 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext
 import org.apache.flink.table.planner.plan.optimize.{BatchCommonSubGraphBasedOptimizer, Optimizer}
 import org.apache.flink.table.planner.plan.reuse.DeadlockBreakupProcessor
 import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOptUtil}
-import org.apache.flink.table.planner.utils.PlanUtil
+import org.apache.flink.table.planner.utils.{DummyStreamExecutionEnvironment, ExecutorUtils, PlanUtil}
 
 import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
 import org.apache.calcite.rel.{RelCollationTraitDef, RelNode}
@@ -65,7 +65,6 @@ class BatchPlanner(
 
   override protected def translateToPlan(
       execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]] = {
-    overrideEnvParallelism()
     execNodes.map {
       case node: BatchExecNode[_] => node.translateToPlan(this)
       case _ =>
@@ -85,10 +84,15 @@ class BatchPlanner(
     }
     val optimizedRelNodes = optimize(sinkRelNodes)
     val execNodes = translateToExecNodePlan(optimizedRelNodes)
-    val transformations = translateToPlan(execNodes)
-    val batchExecutor = new BatchExecutor(getExecEnv)
-    batchExecutor.setTableConfig(getTableConfig)
-    val streamGraph = batchExecutor.generateStreamGraph(transformations, "")
+
+    val plannerForExplain = createDummyPlannerForExplain()
+    plannerForExplain.overrideEnvParallelism()
+    val transformations = plannerForExplain.translateToPlan(execNodes)
+
+    val execEnv = getExecEnv
+    ExecutorUtils.setBatchProperties(execEnv, getTableConfig)
+    val streamGraph = ExecutorUtils.generateStreamGraph(execEnv, transformations)
+    ExecutorUtils.setBatchProperties(streamGraph, getTableConfig)
     val executionPlan = PlanUtil.explainStreamGraph(streamGraph)
 
     val sb = new StringBuilder
@@ -114,4 +118,11 @@ class BatchPlanner(
     sb.append(executionPlan)
     sb.toString()
   }
+
+  private def createDummyPlannerForExplain(): BatchPlanner = {
+    val dummyExecEnv = new DummyStreamExecutionEnvironment(getExecEnv)
+    val executorForExplain = new BatchExecutor(dummyExecEnv)
+    new BatchPlanner(executorForExplain, config, functionCatalog, catalogManager)
+  }
+
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 9168d44..4632ba4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -142,7 +142,10 @@ abstract class PlannerBase(
     if (modifyOperations.isEmpty) {
       return List.empty[Transformation[_]]
     }
+    // prepare the execEnv before translating
     mergeParameters()
+    overrideEnvParallelism()
+
     val relNodes = modifyOperations.map(translateToRel)
     val optimizedRelNodes = optimize(relNodes)
     val execNodes = translateToExecNodePlan(optimizedRelNodes)
@@ -154,7 +157,7 @@ abstract class PlannerBase(
     val defaultParallelism = getTableConfig.getConfiguration.getInteger(
       ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM)
     if (defaultParallelism > 0) {
-      getExecEnv.setParallelism(defaultParallelism)
+      getExecEnv.getConfig.setParallelism(defaultParallelism)
     }
   }
 
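The one-line change above is subtle: on the dummy environment introduced in this commit, setParallelism throws, but getConfig() delegates to the real environment's ExecutionConfig, so overriding the parallelism through the config still works during explain. A small illustration against the classes in this commit; the wrapper method itself is hypothetical.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.planner.utils.DummyStreamExecutionEnvironment;

    // Hypothetical sketch of why PlannerBase switched to getConfig().setParallelism(...).
    final class DummyEnvParallelismSketch {
        static void overrideParallelism(StreamExecutionEnvironment realEnv, int p) {
            StreamExecutionEnvironment env = new DummyStreamExecutionEnvironment(realEnv);
            env.getConfig().setParallelism(p); // mutates the real ExecutionConfig
            // env.setParallelism(p);          // would throw UnsupportedOperationException
        }

        private DummyEnvParallelismSketch() {}
    }
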
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index e5add45..a690f29 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.planner.plan.`trait`._
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
 import org.apache.flink.table.planner.plan.optimize.{Optimizer, StreamCommonSubGraphBasedOptimizer}
 import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOptUtil}
-import org.apache.flink.table.planner.utils.PlanUtil
+import org.apache.flink.table.planner.utils.{DummyStreamExecutionEnvironment, ExecutorUtils, PlanUtil}
 
 import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
 import org.apache.calcite.sql.SqlExplainLevel
@@ -56,7 +56,6 @@ class StreamPlanner(
 
   override protected def translateToPlan(
       execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]] = {
-    overrideEnvParallelism()
     execNodes.map {
       case node: StreamExecNode[_] => node.translateToPlan(this)
       case _ =>
@@ -76,10 +75,11 @@ class StreamPlanner(
     }
     val optimizedRelNodes = optimize(sinkRelNodes)
     val execNodes = translateToExecNodePlan(optimizedRelNodes)
-    val transformations = translateToPlan(execNodes)
-    val streamExecutor = new StreamExecutor(getExecEnv)
-    streamExecutor.setTableConfig(getTableConfig)
-    val streamGraph = streamExecutor.generateStreamGraph(transformations, "")
+
+    val plannerForExplain = createDummyPlannerForExplain()
+    plannerForExplain.overrideEnvParallelism()
+    val transformations = plannerForExplain.translateToPlan(execNodes)
+    val streamGraph = ExecutorUtils.generateStreamGraph(getExecEnv, transformations)
     val executionPlan = PlanUtil.explainStreamGraph(streamGraph)
 
     val sb = new StringBuilder
@@ -108,4 +108,10 @@ class StreamPlanner(
     sb.append(executionPlan)
     sb.toString()
   }
+
+  private def createDummyPlannerForExplain(): StreamPlanner = {
+    val dummyExecEnv = new DummyStreamExecutionEnvironment(getExecEnv)
+    val executor = new StreamExecutor(dummyExecEnv)
+    new StreamPlanner(executor, config, functionCatalog, catalogManager)
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
new file mode 100644
index 0000000..a4155df
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.Types.STRING
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.internal.TableEnvironmentImpl
+import org.apache.flink.table.api.scala.StreamTableEnvironment
+import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSources}
+import org.apache.flink.table.sinks.CsvTableSink
+import org.apache.flink.util.FileUtils
+
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Rule, Test}
+
+import _root_.java.io.File
+import _root_.java.util
+
+
+@RunWith(classOf[Parameterized])
+class TableEnvironmentITCase(settings: EnvironmentSettings, mode: String) {
+
+  private val _tempFolder = new TemporaryFolder()
+
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  @Test
+  def testExecuteTwiceUsingSameTableEnv(): Unit = {
+    testExecuteTwiceUsingSameTableEnv(TableEnvironmentImpl.create(settings))
+  }
+
+  @Test
+  def testExecuteTwiceUsingSameStreamTableEnv(): Unit = {
+    if (settings.isStreamingMode) {
+      testExecuteTwiceUsingSameTableEnv(StreamTableEnvironment.create(
+        StreamExecutionEnvironment.getExecutionEnvironment, settings))
+    } else {
+      // the batch planner is not supported in StreamTableEnvironment
+    }
+  }
+
+  private def testExecuteTwiceUsingSameTableEnv(tEnv: TableEnvironment): Unit = {
+    tEnv.registerTableSource("MyTable", TestTableSources.getPersonCsvTableSource)
+    val sink1Path = registerCsvTableSink(tEnv, Array("first"), Array(STRING), "MySink1")
+    val sink2Path = registerCsvTableSink(tEnv, Array("last"), Array(STRING), "MySink2")
+    checkEmptyFile(sink1Path)
+    checkEmptyFile(sink2Path)
+
+    val table1 = tEnv.sqlQuery("select first from MyTable")
+    tEnv.insertInto(table1, "MySink1")
+    tEnv.execute("test1")
+    assertFirstValues(sink1Path)
+    checkEmptyFile(sink2Path)
+
+    // delete first csv file
+    new File(sink1Path).delete()
+    assertFalse(new File(sink1Path).exists())
+
+    val table2 = tEnv.sqlQuery("select last from MyTable")
+    tEnv.insertInto(table2, "MySink2")
+    tEnv.execute("test2")
+    assertFalse(new File(sink1Path).exists())
+    assertLastValues(sink2Path)
+  }
+
+  @Test
+  def testExplainAndExecuteSingleSink(): Unit = {
+    val tEnv = TableEnvironmentImpl.create(settings)
+    tEnv.registerTableSource("MyTable", TestTableSources.getPersonCsvTableSource)
+    val sinkPath = registerCsvTableSink(tEnv, Array("first"), Array(STRING), "MySink1")
+
+    val table1 = tEnv.sqlQuery("select first from MyTable")
+    tEnv.insertInto(table1, "MySink1")
+
+    tEnv.explain(false)
+    tEnv.execute("test1")
+    assertFirstValues(sinkPath)
+  }
+
+  @Test
+  def testExplainAndExecuteMultipleSink(): Unit = {
+    val tEnv = TableEnvironmentImpl.create(settings)
+    tEnv.registerTableSource("MyTable", TestTableSources.getPersonCsvTableSource)
+    val sink1Path = registerCsvTableSink(tEnv, Array("first"), Array(STRING), "MySink1")
+    val sink2Path = registerCsvTableSink(tEnv, Array("last"), Array(STRING), "MySink2")
+
+    val table1 = tEnv.sqlQuery("select first from MyTable")
+    tEnv.insertInto(table1, "MySink1")
+    val table2 = tEnv.sqlQuery("select last from MyTable")
+    tEnv.insertInto(table2, "MySink2")
+
+    tEnv.explain(false)
+    tEnv.execute("test1")
+    assertFirstValues(sink1Path)
+    assertLastValues(sink2Path)
+  }
+
+  @Test
+  def testExplainTwice(): Unit = {
+    val tEnv = TableEnvironmentImpl.create(settings)
+    tEnv.registerTableSource("MyTable", TestTableSources.getPersonCsvTableSource)
+    registerCsvTableSink(tEnv, Array("first"), Array(STRING), "MySink1")
+    registerCsvTableSink(tEnv, Array("last"), Array(STRING), "MySink2")
+
+    val table1 = tEnv.sqlQuery("select first from MyTable")
+    tEnv.insertInto(table1, "MySink1")
+    val table2 = tEnv.sqlQuery("select last from MyTable")
+    tEnv.insertInto(table2, "MySink2")
+
+    val result1 = tEnv.explain(false)
+    val result2 = tEnv.explain(false)
+    assertEquals(TableTestUtil.replaceStageId(result1), TableTestUtil.replaceStageId(result2))
+  }
+
+  private def registerCsvTableSink(
+      tEnv: TableEnvironment,
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]],
+      tableName: String): String = {
+    val resultFile = _tempFolder.newFile()
+    val path = resultFile.getAbsolutePath
+
+    val configuredSink = new CsvTableSink(path, ",", 1, WriteMode.OVERWRITE)
+      .configure(fieldNames, fieldTypes)
+    tEnv.registerTableSink(tableName, configuredSink)
+
+    path
+  }
+
+  private def assertFirstValues(csvFilePath: String): Unit = {
+    val expected = List("Mike", "Bob", "Sam", "Peter", "Liz", "Sally", "Alice", "Kelly")
+    val actual = FileUtils.readFileUtf8(new File(csvFilePath)).split("\n").toList
+    assertEquals(expected.sorted, actual.sorted)
+  }
+
+  private def assertLastValues(csvFilePath: String): Unit = {
+    val expected = List(
+      "Smith", "Taylor", "Miller", "Smith", "Williams", "Miller", "Smith", "Williams")
+    val actual = FileUtils.readFileUtf8(new File(csvFilePath)).split("\n").toList
+    assertEquals(expected.sorted, actual.sorted)
+  }
+
+  private def checkEmptyFile(csvFilePath: String): Unit = {
+    assertTrue(FileUtils.readFileUtf8(new File(csvFilePath)).isEmpty)
+  }
+}
+
+object TableEnvironmentITCase {
+  @Parameterized.Parameters(name = "{1}")
+  def parameters(): util.Collection[Array[_]] = {
+    util.Arrays.asList(
+      Array(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(), "batch"),
+      Array(EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(), "stream")
+    )
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 1a0996b..583a205 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -18,11 +18,13 @@
 
 package org.apache.flink.table.api
 
+import org.apache.flink.api.common.typeinfo.Types.STRING
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
-import org.apache.flink.table.planner.utils.TableTestUtil
+import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSources}
+import org.apache.flink.table.sinks.CsvTableSink
 
 import org.apache.calcite.plan.RelOptUtil
 import org.junit.Assert.assertEquals
@@ -75,4 +77,25 @@ class TableEnvironmentTest {
       "  LogicalTableScan(table=[[default_catalog, default_database, MyTable]])\n"
     assertEquals(expected, actual)
   }
+
+  @Test
+  def testStreamTableEnvironmentExplain(): Unit = {
+    thrown.expect(classOf[TableException])
+    thrown.expectMessage(
+      "'explain' method without any tables is unsupported in StreamTableEnvironment.")
+
+    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
+    val tEnv = StreamTableEnvironment.create(execEnv, settings)
+
+    tEnv.registerTableSource("MyTable", TestTableSources.getPersonCsvTableSource)
+    tEnv.registerTableSink("MySink",
+      new CsvTableSink("/tmp").configure(Array("first"), Array(STRING)))
+
+    val table1 = tEnv.sqlQuery("select first from MyTable")
+    tEnv.insertInto(table1, "MySink")
+
+    tEnv.explain(false)
+  }
+
 }