You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/11/25 02:43:38 UTC
[flink] branch release-1.9 updated: [FLINK-13708]
[table-planner-blink] transformations should be cleared after execution in
blink planner (#10290)
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 2dd3ba8 [FLINK-13708] [table-planner-blink] transformations should be cleared after execution in blink planner (#10290)
2dd3ba8 is described below
commit 2dd3ba8335d35de7ed59073def4f0093326f1464
Author: godfrey he <go...@163.com>
AuthorDate: Mon Nov 25 10:43:25 2019 +0800
[FLINK-13708] [table-planner-blink] transformations should be cleared after execution in blink planner (#10290)
---
.../table/tests/test_table_environment_api.py | 25 +-
.../client/gateway/local/ExecutionContext.java | 2 +-
.../java/internal/StreamTableEnvironmentImpl.java | 6 +
.../internal/StreamTableEnvironmentImpl.scala | 6 +
.../table/planner/delegation/BatchExecutor.java | 67 +----
.../table/planner/delegation/ExecutorBase.java | 19 +-
.../table/planner/delegation/StreamExecutor.java | 14 +-
.../utils/DummyStreamExecutionEnvironment.java | 295 +++++++++++++++++++++
.../ExecutorUtils.java} | 63 +++--
.../table/planner/delegation/BatchPlanner.scala | 23 +-
.../table/planner/delegation/PlannerBase.scala | 5 +-
.../table/planner/delegation/StreamPlanner.scala | 18 +-
.../flink/table/api/TableEnvironmentITCase.scala | 179 +++++++++++++
.../flink/table/api/TableEnvironmentTest.scala | 25 +-
14 files changed, 615 insertions(+), 132 deletions(-)
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 8bbd491..8c657a3 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -132,9 +132,8 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
t_env.sql_update("insert into sink1 select * from %s where a > 100" % source)
t_env.sql_update("insert into sink2 select * from %s where a < 100" % source)
- actual = t_env.explain(extended=True)
-
- assert isinstance(actual, str) or isinstance(actual, unicode)
+ with self.assertRaises(TableException):
+ t_env.explain(extended=True)
def test_sql_query(self):
t_env = self.t_env
@@ -363,3 +362,23 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
line = f.readline()
self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
+
+ def test_explain_with_multi_sinks_with_blink_planner(self):
+ t_env = BatchTableEnvironment.create(
+ environment_settings=EnvironmentSettings.new_instance().in_batch_mode()
+ .use_blink_planner().build())
+ source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
+ field_names = ["a", "b", "c"]
+ field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
+ t_env.register_table_sink(
+ "sink1",
+ CsvTableSink(field_names, field_types, "path1"))
+ t_env.register_table_sink(
+ "sink2",
+ CsvTableSink(field_names, field_types, "path2"))
+
+ t_env.sql_update("insert into sink1 select * from %s where a > 100" % source)
+ t_env.sql_update("insert into sink2 select * from %s where a < 100" % source)
+
+ actual = t_env.explain(extended=True)
+ self.assertIsInstance(actual, (str, unicode))
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 27c3ee2..e212f05 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -454,7 +454,7 @@ public class ExecutionContext<T> {
// special case for Blink planner to apply batch optimizations
// note: it also modifies the ExecutionConfig!
if (executor instanceof ExecutorBase) {
- return ((ExecutorBase) executor).generateStreamGraph(name);
+ return ((ExecutorBase) executor).getStreamGraph(name);
}
return streamExecEnv.getStreamGraph(name);
} else {
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index cf2ace7..f343d4f 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -335,6 +335,12 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
return true;
}
+ @Override
+ public String explain(boolean extended) {
+ // throw exception directly, because the operations to explain are always empty
+ throw new TableException("'explain' method without any tables is unsupported in StreamTableEnvironment.");
+ }
+
private <T> TypeInformation<T> extractTypeInformation(Table table, Class<T> clazz) {
try {
return TypeExtractor.createTypeInfo(clazz);
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index c45d324..052551f 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -193,6 +193,12 @@ class StreamTableEnvironmentImpl (
override protected def isEagerOperationTranslation(): Boolean = true
+ override def explain(extended: Boolean): String = {
+ // throw exception directly, because the operations to explain are always empty
+ throw new TableException(
+ "'explain' method without any tables is unsupported in StreamTableEnvironment.")
+ }
+
private def toDataStream[T](
table: Table,
modifyOperation: OutputConversionModifyOperation)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
index 7bf4367..2334e93 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
@@ -20,21 +20,11 @@ package org.apache.flink.table.planner.delegation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.operators.ResourceSpec;
-import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.transformations.ShuffleMode;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.delegation.Executor;
-import org.apache.flink.table.planner.plan.nodes.resource.NodeResourceUtil;
-
-import java.util.List;
+import org.apache.flink.table.planner.utils.ExecutorUtils;
/**
* An implementation of {@link Executor} that is backed by a {@link StreamExecutionEnvironment}.
@@ -50,59 +40,18 @@ public class BatchExecutor extends ExecutorBase {
@Override
public JobExecutionResult execute(String jobName) throws Exception {
- StreamExecutionEnvironment execEnv = getExecutionEnvironment();
- StreamGraph streamGraph = generateStreamGraph(jobName);
- return execEnv.execute(streamGraph);
- }
-
- /**
- * Sets batch configs.
- */
- private void setBatchProperties(StreamExecutionEnvironment execEnv) {
- ExecutionConfig executionConfig = execEnv.getConfig();
- executionConfig.enableObjectReuse();
- executionConfig.setLatencyTrackingInterval(-1);
- execEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
- execEnv.setBufferTimeout(-1);
- if (isShuffleModeAllBatch()) {
- executionConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL);
- }
+ StreamGraph streamGraph = getStreamGraph(jobName);
+ return getExecutionEnvironment().execute(streamGraph);
}
@Override
- public StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName) {
+ public StreamGraph getStreamGraph(String jobName) {
StreamExecutionEnvironment execEnv = getExecutionEnvironment();
- setBatchProperties(execEnv);
- transformations.forEach(execEnv::addOperator);
- StreamGraph streamGraph;
- streamGraph = execEnv.getStreamGraph(getNonEmptyJobName(jobName));
- // All transformations should set managed memory size.
- ResourceSpec managedResourceSpec = NodeResourceUtil.fromManagedMem(0);
- streamGraph.getStreamNodes().forEach(sn -> {
- if (sn.getMinResources().equals(ResourceSpec.DEFAULT)) {
- sn.setResources(managedResourceSpec, managedResourceSpec);
- }
- });
- streamGraph.setChaining(true);
- streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
- streamGraph.setStateBackend(null);
- if (streamGraph.getCheckpointConfig().isCheckpointingEnabled()) {
- throw new IllegalArgumentException("Checkpoint is not supported for batch jobs.");
- }
- if (isShuffleModeAllBatch()) {
- streamGraph.setBlockingConnectionsBetweenChains(true);
- }
+ ExecutorUtils.setBatchProperties(execEnv, tableConfig);
+ StreamGraph streamGraph = execEnv.getStreamGraph();
+ streamGraph.setJobName(getNonEmptyJobName(jobName));
+ ExecutorUtils.setBatchProperties(streamGraph, tableConfig);
return streamGraph;
}
- private boolean isShuffleModeAllBatch() {
- String value = tableConfig.getConfiguration().getString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE);
- if (value.equalsIgnoreCase(ShuffleMode.BATCH.toString())) {
- return true;
- } else if (!value.equalsIgnoreCase(ShuffleMode.PIPELINED.toString())) {
- throw new IllegalArgumentException(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE.key() +
- " can only be set to " + ShuffleMode.BATCH.toString() + " or " + ShuffleMode.PIPELINED.toString());
- }
- return false;
- }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java
index 10eeafd..af12c22 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.util.StringUtils;
-import java.util.ArrayList;
import java.util.List;
/**
@@ -38,7 +37,6 @@ public abstract class ExecutorBase implements Executor {
private static final String DEFAULT_JOB_NAME = "Flink Exec Table Job";
private final StreamExecutionEnvironment executionEnvironment;
- protected List<Transformation<?>> transformations = new ArrayList<>();
protected TableConfig tableConfig;
public ExecutorBase(StreamExecutionEnvironment executionEnvironment) {
@@ -49,26 +47,19 @@ public abstract class ExecutorBase implements Executor {
this.tableConfig = tableConfig;
}
- @Override
- public void apply(List<Transformation<?>> transformations) {
- this.transformations.addAll(transformations);
- }
-
public StreamExecutionEnvironment getExecutionEnvironment() {
return executionEnvironment;
}
- /**
- * Translates the applied transformations to a stream graph.
- */
- public StreamGraph generateStreamGraph(String jobName) {
- return generateStreamGraph(transformations, jobName);
+ @Override
+ public void apply(List<Transformation<?>> transformations) {
+ transformations.forEach(getExecutionEnvironment()::addOperator);
}
/**
- * Translates the given transformations to a stream graph.
+ * Translates the transformations applied into this executor to a stream graph.
*/
- public abstract StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName);
+ public abstract StreamGraph getStreamGraph(String jobName);
protected String getNonEmptyJobName(String jobName) {
if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java
index 8d1e904..034da4d 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java
@@ -21,13 +21,10 @@ package org.apache.flink.table.planner.delegation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.delegation.Executor;
-import java.util.List;
-
/**
* An implementation of {@link Executor} that is backed by a {@link StreamExecutionEnvironment}.
* This is the only executor that {@link org.apache.flink.table.planner.delegation.StreamPlanner} supports.
@@ -42,14 +39,13 @@ public class StreamExecutor extends ExecutorBase {
@Override
public JobExecutionResult execute(String jobName) throws Exception {
- StreamExecutionEnvironment execEnv = getExecutionEnvironment();
- return execEnv.execute(generateStreamGraph(transformations, jobName));
+ return getExecutionEnvironment().execute(getNonEmptyJobName(jobName));
}
@Override
- public StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName) {
- transformations.forEach(getExecutionEnvironment()::addOperator);
- return getExecutionEnvironment().getStreamGraph(getNonEmptyJobName(jobName));
+ public StreamGraph getStreamGraph(String jobName) {
+ StreamGraph streamGraph = getExecutionEnvironment().getStreamGraph();
+ streamGraph.setJobName(getNonEmptyJobName(jobName));
+ return streamGraph;
}
-
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
new file mode 100644
index 0000000..23d70d2
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.TableSource;
+
+import com.esotericsoftware.kryo.Serializer;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * This is dummy {@link StreamExecutionEnvironment}, only used for {@link PlannerBase#explain(List, boolean)} method.
+ *
+ * <P>{@link Transformation}s will be added into a {@link StreamExecutionEnvironment} when translating ExecNode to plan,
+ * and they will be cleared only when calling {@link StreamExecutionEnvironment#execute()} method.
+ *
+ * <p>{@link PlannerBase#explain(List, boolean)} method will not only print logical plan but also execution plan,
+ * translating will happen in explain method. If calling explain method before execute method, the transformations in
+ * StreamExecutionEnvironment is dirty, and execution result may be incorrect.
+ *
+ * <p>All set methods (e.g. `setXX`, `enableXX`, `disableXX`, etc) are disabled to prohibit changing configuration,
+ * all get methods (e.g. `getXX`, `isXX`, etc) will be delegated to real StreamExecutionEnvironment.
+ * `execute`, `getStreamGraph`, `getExecutionPlan` methods are also disabled, while `addOperator` method is enabled to
+ * let `explain` method add Transformations to this StreamExecutionEnvironment.
+ *
+ * <p>This class could be removed once the {@link TableSource} interface and {@link StreamTableSink} interface
+ * are reworked.
+ */
+public class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
+
+ private final StreamExecutionEnvironment realExecEnv;
+
+ public DummyStreamExecutionEnvironment(StreamExecutionEnvironment realExecEnv) {
+ this.realExecEnv = realExecEnv;
+ }
+
+ @Override
+ public ExecutionConfig getConfig() {
+ return realExecEnv.getConfig();
+ }
+
+ @Override
+ public List<Tuple2<String, DistributedCache.DistributedCacheEntry>> getCachedFiles() {
+ return realExecEnv.getCachedFiles();
+ }
+
+ @Override
+ public StreamExecutionEnvironment setParallelism(int parallelism) {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, setParallelism method is unsupported.");
+ }
+
+ @Override
+ public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, setMaxParallelism method is unsupported.");
+ }
+
+ @Override
+ public int getParallelism() {
+ return realExecEnv.getParallelism();
+ }
+
+ @Override
+ public int getMaxParallelism() {
+ return realExecEnv.getMaxParallelism();
+ }
+
+ @Override
+ public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, setBufferTimeout method is unsupported.");
+ }
+
+ @Override
+ public long getBufferTimeout() {
+ return realExecEnv.getBufferTimeout();
+ }
+
+ @Override
+ public StreamExecutionEnvironment disableOperatorChaining() {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, disableOperatorChaining method is unsupported.");
+ }
+
+ @Override
+ public boolean isChainingEnabled() {
+ return realExecEnv.isChainingEnabled();
+ }
+
+ @Override
+ public CheckpointConfig getCheckpointConfig() {
+ return realExecEnv.getCheckpointConfig();
+ }
+
+ @Override
+ public StreamExecutionEnvironment enableCheckpointing(long interval) {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, enableCheckpointing method is unsupported.");
+ }
+
+ @Override
+ public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode) {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, enableCheckpointing method is unsupported.");
+ }
+
+ @Override
+ public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force) {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, enableCheckpointing method is unsupported.");
+ }
+
+ @Override
+ public StreamExecutionEnvironment enableCheckpointing() {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, enableCheckpointing method is unsupported.");
+ }
+
+ @Override
+ public long getCheckpointInterval() {
+ return realExecEnv.getCheckpointInterval();
+ }
+
+ @Override
+ public boolean isForceCheckpointing() {
+ return realExecEnv.isForceCheckpointing();
+ }
+
+ @Override
+ public CheckpointingMode getCheckpointingMode() {
+ return realExecEnv.getCheckpointingMode();
+ }
+
+ @Override
+ public StreamExecutionEnvironment setStateBackend(StateBackend backend) {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, setStateBackend method is unsupported.");
+ }
+
+ @Override
+ public StreamExecutionEnvironment setStateBackend(AbstractStateBackend backend) {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, setStateBackend method is unsupported.");
+ }
+
+ @Override
+ public StateBackend getStateBackend() {
+ return realExecEnv.getStateBackend();
+ }
+
+ @Override
+ public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, setRestartStrategy method is unsupported.");
+ }
+
+ @Override
+ public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
+ return realExecEnv.getRestartStrategy();
+ }
+
+ @Override
+ public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, setNumberOfExecutionRetries method is unsupported.");
+ }
+
+ @Override
+ public int getNumberOfExecutionRetries() {
+ return realExecEnv.getNumberOfExecutionRetries();
+ }
+
+ @Override
+ public <T extends Serializer<?> & Serializable> void addDefaultKryoSerializer(Class<?> type, T serializer) {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, addDefaultKryoSerializer method is unsupported.");
+ }
+
+ @Override
+ public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, addDefaultKryoSerializer method is unsupported.");
+ }
+
+ @Override
+ public <T extends Serializer<?> & Serializable> void registerTypeWithKryoSerializer(Class<?> type, T serializer) {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, registerTypeWithKryoSerializer method is unsupported.");
+ }
+
+ @Override
+ public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass) {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, registerTypeWithKryoSerializer method is unsupported.");
+ }
+
+ @Override
+ public void registerType(Class<?> type) {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, registerType method is unsupported.");
+ }
+
+ @Override
+ public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, setStreamTimeCharacteristic method is unsupported.");
+ }
+
+ @Override
+ public TimeCharacteristic getStreamTimeCharacteristic() {
+ return realExecEnv.getStreamTimeCharacteristic();
+ }
+
+ @Override
+ public JobExecutionResult execute() throws Exception {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, execute method is unsupported.");
+ }
+
+ @Override
+ public JobExecutionResult execute(String jobName) throws Exception {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, execute method is unsupported.");
+ }
+
+ @Override
+ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, execute method is unsupported.");
+ }
+
+ @Override
+ public void registerCachedFile(String filePath, String name) {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, registerCachedFile method is unsupported.");
+ }
+
+ @Override
+ public void registerCachedFile(String filePath, String name, boolean executable) {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, registerCachedFile method is unsupported.");
+ }
+
+ @Override
+ public StreamGraph getStreamGraph() {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, getStreamGraph method is unsupported.");
+ }
+
+ @Override
+ public StreamGraph getStreamGraph(String jobName) {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, getStreamGraph method is unsupported.");
+ }
+
+ @Override
+ public String getExecutionPlan() {
+ throw new UnsupportedOperationException(
+ "This is a dummy StreamExecutionEnvironment, getExecutionPlan method is unsupported.");
+ }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
similarity index 66%
copy from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
copy to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
index 7bf4367..610472b 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
@@ -16,66 +16,66 @@
* limitations under the License.
*/
-package org.apache.flink.table.planner.delegation;
+package org.apache.flink.table.planner.utils;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InputDependencyConstraint;
-import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.planner.plan.nodes.resource.NodeResourceUtil;
import java.util.List;
/**
- * An implementation of {@link Executor} that is backed by a {@link StreamExecutionEnvironment}.
- * This is the only executor that {@link org.apache.flink.table.planner.delegation.BatchPlanner} supports.
+ * Utility class to generate StreamGraph and set properties for batch.
*/
-@Internal
-public class BatchExecutor extends ExecutorBase {
+public class ExecutorUtils {
- @VisibleForTesting
- public BatchExecutor(StreamExecutionEnvironment executionEnvironment) {
- super(executionEnvironment);
- }
-
- @Override
- public JobExecutionResult execute(String jobName) throws Exception {
- StreamExecutionEnvironment execEnv = getExecutionEnvironment();
- StreamGraph streamGraph = generateStreamGraph(jobName);
- return execEnv.execute(streamGraph);
+ /**
+ * Generate {@link StreamGraph} by {@link StreamGraphGenerator}.
+ */
+ public static StreamGraph generateStreamGraph(
+ StreamExecutionEnvironment execEnv,
+ List<Transformation<?>> transformations) {
+ if (transformations.size() <= 0) {
+ throw new IllegalStateException("No operators defined in streaming topology. Cannot generate StreamGraph.");
+ }
+ StreamGraphGenerator generator =
+ new StreamGraphGenerator(transformations, execEnv.getConfig(), execEnv.getCheckpointConfig())
+ .setStateBackend(execEnv.getStateBackend())
+ .setChaining(execEnv.isChainingEnabled())
+ .setUserArtifacts(execEnv.getCachedFiles())
+ .setTimeCharacteristic(execEnv.getStreamTimeCharacteristic())
+ .setDefaultBufferTimeout(execEnv.getBufferTimeout());
+ return generator.generate();
}
/**
- * Sets batch configs.
+ * Sets batch properties for {@link StreamExecutionEnvironment}.
*/
- private void setBatchProperties(StreamExecutionEnvironment execEnv) {
+ public static void setBatchProperties(StreamExecutionEnvironment execEnv, TableConfig tableConfig) {
ExecutionConfig executionConfig = execEnv.getConfig();
executionConfig.enableObjectReuse();
executionConfig.setLatencyTrackingInterval(-1);
execEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
execEnv.setBufferTimeout(-1);
- if (isShuffleModeAllBatch()) {
+ if (isShuffleModeAllBatch(tableConfig)) {
executionConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL);
}
}
- @Override
- public StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName) {
- StreamExecutionEnvironment execEnv = getExecutionEnvironment();
- setBatchProperties(execEnv);
- transformations.forEach(execEnv::addOperator);
- StreamGraph streamGraph;
- streamGraph = execEnv.getStreamGraph(getNonEmptyJobName(jobName));
+ /**
+ * Sets batch properties for {@link StreamGraph}.
+ */
+ public static void setBatchProperties(StreamGraph streamGraph, TableConfig tableConfig) {
// All transformations should set managed memory size.
ResourceSpec managedResourceSpec = NodeResourceUtil.fromManagedMem(0);
streamGraph.getStreamNodes().forEach(sn -> {
@@ -89,13 +89,12 @@ public class BatchExecutor extends ExecutorBase {
if (streamGraph.getCheckpointConfig().isCheckpointingEnabled()) {
throw new IllegalArgumentException("Checkpoint is not supported for batch jobs.");
}
- if (isShuffleModeAllBatch()) {
+ if (ExecutorUtils.isShuffleModeAllBatch(tableConfig)) {
streamGraph.setBlockingConnectionsBetweenChains(true);
}
- return streamGraph;
}
- private boolean isShuffleModeAllBatch() {
+ private static boolean isShuffleModeAllBatch(TableConfig tableConfig) {
String value = tableConfig.getConfiguration().getString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE);
if (value.equalsIgnoreCase(ShuffleMode.BATCH.toString())) {
return true;
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 56a2fa2..a43d778 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext
import org.apache.flink.table.planner.plan.optimize.{BatchCommonSubGraphBasedOptimizer, Optimizer}
import org.apache.flink.table.planner.plan.reuse.DeadlockBreakupProcessor
import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOptUtil}
-import org.apache.flink.table.planner.utils.PlanUtil
+import org.apache.flink.table.planner.utils.{DummyStreamExecutionEnvironment, ExecutorUtils, PlanUtil}
import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
import org.apache.calcite.rel.{RelCollationTraitDef, RelNode}
@@ -65,7 +65,6 @@ class BatchPlanner(
override protected def translateToPlan(
execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]] = {
- overrideEnvParallelism()
execNodes.map {
case node: BatchExecNode[_] => node.translateToPlan(this)
case _ =>
@@ -85,10 +84,15 @@ class BatchPlanner(
}
val optimizedRelNodes = optimize(sinkRelNodes)
val execNodes = translateToExecNodePlan(optimizedRelNodes)
- val transformations = translateToPlan(execNodes)
- val batchExecutor = new BatchExecutor(getExecEnv)
- batchExecutor.setTableConfig(getTableConfig)
- val streamGraph = batchExecutor.generateStreamGraph(transformations, "")
+
+ val plannerForExplain = createDummyPlannerForExplain()
+ plannerForExplain.overrideEnvParallelism()
+ val transformations = plannerForExplain.translateToPlan(execNodes)
+
+ val execEnv = getExecEnv
+ ExecutorUtils.setBatchProperties(execEnv, getTableConfig)
+ val streamGraph = ExecutorUtils.generateStreamGraph(execEnv, transformations)
+ ExecutorUtils.setBatchProperties(streamGraph, getTableConfig)
val executionPlan = PlanUtil.explainStreamGraph(streamGraph)
val sb = new StringBuilder
@@ -114,4 +118,11 @@ class BatchPlanner(
sb.append(executionPlan)
sb.toString()
}
+
+ private def createDummyPlannerForExplain(): BatchPlanner = {
+ val dummyExecEnv = new DummyStreamExecutionEnvironment(getExecEnv)
+ val executorForExplain = new BatchExecutor(dummyExecEnv)
+ new BatchPlanner(executorForExplain, config, functionCatalog, catalogManager)
+ }
+
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 9168d44..4632ba4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -142,7 +142,10 @@ abstract class PlannerBase(
if (modifyOperations.isEmpty) {
return List.empty[Transformation[_]]
}
+ // prepare the execEnv before translating
mergeParameters()
+ overrideEnvParallelism()
+
val relNodes = modifyOperations.map(translateToRel)
val optimizedRelNodes = optimize(relNodes)
val execNodes = translateToExecNodePlan(optimizedRelNodes)
@@ -154,7 +157,7 @@ abstract class PlannerBase(
val defaultParallelism = getTableConfig.getConfiguration.getInteger(
ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM)
if (defaultParallelism > 0) {
- getExecEnv.setParallelism(defaultParallelism)
+ getExecEnv.getConfig.setParallelism(defaultParallelism)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index e5add45..a690f29 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.planner.plan.`trait`._
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
import org.apache.flink.table.planner.plan.optimize.{Optimizer, StreamCommonSubGraphBasedOptimizer}
import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOptUtil}
-import org.apache.flink.table.planner.utils.PlanUtil
+import org.apache.flink.table.planner.utils.{DummyStreamExecutionEnvironment, ExecutorUtils, PlanUtil}
import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
import org.apache.calcite.sql.SqlExplainLevel
@@ -56,7 +56,6 @@ class StreamPlanner(
override protected def translateToPlan(
execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]] = {
- overrideEnvParallelism()
execNodes.map {
case node: StreamExecNode[_] => node.translateToPlan(this)
case _ =>
@@ -76,10 +75,11 @@ class StreamPlanner(
}
val optimizedRelNodes = optimize(sinkRelNodes)
val execNodes = translateToExecNodePlan(optimizedRelNodes)
- val transformations = translateToPlan(execNodes)
- val streamExecutor = new StreamExecutor(getExecEnv)
- streamExecutor.setTableConfig(getTableConfig)
- val streamGraph = streamExecutor.generateStreamGraph(transformations, "")
+
+ val plannerForExplain = createDummyPlannerForExplain()
+ plannerForExplain.overrideEnvParallelism()
+ val transformations = plannerForExplain.translateToPlan(execNodes)
+ val streamGraph = ExecutorUtils.generateStreamGraph(getExecEnv, transformations)
val executionPlan = PlanUtil.explainStreamGraph(streamGraph)
val sb = new StringBuilder
@@ -108,4 +108,10 @@ class StreamPlanner(
sb.append(executionPlan)
sb.toString()
}
+
+ private def createDummyPlannerForExplain(): StreamPlanner = {
+ val dummyExecEnv = new DummyStreamExecutionEnvironment(getExecEnv)
+ val executor = new StreamExecutor(dummyExecEnv)
+ new StreamPlanner(executor, config, functionCatalog, catalogManager)
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
new file mode 100644
index 0000000..a4155df
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.Types.STRING
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.internal.TableEnvironmentImpl
+import org.apache.flink.table.api.scala.StreamTableEnvironment
+import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSources}
+import org.apache.flink.table.sinks.CsvTableSink
+import org.apache.flink.util.FileUtils
+
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Rule, Test}
+
+import _root_.java.io.File
+import _root_.java.util
+
+
+@RunWith(classOf[Parameterized])
+class TableEnvironmentITCase(settings: EnvironmentSettings, mode: String) {
+
+ private val _tempFolder = new TemporaryFolder()
+
+ @Rule
+ def tempFolder: TemporaryFolder = _tempFolder
+
+ @Test
+ def testExecuteTwiceUsingSameTableEnv(): Unit = {
+ testExecuteTwiceUsingSameTableEnv(TableEnvironmentImpl.create(settings))
+ }
+
+ @Test
+ def testExecuteTwiceUsingSameStreamTableEnv(): Unit = {
+ if (settings.isStreamingMode) {
+ testExecuteTwiceUsingSameTableEnv(StreamTableEnvironment.create(
+ StreamExecutionEnvironment.getExecutionEnvironment, settings))
+ } else {
+ // batch planner is not supported on StreamTableEnvironment
+ }
+ }
+
+ private def testExecuteTwiceUsingSameTableEnv(tEnv: TableEnvironment): Unit = {
+ val tEnv = TableEnvironmentImpl.create(settings)
+ tEnv.registerTableSource("MyTable", TestTableSources.getPersonCsvTableSource)
+ val sink1Path = registerCsvTableSink(tEnv, Array("first"), Array(STRING), "MySink1")
+ val sink2Path = registerCsvTableSink(tEnv, Array("last"), Array(STRING), "MySink2")
+ checkEmptyFile(sink1Path)
+ checkEmptyFile(sink2Path)
+
+ val table1 = tEnv.sqlQuery("select first from MyTable")
+ tEnv.insertInto(table1, "MySink1")
+ tEnv.execute("test1")
+ assertFirstValues(sink1Path)
+ checkEmptyFile(sink2Path)
+
+ // delete first csv file
+ new File(sink1Path).delete()
+ assertFalse(new File(sink1Path).exists())
+
+ val table2 = tEnv.sqlQuery("select last from MyTable")
+ tEnv.insertInto(table2, "MySink2")
+ tEnv.execute("test2")
+ assertFalse(new File(sink1Path).exists())
+ assertLastValues(sink2Path)
+ }
+
+ @Test
+ def testExplainAndExecuteSingleSink(): Unit = {
+ val tEnv = TableEnvironmentImpl.create(settings)
+ tEnv.registerTableSource("MyTable", TestTableSources.getPersonCsvTableSource)
+ val sinkPath = registerCsvTableSink(tEnv, Array("first"), Array(STRING), "MySink1")
+
+ val table1 = tEnv.sqlQuery("select first from MyTable")
+ tEnv.insertInto(table1, "MySink1")
+
+ tEnv.explain(false)
+ tEnv.execute("test1")
+ assertFirstValues(sinkPath)
+ }
+
+ @Test
+ def testExplainAndExecuteMultipleSink(): Unit = {
+ val tEnv = TableEnvironmentImpl.create(settings)
+ tEnv.registerTableSource("MyTable", TestTableSources.getPersonCsvTableSource)
+ val sink1Path = registerCsvTableSink(tEnv, Array("first"), Array(STRING), "MySink1")
+ val sink2Path = registerCsvTableSink(tEnv, Array("first"), Array(STRING), "MySink2")
+
+ val table1 = tEnv.sqlQuery("select first from MyTable")
+ tEnv.insertInto(table1, "MySink1")
+ val table2 = tEnv.sqlQuery("select last from MyTable")
+ tEnv.insertInto(table2, "MySink2")
+
+ tEnv.explain(false)
+ tEnv.execute("test1")
+ assertFirstValues(sink1Path)
+ assertLastValues(sink2Path)
+ }
+
+ @Test
+ def testExplainTwice(): Unit = {
+ val tEnv = TableEnvironmentImpl.create(settings)
+ tEnv.registerTableSource("MyTable", TestTableSources.getPersonCsvTableSource)
+ registerCsvTableSink(tEnv, Array("first"), Array(STRING), "MySink1")
+ registerCsvTableSink(tEnv, Array("first"), Array(STRING), "MySink2")
+
+ val table1 = tEnv.sqlQuery("select first from MyTable")
+ tEnv.insertInto(table1, "MySink1")
+ val table2 = tEnv.sqlQuery("select last from MyTable")
+ tEnv.insertInto(table2, "MySink2")
+
+ val result1 = tEnv.explain(false)
+ val result2 = tEnv.explain(false)
+ assertEquals(TableTestUtil.replaceStageId(result1), TableTestUtil.replaceStageId(result2))
+ }
+
+ private def registerCsvTableSink(
+ tEnv: TableEnvironment,
+ fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]],
+ tableName: String): String = {
+ val resultFile = _tempFolder.newFile()
+ val path = resultFile.getAbsolutePath
+
+ val configuredSink = new CsvTableSink(path, ",", 1, WriteMode.OVERWRITE)
+ .configure(fieldNames, fieldTypes)
+ tEnv.registerTableSink(tableName, configuredSink)
+
+ path
+ }
+
+ private def assertFirstValues(csvFilePath: String): Unit = {
+ val expected = List("Mike", "Bob", "Sam", "Peter", "Liz", "Sally", "Alice", "Kelly")
+ val actual = FileUtils.readFileUtf8(new File(csvFilePath)).split("\n").toList
+ assertEquals(expected.sorted, actual.sorted)
+ }
+
+ private def assertLastValues(csvFilePath: String): Unit = {
+ val expected = List(
+ "Smith", "Taylor", "Miller", "Smith", "Williams", "Miller", "Smith", "Williams")
+ val actual = FileUtils.readFileUtf8(new File(csvFilePath)).split("\n").toList
+ assertEquals(expected.sorted, actual.sorted)
+ }
+
+ private def checkEmptyFile(csvFilePath: String): Unit = {
+ assertTrue(FileUtils.readFileUtf8(new File(csvFilePath)).isEmpty)
+ }
+}
+
+object TableEnvironmentITCase {
+ @Parameterized.Parameters(name = "{1}")
+ def parameters(): util.Collection[Array[_]] = {
+ util.Arrays.asList(
+ Array(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(), "batch"),
+ Array(EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(), "stream")
+ )
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 1a0996b..583a205 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -18,11 +18,13 @@
package org.apache.flink.table.api
+import org.apache.flink.api.common.typeinfo.Types.STRING
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
-import org.apache.flink.table.planner.utils.TableTestUtil
+import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSources}
+import org.apache.flink.table.sinks.CsvTableSink
import org.apache.calcite.plan.RelOptUtil
import org.junit.Assert.assertEquals
@@ -75,4 +77,25 @@ class TableEnvironmentTest {
" LogicalTableScan(table=[[default_catalog, default_database, MyTable]])\n"
assertEquals(expected, actual)
}
+
+ @Test
+ def testStreamTableEnvironmentExplain(): Unit = {
+ thrown.expect(classOf[TableException])
+ thrown.expectMessage(
+ "'explain' method without any tables is unsupported in StreamTableEnvironment.")
+
+ val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
+ val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
+ val tEnv = StreamTableEnvironment.create(execEnv, settings)
+
+ tEnv.registerTableSource("MyTable", TestTableSources.getPersonCsvTableSource)
+ tEnv.registerTableSink("MySink",
+ new CsvTableSink("/tmp").configure(Array("first"), Array(STRING)))
+
+ val table1 = tEnv.sqlQuery("select first from MyTable")
+ tEnv.insertInto(table1, "MySink")
+
+ tEnv.explain(false)
+ }
+
}