Posted to commits@flink.apache.org by sh...@apache.org on 2022/07/13 11:45:20 UTC
[flink] branch master updated: [FLINK-27768][sql-gateway] Allow executing sql for the SqlGatewayService (#19846)
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7e494c91d1e [FLINK-27768][sql-gateway] Allow executing sql for the SqlGatewayService (#19846)
7e494c91d1e is described below
commit 7e494c91d1e340a1f9438785e2a56242fb5894c8
Author: Shengkai <33...@users.noreply.github.com>
AuthorDate: Wed Jul 13 19:45:11 2022 +0800
[FLINK-27768][sql-gateway] Allow executing sql for the SqlGatewayService (#19846)
---
.../src/test/resources/sql_multi/statement_set.q | 16 -
.../flink/table/gateway/api/SqlGatewayService.java | 18 +
.../flink/table/gateway/api/results/ResultSet.java | 19 +-
.../gateway/api/utils/MockedSqlGatewayService.java | 11 +
flink-table/flink-sql-gateway/pom.xml | 19 +
.../gateway/service/SqlGatewayServiceImpl.java | 29 +
.../gateway/service/context/SessionContext.java | 77 +-
.../service/operation/OperationExecutor.java | 178 ++++
.../service/operation/OperationManager.java | 50 +-
.../gateway/service/result/ResultFetcher.java | 17 +
.../table/gateway/service/result/ResultStore.java | 7 +
.../table/gateway/service/session/Session.java | 6 +
.../table/gateway/service/utils/Constants.java | 27 +
.../gateway/service/SqlGatewayServiceITCase.java | 31 +
.../service/SqlGatewayServiceStatementITCase.java | 309 +++++++
.../service/context/SessionContextTest.java | 148 +++
.../gateway/service/result/ResultFetcherTest.java | 56 +-
.../service/utils/SqlGatewayServiceExtension.java | 24 +
.../gateway/service/utils/SqlScriptReader.java | 124 +++
.../gateway/service/utils/TestSqlStatement.java | 42 +
.../src/test/resources/sql/insert.q | 182 ++++
.../src/test/resources/sql}/statement_set.q | 213 +++--
.../src/test/resources/sql/table.q | 988 +++++++++++++++++++++
.../src/test/resources/sql/view.q | 503 +++++++++++
.../flink/test/junit5/MiniClusterExtension.java | 7 +
25 files changed, 2951 insertions(+), 150 deletions(-)
diff --git a/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q b/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
index 3f5d19f043b..bb83b56ed20 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
@@ -1,19 +1,3 @@
-# table.q - EXECUTE/EXPLAIN STATEMENT SET BEGIN ... END
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to you under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
# statement-set.q - BEGIN STATEMENT SET, END
#
# Licensed to the Apache Software Foundation (ASF) under one or more
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index c0bdf01c36e..7e92610fb6e 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.gateway.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationType;
import org.apache.flink.table.gateway.api.results.OperationInfo;
@@ -111,6 +112,23 @@ public interface SqlGatewayService {
// Statements
// -------------------------------------------------------------------------------------------
+ /**
+ * Execute the submitted statement.
+ *
+ * @param sessionHandle handle to identify the session.
+ * @param statement the SQL to execute.
+ * @param executionTimeoutMs the execution timeout in milliseconds. Use a non-positive value to
+ * disable the timeout mechanism.
+ * @param executionConfig execution config for the statement.
+ * @return handle to identify the operation.
+ */
+ OperationHandle executeStatement(
+ SessionHandle sessionHandle,
+ String statement,
+ long executionTimeoutMs,
+ Configuration executionConfig)
+ throws SqlGatewayException;
+
/**
* Fetch the results from the operation. When maxRows is Integer.MAX_VALUE, it means to fetch
* all available data.
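For context, a minimal client-side sketch of the new API (the session environment setup is assumed; executeStatement and the fetchResults loop follow the signatures added in this commit):

    SessionHandle session = service.openSession(environment);
    OperationHandle op =
            service.executeStatement(
                    session,
                    "SELECT 1",
                    -1,                   // non-positive value: timeout disabled
                    new Configuration()); // no extra execution config
    Long token = 0L;
    while (token != null) {
        ResultSet result = service.fetchResults(session, op, token, Integer.MAX_VALUE);
        result.getData().forEach(System.out::println);
        token = result.getNextToken();
    }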
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java
index 3750a1cee95..c11581eeffc 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.gateway.api.results;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
@@ -72,7 +73,23 @@ public class ResultSet {
return nextToken;
}
- /** The schema of the data. */
+ /**
+ * The schema of the data.
+ *
+ * <p>The schema of DDL, USE, EXPLAIN, SHOW and DESCRIBE statements aligns with the schema of
+ * {@link TableResult#getResolvedSchema()}. The only difference is the schema of the `INSERT`
+ * statement.
+ *
+ * <p>The schema of INSERT:
+ *
+ * <pre>
+ * +-------------+-------------+----------+
+ * | column name | column type | comments |
+ * +-------------+-------------+----------+
+ * | job id | string | |
+ * +-------------+-------------+----------+
+ * </pre>
+ */
public ResolvedSchema getResultSchema() {
return resultSchema;
}
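As documented above, an INSERT result exposes a single STRING column carrying the job id; a caller can read it back roughly like this (a sketch that assumes the first fetch already contains the row):

    ResultSet resultSet = service.fetchResults(session, insertOp, 0, Integer.MAX_VALUE);
    // schema: | job id | STRING |
    String jobId = resultSet.getData().get(0).getString(0).toString();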
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
index 656f353ae44..f962e9c727e 100644
--- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.gateway.api.utils;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationType;
@@ -78,4 +79,14 @@ public class MockedSqlGatewayService implements SqlGatewayService {
SessionHandle sessionHandle, OperationHandle operationHandle) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public OperationHandle executeStatement(
+ SessionHandle sessionHandle,
+ String statement,
+ long executionTimeoutMs,
+ Configuration executionConfig)
+ throws SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/flink-table/flink-sql-gateway/pom.xml b/flink-table/flink-sql-gateway/pom.xml
index 9c217e71e17..7b46860bed1 100644
--- a/flink-table/flink-sql-gateway/pom.xml
+++ b/flink-table/flink-sql-gateway/pom.xml
@@ -72,6 +72,25 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-files</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-csv</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index 39e253c020b..fb68828aedf 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.gateway.service;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationType;
@@ -123,6 +124,34 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
}
}
+ @Override
+ public OperationHandle executeStatement(
+ SessionHandle sessionHandle,
+ String statement,
+ long executionTimeoutMs,
+ Configuration executionConfig)
+ throws SqlGatewayException {
+ try {
+ if (executionTimeoutMs > 0) {
+ // TODO: support the feature in FLINK-27838
+ throw new UnsupportedOperationException(
+ "SqlGatewayService doesn't support timeout mechanism now.");
+ }
+
+ return getSession(sessionHandle)
+ .getOperationManager()
+ .submitOperation(
+ OperationType.EXECUTE_STATEMENT,
+ handle ->
+ getSession(sessionHandle)
+ .createExecutor(executionConfig)
+ .executeStatement(handle, statement));
+ } catch (Throwable t) {
+ LOG.error("Failed to execute statement.", t);
+ throw new SqlGatewayException("Failed to execute statement.", t);
+ }
+ }
+
@Override
public ResultSet fetchResults(
SessionHandle sessionHandle, OperationHandle operationHandle, long token, int maxRows)
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
index b2e3921ba07..1361e7b15ab 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
@@ -19,6 +19,8 @@
package org.apache.flink.table.gateway.service.context;
import org.apache.flink.client.ClientUtils;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -27,6 +29,7 @@ import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
@@ -37,7 +40,9 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.service.operation.OperationExecutor;
import org.apache.flink.table.gateway.service.operation.OperationManager;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.module.ModuleManager;
import org.slf4j.Logger;
@@ -63,6 +68,7 @@ public class SessionContext {
private static final Logger LOG = LoggerFactory.getLogger(SessionContext.class);
+ private final DefaultContext defaultContext;
private final SessionHandle sessionId;
private final EndpointVersion endpointVersion;
@@ -74,12 +80,14 @@ public class SessionContext {
private final OperationManager operationManager;
private SessionContext(
+ DefaultContext defaultContext,
SessionHandle sessionId,
EndpointVersion endpointVersion,
Configuration sessionConf,
URLClassLoader classLoader,
SessionState sessionState,
OperationManager operationManager) {
+ this.defaultContext = defaultContext;
this.sessionId = sessionId;
this.endpointVersion = endpointVersion;
this.sessionConf = sessionConf;
@@ -88,8 +96,22 @@ public class SessionContext {
this.operationManager = operationManager;
}
+ /** Close resources, e.g. catalogs. */
+ public void close() {
+ for (String name : sessionState.catalogManager.listCatalogs()) {
+ sessionState.catalogManager.getCatalog(name).ifPresent(Catalog::close);
+ }
+ try {
+ userClassloader.close();
+ } catch (IOException e) {
+ LOG.debug("Error while closing class loader.", e);
+ }
+
+ operationManager.close();
+ }
+
// --------------------------------------------------------------------------------------------
- // Getter method
+ // Getter/Setter
// --------------------------------------------------------------------------------------------
public SessionHandle getSessionId() {
@@ -100,22 +122,43 @@ public class SessionContext {
return sessionConf.toMap();
}
- // --------------------------------------------------------------------------------------------
- // Method to execute commands
- // --------------------------------------------------------------------------------------------
+ public void set(String key, String value) {
+ try {
+ // Test whether the key-value pair will influence the creation of the OperationExecutor.
+ createOperationExecutor(Configuration.fromMap(Collections.singletonMap(key, value)));
+ } catch (Exception e) {
+ // the new value is invalid; keep the session config unchanged and surface the error
+ throw new SqlExecutionException(
+ String.format("Failed to set key %s with value %s.", key, value), e);
+ }
+ sessionConf.setString(key, value);
+ }
- /** Close resources, e.g. catalogs. */
- public void close() {
- for (String name : sessionState.catalogManager.listCatalogs()) {
- sessionState.catalogManager.unregisterCatalog(name, true);
+ public synchronized void reset(String key) {
+ Configuration configuration = defaultContext.getFlinkConfig();
+ // If the key exists in the default flink-conf.yaml, reset it to that default value
+ ConfigOption<String> option = ConfigOptions.key(key).stringType().noDefaultValue();
+ if (configuration.contains(option)) {
+ String defaultValue = configuration.get(option);
+ set(key, defaultValue);
+ } else {
+ sessionConf.removeConfig(option);
}
- try {
- userClassloader.close();
- } catch (IOException e) {
- LOG.debug("Error while closing class loader.", e);
+ }
+
+ public synchronized void reset() {
+ for (String key : sessionConf.keySet()) {
+ sessionConf.removeConfig(ConfigOptions.key(key).stringType().noDefaultValue());
}
+ sessionConf.addAll(defaultContext.getFlinkConfig());
+ }
- operationManager.close();
+ // --------------------------------------------------------------------------------------------
+ // Method to execute commands
+ // --------------------------------------------------------------------------------------------
+
+ public OperationExecutor createOperationExecutor(Configuration executionConfig) {
+ return new OperationExecutor(this, executionConfig);
}
// --------------------------------------------------------------------------------------------
@@ -168,6 +211,7 @@ public class SessionContext {
new SessionState(catalogManager, moduleManager, functionCatalog);
return new SessionContext(
+ defaultContext,
sessionId,
endpointVersion,
configuration,
@@ -201,11 +245,12 @@ public class SessionContext {
final EnvironmentSettings settings =
EnvironmentSettings.newInstance().withConfiguration(sessionConf).build();
- TableConfig tableConfig = new TableConfig();
- tableConfig.addConfiguration(sessionConf);
-
StreamExecutionEnvironment streamExecEnv = createStreamExecutionEnvironment();
+ TableConfig tableConfig = TableConfig.getDefault();
+ tableConfig.setRootConfiguration(defaultContext.getFlinkConfig());
+ tableConfig.addConfiguration(sessionConf);
+
final Executor executor = lookupExecutor(streamExecEnv);
return createStreamTableEnvironment(
streamExecEnv,
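The SET/RESET changes above work as follows: set(key, value) validates the pair by trial-building an OperationExecutor before storing it in sessionConf, and reset(key) restores the value from the default Flink configuration when one exists, otherwise removes the key. A behavioral sketch, using option keys that appear in the tests later in this commit:

    sessionContext.set("pipeline.name", "test");    // validated, then stored in sessionConf
    sessionContext.reset("pipeline.name");          // not in the default config: key removed
    sessionContext.set("pipeline.object-reuse", "false");
    sessionContext.reset("pipeline.object-reuse");  // in the default config: default restored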
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
new file mode 100644
index 00000000000..bf9bc7cf1e3
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.table.operations.BeginStatementSetOperation;
+import org.apache.flink.table.operations.EndStatementSetOperation;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.StatementSetOperation;
+import org.apache.flink.table.operations.command.ResetOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.gateway.service.utils.Constants.JOB_ID;
+import static org.apache.flink.table.gateway.service.utils.Constants.SET_KEY;
+import static org.apache.flink.table.gateway.service.utils.Constants.SET_VALUE;
+
+/** An executor to execute the {@link Operation}. */
+public class OperationExecutor {
+
+ private final SessionContext sessionContext;
+ private final Configuration executionConfig;
+
+ @VisibleForTesting
+ public OperationExecutor(SessionContext context, Configuration executionConfig) {
+ this.sessionContext = context;
+ this.executionConfig = executionConfig;
+ }
+
+ public ResultFetcher executeStatement(OperationHandle handle, String statement) {
+ // Instantiate the TableEnvironment lazily
+ TableEnvironmentInternal tableEnv = sessionContext.createTableEnvironment();
+ tableEnv.getConfig().getConfiguration().addAll(executionConfig);
+
+ List<Operation> parsedOperations = tableEnv.getParser().parse(statement);
+ if (parsedOperations.size() > 1) {
+ throw new UnsupportedOperationException(
+ "Unsupported SQL statement! Execute statement only accepts a single SQL statement or "
+ + "multiple 'INSERT INTO' statements wrapped in a 'STATEMENT SET' block.");
+ }
+ Operation op = parsedOperations.get(0);
+ if (op instanceof SetOperation) {
+ return callSetOperation(tableEnv, handle, (SetOperation) op);
+ } else if (op instanceof ResetOperation) {
+ return callResetOperation(handle, (ResetOperation) op);
+ } else if (op instanceof BeginStatementSetOperation) {
+ // TODO: support statement set in the FLINK-27837
+ throw new UnsupportedOperationException();
+ } else if (op instanceof EndStatementSetOperation) {
+ // TODO: support statement set in the FLINK-27837
+ throw new UnsupportedOperationException();
+ } else if (op instanceof ModifyOperation) {
+ return callModifyOperations(
+ tableEnv, handle, Collections.singletonList((ModifyOperation) op));
+ } else if (op instanceof StatementSetOperation) {
+ return callModifyOperations(
+ tableEnv, handle, ((StatementSetOperation) op).getOperations());
+ } else if (op instanceof QueryOperation) {
+ TableResultInternal result = tableEnv.executeInternal(op);
+ return new ResultFetcher(handle, result.getResolvedSchema(), result.collectInternal());
+ } else {
+ TableResultInternal result = tableEnv.executeInternal(op);
+ return new ResultFetcher(
+ handle, result.getResolvedSchema(), collect(result.collectInternal()));
+ }
+ }
+
+ private ResultFetcher callSetOperation(
+ TableEnvironmentInternal tableEnv, OperationHandle handle, SetOperation setOp) {
+ if (setOp.getKey().isPresent() && setOp.getValue().isPresent()) {
+ // set a property
+ sessionContext.set(setOp.getKey().get().trim(), setOp.getValue().get().trim());
+ return new ResultFetcher(
+ handle,
+ TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
+ collect(TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+ } else if (!setOp.getKey().isPresent() && !setOp.getValue().isPresent()) {
+ // show all properties
+ Map<String, String> configMap = tableEnv.getConfig().getConfiguration().toMap();
+ return new ResultFetcher(
+ handle,
+ ResolvedSchema.of(
+ Column.physical(SET_KEY, DataTypes.STRING()),
+ Column.physical(SET_VALUE, DataTypes.STRING())),
+ collect(
+ configMap.entrySet().stream()
+ .map(
+ entry ->
+ GenericRowData.of(
+ StringData.fromString(entry.getKey()),
+ StringData.fromString(
+ entry.getValue())))
+ .map(RowData.class::cast)
+ .iterator()));
+ } else {
+ // impossible
+ throw new SqlExecutionException("Illegal SetOperation: " + setOp.asSummaryString());
+ }
+ }
+
+ private ResultFetcher callResetOperation(OperationHandle handle, ResetOperation resetOp) {
+ if (resetOp.getKey().isPresent()) {
+ // reset a property
+ sessionContext.reset(resetOp.getKey().get().trim());
+ } else {
+ // reset all properties
+ sessionContext.reset();
+ }
+ return new ResultFetcher(
+ handle,
+ TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
+ collect(TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+ }
+
+ private ResultFetcher callModifyOperations(
+ TableEnvironmentInternal tableEnv,
+ OperationHandle handle,
+ List<ModifyOperation> modifyOperations) {
+ TableResultInternal result = tableEnv.executeInternal(modifyOperations);
+ return new ResultFetcher(
+ handle,
+ ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())),
+ Collections.singletonList(
+ GenericRowData.of(
+ StringData.fromString(
+ result.getJobClient()
+ .orElseThrow(
+ () ->
+ new SqlExecutionException(
+ String.format(
+ "Can't get job client for the operation %s.",
+ handle)))
+ .getJobID()
+ .toString()))));
+ }
+
+ private List<RowData> collect(Iterator<RowData> tableResult) {
+ List<RowData> rows = new ArrayList<>();
+ tableResult.forEachRemaining(rows::add);
+ return rows;
+ }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
index 878674a0e72..f0aeb8c2af5 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
@@ -18,29 +18,29 @@
package org.apache.flink.table.gateway.service.operation;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.data.RowData;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationType;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.service.result.ResultFetcher;
-import org.apache.flink.util.CloseableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
import java.util.function.Supplier;
/** Manager for the {@link Operation}. */
+@Internal
public class OperationManager {
private static final Logger LOG = LoggerFactory.getLogger(OperationManager.class);
@@ -58,9 +58,9 @@ public class OperationManager {
}
/**
- * Submit the operation to the {@link OperationManager}. The {@link OperationManager} manges the
- * lifecycle of the {@link Operation}, including register resources, fire the execution and so
- * on.
+ * Submit the operation to the {@link OperationManager}. The {@link OperationManager} manages
+ * the lifecycle of the {@link Operation}, including registering resources, firing the
+ * execution, and so on.
*
* @param operationType The type of the submitted operation.
* @param executor Worker to execute.
@@ -75,19 +75,29 @@ public class OperationManager {
operationType,
() -> {
ResultSet resultSet = executor.call();
- List<RowData> rows = resultSet.getData();
return new ResultFetcher(
- handle,
- resultSet.getResultSchema(),
- CloseableIterator.adapterForIterator(rows.iterator()),
- rows.size());
+ handle, resultSet.getResultSchema(), resultSet.getData());
});
- writeLock(
- () -> {
- submittedOperations.put(handle, operation);
- operation.run(service);
- });
+ submitOperationInternal(handle, operation);
+ return handle;
+ }
+
+ /**
+ * Submit the operation to the {@link OperationManager}. The {@link OperationManager} manages
+ * the lifecycle of the {@link Operation}, including registering resources, firing the
+ * execution, and so on.
+ *
+ * @param operationType The type of the submitted operation.
+ * @param fetcherSupplier offers the fetcher used to get the results.
+ * @return OperationHandle to fetch the results or check the status.
+ */
+ public OperationHandle submitOperation(
+ OperationType operationType, Function<OperationHandle, ResultFetcher> fetcherSupplier) {
+ OperationHandle handle = OperationHandle.create();
+ Operation operation =
+ new Operation(handle, operationType, () -> fetcherSupplier.apply(handle));
+ submitOperationInternal(handle, operation);
return handle;
}
@@ -173,6 +183,14 @@ public class OperationManager {
});
}
+ private void submitOperationInternal(OperationHandle handle, Operation operation) {
+ writeLock(
+ () -> {
+ submittedOperations.put(handle, operation);
+ operation.run(service);
+ });
+ }
+
private void writeLock(Runnable runner) {
lock.writeLock().lock();
try {
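The new submitOperation overload takes a Function so that the operation can build its own ResultFetcher from the handle the manager creates; SqlGatewayServiceImpl (earlier in this diff) uses it roughly like this (condensed sketch):

    OperationHandle handle =
            operationManager.submitOperation(
                    OperationType.EXECUTE_STATEMENT,
                    h -> operationExecutor.executeStatement(h, statement));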
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
index 145639552dd..81bb11f55a2 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
@@ -47,6 +47,7 @@ import java.util.Optional;
public class ResultFetcher {
private static final Logger LOG = LoggerFactory.getLogger(ResultFetcher.class);
+ private static final int TABLE_RESULT_MAX_INITIAL_CAPACITY = 5000;
private final OperationHandle operationHandle;
@@ -59,6 +60,14 @@ public class ResultFetcher {
private boolean noMoreResults = false;
public ResultFetcher(
+ OperationHandle operationHandle,
+ ResolvedSchema resultSchema,
+ CloseableIterator<RowData> resultRows) {
+ this(operationHandle, resultSchema, resultRows, TABLE_RESULT_MAX_INITIAL_CAPACITY);
+ }
+
+ @VisibleForTesting
+ ResultFetcher(
OperationHandle operationHandle,
ResolvedSchema resultSchema,
CloseableIterator<RowData> resultRows,
@@ -68,6 +77,14 @@ public class ResultFetcher {
this.resultStore = new ResultStore(resultRows, maxBufferSize);
}
+ public ResultFetcher(
+ OperationHandle operationHandle, ResolvedSchema resultSchema, List<RowData> rows) {
+ this.operationHandle = operationHandle;
+ this.resultSchema = resultSchema;
+ this.bufferedResults.addAll(rows);
+ this.resultStore = ResultStore.DUMMY_RESULT_STORE;
+ }
+
public void close() {
resultStore.close();
}
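ResultFetcher now has two construction paths: the streaming one buffers rows from a CloseableIterator (capped at TABLE_RESULT_MAX_INITIAL_CAPACITY by default), while the new eager one takes already-materialized rows and is backed by the pre-closed DUMMY_RESULT_STORE. A sketch of the eager path (the schema and row content are placeholders):

    List<RowData> rows =
            Collections.singletonList(GenericRowData.of(StringData.fromString("OK")));
    ResultFetcher fetcher = new ResultFetcher(OperationHandle.create(), schema, rows);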
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultStore.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultStore.java
index 180da8f9e59..edc180536ff 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultStore.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultStore.java
@@ -37,6 +37,13 @@ public class ResultStore {
private static final Logger LOG = LoggerFactory.getLogger(ResultStore.class);
+ public static final ResultStore DUMMY_RESULT_STORE =
+ new ResultStore(CloseableIterator.adapterForIterator(Collections.emptyIterator()), 0);
+
+ static {
+ DUMMY_RESULT_STORE.close();
+ }
+
private final CloseableIterator<RowData> result;
private final List<RowData> recordsBuffer = new ArrayList<>();
private final int maxBufferSize;
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java
index 2369aea247e..f2f4ff0f38f 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java
@@ -18,8 +18,10 @@
package org.apache.flink.table.gateway.service.session;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.operation.OperationExecutor;
import org.apache.flink.table.gateway.service.operation.OperationManager;
import java.io.Closeable;
@@ -62,6 +64,10 @@ public class Session implements Closeable {
return sessionContext.getOperationManager();
}
+ public OperationExecutor createExecutor(Configuration executionConfig) {
+ return sessionContext.createOperationExecutor(executionConfig);
+ }
+
@Override
public void close() {
sessionContext.close();
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
new file mode 100644
index 00000000000..7991c6797af
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.utils;
+
+/** Constants used in the SqlGatewayService. */
+public class Constants {
+
+ public static final String JOB_ID = "job id";
+ public static final String SET_KEY = "key";
+ public static final String SET_VALUE = "value";
+}
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index d39c797eb63..c05e6b10167 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.gateway.service;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
@@ -54,6 +55,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -68,6 +70,7 @@ import static org.apache.flink.types.RowKind.INSERT;
import static org.apache.flink.types.RowKind.UPDATE_AFTER;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -247,6 +250,34 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
assertEquals(0, sessionManager.getOperationCount(sessionHandle));
}
+ @Test
+ public void testExecuteSqlWithConfig() {
+ SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+ String key = "username";
+ String value = "Flink";
+ OperationHandle operationHandle =
+ service.executeStatement(
+ sessionHandle,
+ "SET",
+ -1,
+ Configuration.fromMap(Collections.singletonMap(key, value)));
+
+ Long token = 0L;
+ List<RowData> settings = new ArrayList<>();
+ while (token != null) {
+ ResultSet result =
+ service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE);
+ settings.addAll(result.getData());
+ token = result.getNextToken();
+ }
+
+ assertThat(
+ settings,
+ hasItem(
+ GenericRowData.of(
+ StringData.fromString(key), StringData.fromString(value))));
+ }
+
// --------------------------------------------------------------------------------------------
// Concurrent tests
// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java
new file mode 100644
index 00000000000..40a3ec44f34
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
+import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import org.apache.flink.table.gateway.service.utils.SqlScriptReader;
+import org.apache.flink.table.gateway.service.utils.TestSqlStatement;
+import org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
+import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.flink.table.utils.print.PrintStyle;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.PatternFilenameFilter;
+
+import org.apache.calcite.util.Util;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.table.gateway.service.utils.SqlScriptReader.HINT_START_OF_OUTPUT;
+import static org.apache.flink.table.planner.utils.TableTestUtil.replaceNodeIdInOperator;
+import static org.apache.flink.table.planner.utils.TableTestUtil.replaceStreamNodeId;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
+
+/** Test {@link SqlGatewayService}#executeStatement. */
+@Execution(CONCURRENT)
+public class SqlGatewayServiceStatementITCase {
+
+ @RegisterExtension
+ @Order(1)
+ public static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension();
+
+ @RegisterExtension
+ @Order(2)
+ public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
+ new SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration);
+
+ private static SqlGatewayService service;
+
+ private final SessionEnvironment defaultSessionEnvironment =
+ SessionEnvironment.newBuilder()
+ .setSessionEndpointVersion(MockedEndpointVersion.V1)
+ .build();
+ private final Map<String, String> replaceVars = new HashMap<>();
+
+ @BeforeAll
+ public static void setUp() {
+ service = SQL_GATEWAY_SERVICE_EXTENSION.getService();
+ }
+
+ @BeforeEach
+ public void before(@TempDir Path temporaryFolder) throws IOException {
+ // initialize new folders for every test, so the vars can be reused by every SQL script
+ replaceVars.put(
+ "$VAR_STREAMING_PATH",
+ Files.createDirectory(temporaryFolder.resolve("streaming")).toFile().getPath());
+ replaceVars.put(
+ "$VAR_BATCH_PATH",
+ Files.createDirectory(temporaryFolder.resolve("batch")).toFile().getPath());
+ }
+
+ public static Stream<String> parameters() throws Exception {
+ String first = "sql/table.q";
+ URL url = SqlGatewayServiceStatementITCase.class.getResource("/" + first);
+ File firstFile = Paths.get(checkNotNull(url.toURI())).toFile();
+ final int commonPrefixLength = firstFile.getAbsolutePath().length() - first.length();
+ File dir = firstFile.getParentFile();
+ final List<String> paths = new ArrayList<>();
+ final FilenameFilter filter = new PatternFilenameFilter(".*\\.q$");
+ for (File f : Util.first(dir.listFiles(filter), new File[0])) {
+ paths.add(f.getAbsolutePath().substring(commonPrefixLength));
+ }
+ return paths.stream();
+ }
+
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void testSqlStatements(String sqlPath) throws Exception {
+ String in = getInputFromPath(sqlPath);
+ List<TestSqlStatement> testSqlStatements = SqlScriptReader.parseSqlScript(in);
+
+ assertThat(String.join("", runStatements(testSqlStatements))).isEqualTo(in);
+ }
+
+ /**
+ * Returns the printed results for each executed SQL statement.
+ *
+ * @param statements the SQL statements to run
+ * @return the stringified results
+ */
+ private List<String> runStatements(List<TestSqlStatement> statements) {
+ SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+
+ List<String> output = new ArrayList<>();
+ for (TestSqlStatement statement : statements) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(statement.getComment());
+ builder.append(statement.getSql());
+
+ try {
+ builder.append(runSingleStatement(sessionHandle, statement.getSql()));
+ } catch (Throwable t) {
+ Throwable root = getRootCause(t);
+ builder.append(
+ Tag.ERROR.addTag(
+ root.getClass().getName()
+ + ": "
+ + root.getMessage().trim()
+ + "\n"));
+ }
+ output.add(builder.toString());
+ }
+
+ return output;
+ }
+
+ // -------------------------------------------------------------------------------------------
+ // Utility
+ // -------------------------------------------------------------------------------------------
+
+ /** Mark the output type. */
+ enum Tag {
+ INFO("!info"),
+
+ OK("!ok"),
+
+ ERROR("!error");
+
+ private final String tag;
+
+ Tag(String tag) {
+ this.tag = tag;
+ }
+
+ public String addTag(String content) {
+ return HINT_START_OF_OUTPUT + "\n" + content + tag + "\n";
+ }
+ }
+
+ private String getInputFromPath(String sqlPath) throws IOException {
+ URL url = SqlGatewayServiceStatementITCase.class.getResource("/" + sqlPath);
+
+ // replace the placeholders with the specified values if present
+ String[] keys = replaceVars.keySet().toArray(new String[0]);
+ String[] values = Arrays.stream(keys).map(replaceVars::get).toArray(String[]::new);
+
+ return StringUtils.replaceEach(
+ IOUtils.toString(checkNotNull(url), StandardCharsets.UTF_8), keys, values);
+ }
+
+ protected static Throwable getRootCause(Throwable e) {
+ Throwable root = e;
+ while (root.getCause() != null) {
+ root = root.getCause();
+ }
+ return root;
+ }
+
+ /**
+ * Returns the printed result for the executed SQL statement.
+ *
+ * @param sessionHandle the session in which to run the statement
+ * @param statement the SQL statement to run
+ * @return the printed results in tableau style
+ */
+ private String runSingleStatement(SessionHandle sessionHandle, String statement)
+ throws Exception {
+ OperationHandle operationHandle =
+ service.executeStatement(sessionHandle, statement, -1, new Configuration());
+ CommonTestUtils.waitUtil(
+ () ->
+ service.getOperationInfo(sessionHandle, operationHandle)
+ .getStatus()
+ .isTerminalStatus(),
+ Duration.ofSeconds(100),
+ "Failed to wait operation finish.");
+
+ if (!service.getOperationInfo(sessionHandle, operationHandle).isHasResults()) {
+ return Tag.INFO.addTag("");
+ }
+
+ // The content in the result of the `EXPLAIN` and `SHOW CREATE` statements is large, so it's
+ // more straightforward to just print the content without the tableau-style table.
+ if (statement.toUpperCase().startsWith("EXPLAIN")
+ || statement.toUpperCase().startsWith("SHOW CREATE")) {
+ ResultSet resultSet =
+ service.fetchResults(sessionHandle, operationHandle, 0, Integer.MAX_VALUE);
+ return Tag.OK.addTag(
+ replaceStreamNodeId(
+ replaceNodeIdInOperator(
+ resultSet.getData().get(0).getString(0).toString()))
+ + "\n");
+ } else {
+ ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+ ResultSet resultSet =
+ service.fetchResults(sessionHandle, operationHandle, 0, Integer.MAX_VALUE);
+
+ boolean isStreaming =
+ Configuration.fromMap(service.getSessionConfig(sessionHandle))
+ .get(RUNTIME_MODE)
+ .equals(STREAMING);
+ boolean isQuery = statement.toUpperCase().startsWith("SELECT");
+
+ PrintStyle style =
+ PrintStyle.tableauWithDataInferredColumnWidths(
+ resultSet.getResultSchema(),
+ new RowDataToStringConverterImpl(
+ resultSet.getResultSchema().toPhysicalRowDataType(),
+ DateTimeUtils.UTC_ZONE.toZoneId(),
+ SqlGatewayServiceStatementITCase.class.getClassLoader(),
+ false),
+ Integer.MAX_VALUE,
+ true,
+ isStreaming && isQuery);
+
+ PrintWriter writer = new PrintWriter(outContent);
+ style.print(new RowDataIterator(sessionHandle, operationHandle), writer);
+ return Tag.OK.addTag(outContent.toString());
+ }
+ }
+
+ private static class RowDataIterator implements Iterator<RowData> {
+
+ private final SessionHandle sessionHandle;
+ private final OperationHandle operationHandle;
+
+ private Long token = 0L;
+ private Iterator<RowData> fetchedRows = Collections.emptyIterator();
+
+ public RowDataIterator(SessionHandle sessionHandle, OperationHandle operationHandle) {
+ this.sessionHandle = sessionHandle;
+ this.operationHandle = operationHandle;
+ }
+
+ @Override
+ public boolean hasNext() {
+ while (token != null && !fetchedRows.hasNext()) {
+ ResultSet resultSet =
+ service.fetchResults(
+ sessionHandle, operationHandle, token, Integer.MAX_VALUE);
+ token = resultSet.getNextToken();
+ fetchedRows = resultSet.getData().iterator();
+ }
+
+ return token != null;
+ }
+
+ @Override
+ public RowData next() {
+ return fetchedRows.next();
+ }
+ }
+}
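For reference, the .q scripts consumed by SqlScriptReader interleave each statement with a tagged output block, mirroring Tag#addTag above: the start-of-output hint line, the printed content, then one of !info, !ok or !error. Schematically (the literal value of HINT_START_OF_OUTPUT is defined in SqlScriptReader rather than in this diff, so the !output marker below is an assumption):

    # comment kept verbatim by the reader
    SET 'table.sql-dialect' = 'hive';
    !output
    <printed result>
    !ok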
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
new file mode 100644
index 00000000000..6d78e3474d4
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.context;
+
+import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
+import org.apache.flink.table.gateway.service.utils.ThreadUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.flink.configuration.PipelineOptions.MAX_PARALLELISM;
+import static org.apache.flink.configuration.PipelineOptions.NAME;
+import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE;
+import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test {@link SessionContext}. */
+class SessionContextTest {
+
+ private static final ExecutorService EXECUTOR_SERVICE =
+ ThreadUtils.newThreadPool(5, 500, 600_000, "session-context-test");
+ private SessionContext sessionContext;
+
+ @BeforeEach
+ public void setup() {
+ sessionContext = createSessionContext();
+ }
+
+ @AfterAll
+ public static void cleanUp() {
+ EXECUTOR_SERVICE.shutdown();
+ }
+
+ @Test
+ public void testSetAndResetOption() {
+ // table config option
+ sessionContext.set(TABLE_SQL_DIALECT.key(), "hive");
+ // runtime config option
+ sessionContext.set(MAX_PARALLELISM.key(), "128");
+ // runtime config option and doesn't have default value
+ sessionContext.set(NAME.key(), "test");
+ // runtime config from flink-conf
+ sessionContext.set(OBJECT_REUSE.key(), "false");
+ assertThat(getConfiguration().get(TABLE_SQL_DIALECT)).isEqualTo("hive");
+ assertThat(getConfiguration().get(MAX_PARALLELISM)).isEqualTo(128);
+ assertThat(getConfiguration().get(NAME)).isEqualTo("test");
+ assertThat(getConfiguration().get(OBJECT_REUSE)).isFalse();
+
+ sessionContext.reset();
+ assertThat(getConfiguration().get(TABLE_SQL_DIALECT)).isEqualTo("default");
+ assertThat(getConfiguration().get(NAME)).isNull();
+ // The value of MAX_PARALLELISM in the default flink config (see createSessionContext) is 16
+ assertThat(getConfiguration().get(MAX_PARALLELISM)).isEqualTo(16);
+ assertThat(getConfiguration().getOptional(NAME)).isEmpty();
+ // The value of OBJECT_REUSE in the original configuration is true
+ assertThat(getConfiguration().get(OBJECT_REUSE)).isTrue();
+ }
+
+ @Test
+ public void testSetAndResetKeyInConfigOptions() {
+ // table config option
+ sessionContext.set(TABLE_SQL_DIALECT.key(), "hive");
+ // runtime config option
+ sessionContext.set(MAX_PARALLELISM.key(), "128");
+ // runtime config option and doesn't have default value
+ sessionContext.set(NAME.key(), "test");
+ // runtime config from flink-conf
+ sessionContext.set(OBJECT_REUSE.key(), "false");
+
+ assertThat(getConfiguration().get(TABLE_SQL_DIALECT)).isEqualTo("hive");
+ assertThat(getConfiguration().get(MAX_PARALLELISM)).isEqualTo(128);
+ assertThat(getConfiguration().get(NAME)).isEqualTo("test");
+ assertThat(getConfiguration().get(OBJECT_REUSE)).isFalse();
+
+ sessionContext.reset(TABLE_SQL_DIALECT.key());
+ assertThat(getConfiguration().get(TABLE_SQL_DIALECT)).isEqualTo("default");
+
+ sessionContext.reset(MAX_PARALLELISM.key());
+ assertThat(getConfiguration().get(MAX_PARALLELISM)).isEqualTo(16);
+
+ sessionContext.reset(NAME.key());
+ assertThat(getConfiguration().get(NAME)).isNull();
+
+ sessionContext.reset(OBJECT_REUSE.key());
+ assertThat(getConfiguration().get(OBJECT_REUSE)).isTrue();
+ }
+
+ @Test
+ public void testSetAndResetArbitraryKey() {
+ // other property not in flink-conf
+ sessionContext.set("aa", "11");
+ sessionContext.set("bb", "22");
+
+ assertThat(sessionContext.getConfigMap().get("aa")).isEqualTo("11");
+ assertThat(sessionContext.getConfigMap().get("bb")).isEqualTo("22");
+
+ sessionContext.reset("aa");
+ assertThat(sessionContext.getConfigMap().get("aa")).isNull();
+ assertThat(sessionContext.getConfigMap().get("bb")).isEqualTo("22");
+
+ sessionContext.reset("bb");
+ assertThat(sessionContext.getConfigMap().get("bb")).isNull();
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private SessionContext createSessionContext() {
+ Configuration flinkConfig = new Configuration();
+ flinkConfig.set(OBJECT_REUSE, true);
+ flinkConfig.set(MAX_PARALLELISM, 16);
+ DefaultContext defaultContext =
+ new DefaultContext(flinkConfig, Collections.singletonList(new DefaultCLI()));
+ return SessionContext.create(
+ defaultContext,
+ SessionHandle.create(),
+ MockedEndpointVersion.V1,
+ flinkConfig,
+ EXECUTOR_SERVICE);
+ }
+
+ private ReadableConfig getConfiguration() {
+ return Configuration.fromMap(sessionContext.getConfigMap());
+ }
+}
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
index dd079ced141..59a128c5cca 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
@@ -172,35 +172,18 @@ public class ResultFetcherTest extends TestLogger {
@Test
public void testFetchResultInParallel() throws Exception {
- int bufferSize = data.size() / 2;
ResultFetcher fetcher =
- buildResultFetcher(Collections.singletonList(data.iterator()), bufferSize);
-
- AtomicReference<Boolean> isEqual = new AtomicReference<>(true);
- int fetchThreadNum = 100;
- CountDownLatch latch = new CountDownLatch(fetchThreadNum);
-
+ buildResultFetcher(Collections.singletonList(data.iterator()), data.size() / 2);
CommonTestUtils.waitUtil(
() -> fetcher.getResultStore().getBufferedRecordSize() > 0,
Duration.ofSeconds(10),
"Failed to wait the buffer has data.");
- List<RowData> firstFetch = fetcher.fetchResults(0, Integer.MAX_VALUE).getData();
- for (int i = 0; i < fetchThreadNum; i++) {
- threadFactory
- .newThread(
- () -> {
- ResultSet resultSet = fetcher.fetchResults(0, Integer.MAX_VALUE);
-
- if (!firstFetch.equals(resultSet.getData())) {
- isEqual.set(false);
- }
- latch.countDown();
- })
- .start();
- }
+ checkFetchResultInParallel(fetcher);
+ }
- latch.await();
- assertEquals(true, isEqual.get());
+ @Test
+ public void testFetchResultFromDummyStoreInParallel() throws Exception {
+ checkFetchResultInParallel(new ResultFetcher(OperationHandle.create(), schema, data));
}
@Test
@@ -221,7 +204,8 @@ public class ResultFetcherTest extends TestLogger {
long testToken = token;
AtomicReference<Boolean> meetEnd = new AtomicReference<>(false);
- new Thread(
+ threadFactory
+ .newThread(
() -> {
// Should meet EOS in the end.
long nextToken = testToken;
@@ -349,6 +333,30 @@ public class ResultFetcherTest extends TestLogger {
assertEquals(data, fetchedRows);
}
+ private void checkFetchResultInParallel(ResultFetcher fetcher) throws Exception {
+ AtomicReference<Boolean> isEqual = new AtomicReference<>(true);
+ int fetchThreadNum = 100;
+ CountDownLatch latch = new CountDownLatch(fetchThreadNum);
+
+ List<RowData> firstFetch = fetcher.fetchResults(0, Integer.MAX_VALUE).getData();
+ for (int i = 0; i < fetchThreadNum; i++) {
+ threadFactory
+ .newThread(
+ () -> {
+ ResultSet resultSet = fetcher.fetchResults(0, Integer.MAX_VALUE);
+
+ if (!firstFetch.equals(resultSet.getData())) {
+ isEqual.set(false);
+ }
+ latch.countDown();
+ })
+ .start();
+ }
+
+ latch.await();
+ assertEquals(true, isEqual.get());
+ }
+
// --------------------------------------------------------------------------------------------
private static class ErrorIterator implements Iterator<RowData> {
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java
index 0e5fbcec5ba..db214e0ca6b 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java
@@ -18,12 +18,14 @@
package org.apache.flink.table.gateway.service.utils;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
import org.apache.flink.table.gateway.service.context.DefaultContext;
import org.apache.flink.table.gateway.service.session.SessionManager;
+import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.Extension;
@@ -32,8 +34,10 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
+import java.util.function.Supplier;
import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
@@ -43,6 +47,15 @@ public class SqlGatewayServiceExtension implements BeforeAllCallback, AfterAllCa
private SqlGatewayService service;
private SessionManager sessionManager;
private TemporaryFolder temporaryFolder;
+ private final Supplier<Configuration> configSupplier;
+
+ public SqlGatewayServiceExtension() {
+ this(Configuration::new);
+ }
+
+ public SqlGatewayServiceExtension(Supplier<Configuration> configSupplier) {
+ this.configSupplier = configSupplier;
+ }
@Override
public void beforeAll(ExtensionContext context) throws Exception {
@@ -57,6 +70,11 @@ public class SqlGatewayServiceExtension implements BeforeAllCallback, AfterAllCa
throw new IOException("Can't create testing flink-conf.yaml file.");
}
+ FileUtils.write(
+ confYaml,
+ getFlinkConfContent(configSupplier.get().toMap()),
+ StandardCharsets.UTF_8);
+
// adjust the test environment for the purposes of this test
Map<String, String> map = new HashMap<>(System.getenv());
map.put(ENV_FLINK_CONF_DIR, confFolder.getAbsolutePath());
@@ -86,4 +104,10 @@ public class SqlGatewayServiceExtension implements BeforeAllCallback, AfterAllCa
public SessionManager getSessionManager() {
return sessionManager;
}
+
+ private String getFlinkConfContent(Map<String, String> flinkConf) {
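+ // render each map entry as a "key: value" line, the flat format flink-conf.yaml expects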
+ StringBuilder sb = new StringBuilder();
+ flinkConf.forEach((k, v) -> sb.append(k).append(": ").append(v).append("\n"));
+ return sb.toString();
+ }
}
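
For context, a test class would typically register this extension as a JUnit 5 field like the
sketch below (a hedged example; the option shown is illustrative and not part of this patch):

    // registers the gateway service once per test class; the supplied options are
    // written into the generated flink-conf.yaml before the service starts
    @RegisterExtension
    @Order(1)
    public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
            new SqlGatewayServiceExtension(
                    () -> {
                        Configuration config = new Configuration();
                        config.setString("table.dml-sync", "true"); // example option
                        return config;
                    });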
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlScriptReader.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlScriptReader.java
new file mode 100644
index 00000000000..81a3adb55f3
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlScriptReader.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.utils;
+
+import org.apache.flink.table.gateway.service.SqlGatewayServiceStatementITCase;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A utility to read and parse the content of a SQL script. The SQL scripts live under the
+ * "resources/sql" path and follow the "xx.q" file name pattern. They are executed and verified
+ * by {@link SqlGatewayServiceStatementITCase}.
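+ *
+ * <p>Each entry in a script has the following shape (an illustrative sketch of the layout, not
+ * copied from a real resource file). The result content ends at the next line starting with
+ * "!", e.g. "!ok" or "!error":
+ *
+ * <pre>
+ * # optional comment lines
+ * SELECT 1;
+ * !output
+ * (expected result content)
+ * !ok
+ * </pre>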
+ */
+public final class SqlScriptReader implements AutoCloseable {
+ public static final String HINT_START_OF_OUTPUT = "!output";
+ private final BufferedReader reader;
+ private String currentLine;
+
+ public static List<TestSqlStatement> parseSqlScript(String in) {
+ try (SqlScriptReader sqlReader = new SqlScriptReader(in)) {
+ return sqlReader.parseSqlScript();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private SqlScriptReader(String input) {
+ this.reader = new BufferedReader(new StringReader(input));
+ }
+
+ private List<TestSqlStatement> parseSqlScript() throws IOException {
+ List<TestSqlStatement> specs = new ArrayList<>();
+ TestSqlStatement spec;
+ while ((spec = readNext()) != null) {
+ specs.add(spec);
+ }
+ return specs;
+ }
+
+ private void readLine() throws IOException {
+ this.currentLine = reader.readLine();
+ }
+
+ private @Nullable TestSqlStatement readNext() throws IOException {
+ StringBuilder commentLines = new StringBuilder();
+ StringBuilder sqlLines = new StringBuilder();
+ ReadingStatus status = ReadingStatus.BEGINNING;
+ readLine();
+ while (currentLine != null) {
+ switch (status) {
+ case BEGINNING:
+ if (currentLine.startsWith("#") || currentLine.trim().length() == 0) {
+ commentLines.append(currentLine).append("\n");
+ // keep reading until we reach the SQL statement
+ readLine();
+ } else {
+ // the current line is neither a comment nor empty, so begin to read SQL
+ status = ReadingStatus.SQL_STATEMENT;
+ }
+ break;
+
+ case SQL_STATEMENT:
+ if (currentLine.trim().equals(HINT_START_OF_OUTPUT)) {
+ // SQL statement is finished, begin to read result content
+ status = ReadingStatus.RESULT_CONTENT;
+ } else {
+ sqlLines.append(currentLine).append("\n");
+ }
+ // continue reading
+ readLine();
+ break;
+
+ case RESULT_CONTENT:
+ if (!currentLine.startsWith("!")) {
+ // keep consuming result content until a result flag is reached
+ readLine();
+ } else {
+ // reached a result flag, finish this statement
+ status = ReadingStatus.FINISH;
+ }
+ break;
+
+ case FINISH:
+ return new TestSqlStatement(commentLines.toString(), sqlLines.toString());
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void close() throws Exception {
+ reader.close();
+ }
+
+ private enum ReadingStatus {
+ BEGINNING,
+ SQL_STATEMENT,
+ RESULT_CONTENT,
+ FINISH
+ }
+}
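
A minimal sketch of how the reader is driven (the resource path here is illustrative):

    // split a script into (comment, sql) pairs; expected output is checked elsewhere
    String script =
            new String(
                    Files.readAllBytes(Paths.get("src/test/resources/sql/table.q")),
                    StandardCharsets.UTF_8);
    for (TestSqlStatement statement : SqlScriptReader.parseSqlScript(script)) {
        System.out.println(statement.getComment() + statement.getSql());
    }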
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/TestSqlStatement.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/TestSqlStatement.java
new file mode 100644
index 00000000000..9f3535b80a5
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/TestSqlStatement.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.utils;
+
+/**
+ * A structure that describes a single SQL statement used for testing.
+ *
+ * @see SqlScriptReader
+ */
+public class TestSqlStatement {
+ private final String comment;
+ private final String sql;
+
+ public TestSqlStatement(String comment, String sql) {
+ this.comment = comment;
+ this.sql = sql;
+ }
+
+ public String getComment() {
+ return comment;
+ }
+
+ public String getSql() {
+ return sql;
+ }
+}
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q b/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
new file mode 100644
index 00000000000..2348b1f57a0
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
@@ -0,0 +1,182 @@
+# insert.q - insert
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+SET 'table.dml-sync' = 'true';
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+# ==========================================================================
+# test streaming insert
+# ==========================================================================
+
+SET 'execution.runtime-mode' = 'streaming';
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
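+# note: the '$VAR_...' paths are placeholders the test substitutes with real directories before running.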
+create table StreamingTable (
+ id int,
+ str string
+) with (
+ 'connector' = 'filesystem',
+ 'path' = '$VAR_STREAMING_PATH',
+ 'format' = 'csv'
+);
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+# set a fixed job id (test only) so the job id in the output below can be verified.
+SET '$internal.pipeline.job-id' = 'e68e7fabddfade4f42910980652582dc';
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
+!output
++----------------------------------+
+| job id |
++----------------------------------+
+| e68e7fabddfade4f42910980652582dc |
++----------------------------------+
+1 row in set
+!ok
+
+RESET '$internal.pipeline.job-id';
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+SELECT * FROM StreamingTable;
+!output
++----+----+-------------+
+| op | id | str |
++----+----+-------------+
+| +I | 1 | Hello World |
+| +I | 2 | Hi |
+| +I | 2 | Hi |
+| +I | 3 | Hello |
+| +I | 3 | World |
+| +I | 4 | ADD |
+| +I | 5 | LINE |
++----+----+-------------+
+7 rows in set
+!ok
+
+# ==========================================================================
+# test batch insert
+# ==========================================================================
+
+SET 'execution.runtime-mode' = 'batch';
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+create table BatchTable (
+ id int,
+ str string
+) with (
+ 'connector' = 'filesystem',
+ 'path' = '$VAR_BATCH_PATH',
+ 'format' = 'csv'
+);
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+# set a fixed job id (test only) so the job id in the output below can be verified.
+SET '$internal.pipeline.job-id' = '29ba2263b9b86bd8a14b91487941bfe7';
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+INSERT INTO BatchTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
+!output
++----------------------------------+
+| job id |
++----------------------------------+
+| 29ba2263b9b86bd8a14b91487941bfe7 |
++----------------------------------+
+1 row in set
+!ok
+
+RESET '$internal.pipeline.job-id';
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+SELECT * FROM BatchTable;
+!output
++----+-------------+
+| id | str |
++----+-------------+
+| 1 | Hello World |
+| 2 | Hi |
+| 2 | Hi |
+| 3 | Hello |
+| 3 | World |
+| 4 | ADD |
+| 5 | LINE |
++----+-------------+
+7 rows in set
+!ok
diff --git a/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q b/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
similarity index 73%
copy from flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
copy to flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
index 3f5d19f043b..9f95973c831 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
@@ -1,19 +1,3 @@
-# table.q - EXECUTE/EXPLAIN STATEMENT SET BEGIN ... END
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to you under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
# statement-set.q - BEGIN STATEMENT SET, END
#
# Licensed to the Apache Software Foundation (ASF) under one or more
@@ -31,15 +15,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-SET 'sql-client.execution.result-mode' = 'tableau';
-!output
-[INFO] Session property has been set.
-!info
-
SET 'table.dml-sync' = 'true';
!output
-[INFO] Session property has been set.
-!info
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
create table src (
id int,
@@ -48,8 +32,13 @@ create table src (
'connector' = 'values'
);
!output
-[INFO] Execute statement succeed.
-!info
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
# ==========================================================================
# test statement set with streaming insert
@@ -57,8 +46,13 @@ create table src (
SET 'execution.runtime-mode' = 'streaming';
!output
-[INFO] Session property has been set.
-!info
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
create table StreamingTable (
id int,
@@ -69,8 +63,14 @@ create table StreamingTable (
'format' = 'csv'
);
!output
-[INFO] Execute statement succeed.
-!info
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
create table StreamingTable2 (
id int,
@@ -81,8 +81,13 @@ create table StreamingTable2 (
'format' = 'csv'
);
!output
-[INFO] Execute statement succeed.
-!info
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
EXPLAIN STATEMENT SET BEGIN
INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
@@ -113,7 +118,17 @@ Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EX
Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1])
+- Reused(reference_id=[1])
+!ok
+# set a fixed job id (test only) so the job id in the output below can be verified.
+SET '$internal.pipeline.job-id' = 'a5513ca0a886c6c9bafaf3acac43bfa5';
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
!ok
EXECUTE STATEMENT SET BEGIN
@@ -121,63 +136,72 @@ INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'),
INSERT INTO StreamingTable2 SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
END;
!output
-[INFO] Submitting SQL update statement to the cluster...
-[INFO] Execute statement in sync mode. Please wait for the execution finish...
-[INFO] Complete execution of the SQL update statement.
-!info
++----------------------------------+
+| job id |
++----------------------------------+
+| a5513ca0a886c6c9bafaf3acac43bfa5 |
++----------------------------------+
+1 row in set
+!ok
+
+RESET '$internal.pipeline.job-id';
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
SELECT * FROM StreamingTable;
!output
-+----+-------------+--------------------------------+
-| op | id | str |
-+----+-------------+--------------------------------+
-| +I | 1 | Hello World |
-| +I | 2 | Hi |
-| +I | 2 | Hi |
-| +I | 3 | Hello |
-| +I | 3 | World |
-| +I | 4 | ADD |
-| +I | 5 | LINE |
-+----+-------------+--------------------------------+
-Received a total of 7 rows
++----+----+-------------+
+| op | id | str |
++----+----+-------------+
+| +I | 1 | Hello World |
+| +I | 2 | Hi |
+| +I | 2 | Hi |
+| +I | 3 | Hello |
+| +I | 3 | World |
+| +I | 4 | ADD |
+| +I | 5 | LINE |
++----+----+-------------+
+7 rows in set
!ok
SELECT * FROM StreamingTable2;
!output
-+----+-------------+--------------------------------+
-| op | id | str |
-+----+-------------+--------------------------------+
-| +I | 1 | Hello World |
-| +I | 2 | Hi |
-| +I | 2 | Hi |
-| +I | 3 | Hello |
-| +I | 3 | World |
-| +I | 4 | ADD |
-| +I | 5 | LINE |
-+----+-------------+--------------------------------+
-Received a total of 7 rows
++----+----+-------------+
+| op | id | str |
++----+----+-------------+
+| +I | 1 | Hello World |
+| +I | 2 | Hi |
+| +I | 2 | Hi |
+| +I | 3 | Hello |
+| +I | 3 | World |
+| +I | 4 | ADD |
+| +I | 5 | LINE |
++----+----+-------------+
+7 rows in set
!ok
EXPLAIN STATEMENT SET BEGIN
END;
!output
-[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "END" at line 2, column 1.
Was expecting one of:
"INSERT" ...
"UPSERT" ...
-
!error
EXECUTE STATEMENT SET BEGIN
END;
!output
-[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "END" at line 2, column 1.
Was expecting one of:
"INSERT" ...
"UPSERT" ...
-
!error
# ==========================================================================
@@ -186,8 +210,13 @@ Was expecting one of:
SET 'execution.runtime-mode' = 'batch';
!output
-[INFO] Session property has been set.
-!info
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
create table BatchTable (
id int,
@@ -198,8 +227,14 @@ str string
'format' = 'csv'
);
!output
-[INFO] Execute statement succeed.
-!info
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
create table BatchTable2 (
id int,
@@ -210,8 +245,13 @@ str string
'format' = 'csv'
);
!output
-[INFO] Execute statement succeed.
-!info
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
EXPLAIN STATEMENT SET
BEGIN
@@ -243,7 +283,17 @@ Sink(table=[default_catalog.default_database.BatchTable], fields=[EXPR$0, EXPR$1
Sink(table=[default_catalog.default_database.BatchTable2], fields=[EXPR$0, EXPR$1])
+- Reused(reference_id=[1])
+!ok
+# set a fixed job id (test only) so the job id in the output below can be verified.
+SET '$internal.pipeline.job-id' = '2e2dc0a5a6315296062ba81eba340668';
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
!ok
EXECUTE STATEMENT SET
@@ -252,10 +302,23 @@ INSERT INTO BatchTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2,
INSERT INTO BatchTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
END;
!output
-[INFO] Submitting SQL update statement to the cluster...
-[INFO] Execute statement in sync mode. Please wait for the execution finish...
-[INFO] Complete execution of the SQL update statement.
-!info
++----------------------------------+
+| job id |
++----------------------------------+
+| 2e2dc0a5a6315296062ba81eba340668 |
++----------------------------------+
+1 row in set
+!ok
+
+RESET '$internal.pipeline.job-id';
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
SELECT * FROM BatchTable;
!output
@@ -288,21 +351,17 @@ Empty set
EXPLAIN STATEMENT SET BEGIN
END;
!output
-[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "END" at line 2, column 1.
Was expecting one of:
"INSERT" ...
"UPSERT" ...
-
!error
EXECUTE STATEMENT SET BEGIN
END;
!output
-[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "END" at line 2, column 1.
Was expecting one of:
"INSERT" ...
"UPSERT" ...
-
!error
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/table.q b/flink-table/flink-sql-gateway/src/test/resources/sql/table.q
new file mode 100644
index 00000000000..821f679fa36
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/table.q
@@ -0,0 +1,988 @@
+# table.q - CREATE/DROP/SHOW/ALTER/DESCRIBE TABLE
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# ==========================================================================
+# validation test
+# ==========================================================================
+
+create table tbl(a int, b as invalid_function());
+!output
+org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature invalid_function()
+!error
+
+drop table non_exist;
+!output
+org.apache.flink.table.api.ValidationException: Table with identifier 'default_catalog.default_database.non_exist' does not exist.
+!error
+
+describe non_exist;
+!output
+org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.non_exist' doesn't exist
+!error
+
+desc non_exist;
+!output
+org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.non_exist' doesn't exist
+!error
+
+alter table non_exist rename to non_exist2;
+!output
+org.apache.flink.table.api.ValidationException: Table `default_catalog`.`default_database`.`non_exist` doesn't exist or is a temporary table.
+!error
+
+# ==========================================================================
+# test create table
+# ==========================================================================
+
+# test create a table with computed column, primary key, watermark
+CREATE TABLE IF NOT EXISTS orders (
+ `user` BIGINT NOT NULL,
+ product VARCHAR(32),
+ amount INT,
+ ts TIMESTAMP(3),
+ ptime AS PROCTIME(),
+ PRIMARY KEY(`user`) NOT ENFORCED,
+ WATERMARK FOR ts AS ts - INTERVAL '1' SECONDS
+) with (
+ 'connector' = 'datagen'
+);
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+# test SHOW TABLES
+show tables;
+!output
++------------+
+| table name |
++------------+
+| orders |
++------------+
+1 row in set
+!ok
+
+# test SHOW CREATE TABLE
+show create table orders;
+!output
+CREATE TABLE `default_catalog`.`default_database`.`orders` (
+ `user` BIGINT NOT NULL,
+ `product` VARCHAR(32),
+ `amount` INT,
+ `ts` TIMESTAMP(3),
+ `ptime` AS PROCTIME(),
+ WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
+ CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED
+) WITH (
+ 'connector' = 'datagen'
+)
+!ok
+
+# test SHOW COLUMNS
+show columns from orders;
+!output
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+| user | BIGINT | FALSE | PRI(user) | | |
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | `ts` - INTERVAL '1' SECOND |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+5 rows in set
+!ok
+
+show columns in orders;
+!output
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+| user | BIGINT | FALSE | PRI(user) | | |
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | `ts` - INTERVAL '1' SECOND |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+5 rows in set
+!ok
+
+show columns from orders like '%u';
+!output
+Empty set
+!ok
+
+show columns in orders like '%u';
+!output
+Empty set
+!ok
+
+show columns from orders not like '%u';
+!output
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+| user | BIGINT | FALSE | PRI(user) | | |
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | `ts` - INTERVAL '1' SECOND |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+5 rows in set
+!ok
+
+show columns in orders not like '%u';
+!output
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+| user | BIGINT | FALSE | PRI(user) | | |
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | `ts` - INTERVAL '1' SECOND |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+5 rows in set
+!ok
+
+show columns from orders like '%r';
+!output
++------+--------+-------+-----------+--------+-----------+
+| name | type | null | key | extras | watermark |
++------+--------+-------+-----------+--------+-----------+
+| user | BIGINT | FALSE | PRI(user) | | |
++------+--------+-------+-----------+--------+-----------+
+1 row in set
+!ok
+
+show columns in orders like '%r';
+!output
++------+--------+-------+-----------+--------+-----------+
+| name | type | null | key | extras | watermark |
++------+--------+-------+-----------+--------+-----------+
+| user | BIGINT | FALSE | PRI(user) | | |
++------+--------+-------+-----------+--------+-----------+
+1 row in set
+!ok
+
+show columns from orders not like '%r';
+!output
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | `ts` - INTERVAL '1' SECOND |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+4 rows in set
+!ok
+
+show columns in orders not like '%r';
+!output
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | `ts` - INTERVAL '1' SECOND |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+4 rows in set
+!ok
+
+show columns from orders like '%u%';
+!output
++---------+-------------+-------+-----------+--------+-----------+
+| name | type | null | key | extras | watermark |
++---------+-------------+-------+-----------+--------+-----------+
+| user | BIGINT | FALSE | PRI(user) | | |
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
++---------+-------------+-------+-----------+--------+-----------+
+3 rows in set
+!ok
+
+show columns in orders like '%u%';
+!output
++---------+-------------+-------+-----------+--------+-----------+
+| name | type | null | key | extras | watermark |
++---------+-------------+-------+-----------+--------+-----------+
+| user | BIGINT | FALSE | PRI(user) | | |
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
++---------+-------------+-------+-----------+--------+-----------+
+3 rows in set
+!ok
+
+show columns from orders not like '%u%';
+!output
++-------+-----------------------------+-------+-----+---------------+----------------------------+
+| name | type | null | key | extras | watermark |
++-------+-----------------------------+-------+-----+---------------+----------------------------+
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | `ts` - INTERVAL '1' SECOND |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |
++-------+-----------------------------+-------+-----+---------------+----------------------------+
+2 rows in set
+!ok
+
+show columns in orders not like '%u%';
+!output
++-------+-----------------------------+-------+-----+---------------+----------------------------+
+| name | type | null | key | extras | watermark |
++-------+-----------------------------+-------+-----+---------------+----------------------------+
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | `ts` - INTERVAL '1' SECOND |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |
++-------+-----------------------------+-------+-----+---------------+----------------------------+
+2 rows in set
+!ok
+
+show columns from orders like 'use_';
+!output
++------+--------+-------+-----------+--------+-----------+
+| name | type | null | key | extras | watermark |
++------+--------+-------+-----------+--------+-----------+
+| user | BIGINT | FALSE | PRI(user) | | |
++------+--------+-------+-----------+--------+-----------+
+1 row in set
+!ok
+
+show columns in orders like 'use_';
+!output
++------+--------+-------+-----------+--------+-----------+
+| name | type | null | key | extras | watermark |
++------+--------+-------+-----------+--------+-----------+
+| user | BIGINT | FALSE | PRI(user) | | |
++------+--------+-------+-----------+--------+-----------+
+1 row in set
+!ok
+
+show columns from orders not like 'use_';
+!output
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | `ts` - INTERVAL '1' SECOND |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+4 rows in set
+!ok
+
+show columns in orders not like 'use_';
+!output
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | `ts` - INTERVAL '1' SECOND |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+4 rows in set
+!ok
+
+# ==========================================================================
+# test alter table rename
+# ==========================================================================
+
+alter table orders rename to orders2;
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+# ==========================================================================
+# test alter table set
+# ==========================================================================
+
+# test alter table properties
+alter table orders2 set ('connector' = 'kafka', 'scan.startup.mode' = 'earliest-offset');
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+# verify table options using SHOW CREATE TABLE
+show create table orders2;
+!output
+CREATE TABLE `default_catalog`.`default_database`.`orders2` (
+ `user` BIGINT NOT NULL,
+ `product` VARCHAR(32),
+ `amount` INT,
+ `ts` TIMESTAMP(3),
+ `ptime` AS PROCTIME(),
+ WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
+ CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED
+) WITH (
+ 'connector' = 'kafka',
+ 'scan.startup.mode' = 'earliest-offset'
+)
+!ok
+
+# change connector to 'datagen' while keeping 'scan.startup.mode', to be removed by RESET later
+alter table orders2 set ('connector' = 'datagen');
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+# verify the table options now contain an unsupported key
+show create table orders2;
+!output
+CREATE TABLE `default_catalog`.`default_database`.`orders2` (
+ `user` BIGINT NOT NULL,
+ `product` VARCHAR(32),
+ `amount` INT,
+ `ts` TIMESTAMP(3),
+ `ptime` AS PROCTIME(),
+ WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
+ CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED
+) WITH (
+ 'connector' = 'datagen',
+ 'scan.startup.mode' = 'earliest-offset'
+)
+!ok
+
+# test SHOW CREATE VIEW for tables
+show create view orders2;
+!output
+org.apache.flink.table.api.TableException: SHOW CREATE VIEW is only supported for views, but `default_catalog`.`default_database`.`orders2` is a table. Please use SHOW CREATE TABLE instead.
+!error
+
+# test explain plan to verify the table source cannot be created
+explain plan for select * from orders2;
+!output
+org.apache.flink.table.api.ValidationException: Unsupported options found for 'datagen'.
+
+Unsupported options:
+
+scan.startup.mode
+
+Supported options:
+
+connector
+fields.amount.kind
+fields.amount.max
+fields.amount.min
+fields.product.kind
+fields.product.length
+fields.ts.kind
+fields.ts.max-past
+fields.user.kind
+fields.user.max
+fields.user.min
+number-of-rows
+rows-per-second
+!error
+
+# ==========================================================================
+# test alter table reset
+# ==========================================================================
+
+# test alter table reset to remove the invalid key
+alter table orders2 reset ('scan.startup.mode');
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+# verify table options using SHOW CREATE TABLE
+show create table orders2;
+!output
+CREATE TABLE `default_catalog`.`default_database`.`orders2` (
+ `user` BIGINT NOT NULL,
+ `product` VARCHAR(32),
+ `amount` INT,
+ `ts` TIMESTAMP(3),
+ `ptime` AS PROCTIME(),
+ WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
+ CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED
+) WITH (
+ 'connector' = 'datagen'
+)
+!ok
+
+# test alter table reset with an empty key list
+alter table orders2 reset ();
+!output
+org.apache.flink.table.api.ValidationException: ALTER TABLE RESET does not support empty key
+!error
+
+# ==========================================================================
+# test describe table
+# ==========================================================================
+
+describe orders2;
+!output
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+| user | BIGINT | FALSE | PRI(user) | | |
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | `ts` - INTERVAL '1' SECOND |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+5 rows in set
+!ok
+
+# test desc table
+desc orders2;
+!output
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+| user | BIGINT | FALSE | PRI(user) | | |
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | `ts` - INTERVAL '1' SECOND |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+5 rows in set
+!ok
+
+# ==========================================================================
+# test drop table
+# ==========================================================================
+
+drop table orders2;
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+# verify table is dropped
+show tables;
+!output
+Empty set
+!ok
+
+# ==========================================================================
+# test temporary table
+# ==========================================================================
+
+create temporary table tbl1 (
+ `user` BIGINT NOT NULL,
+ product VARCHAR(32),
+ amount INT
+) with (
+ 'connector' = 'datagen'
+);
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+# TODO: warn users that the table already exists
+create temporary table if not exists tbl1 (
+ `user` BIGINT NOT NULL,
+ product VARCHAR(32),
+ amount INT
+) with (
+ 'connector' = 'datagen'
+);
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+# list permanent and temporary tables together
+show tables;
+!output
++------------+
+| table name |
++------------+
+| tbl1 |
++------------+
+1 row in set
+!ok
+
+# SHOW CREATE TABLE for temporary table
+show create table tbl1;
+!output
+CREATE TEMPORARY TABLE `default_catalog`.`default_database`.`tbl1` (
+ `user` BIGINT NOT NULL,
+ `product` VARCHAR(32),
+ `amount` INT
+) WITH (
+ 'connector' = 'datagen'
+)
+!ok
+
+drop temporary table tbl1;
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+# ==========================================================================
+# test playing with keyword identifiers
+# ==========================================================================
+
+create table `mod` (`table` string, `database` string) with ('connector' = 'values');
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+describe `mod`;
+!output
++----------+--------+------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++----------+--------+------+-----+--------+-----------+
+| table | STRING | TRUE | | | |
+| database | STRING | TRUE | | | |
++----------+--------+------+-----+--------+-----------+
+2 rows in set
+!ok
+
+desc `mod`;
+!output
++----------+--------+------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++----------+--------+------+-----+--------+-----------+
+| table | STRING | TRUE | | | |
+| database | STRING | TRUE | | | |
++----------+--------+------+-----+--------+-----------+
+2 rows in set
+!ok
+
+drop table `mod`;
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+show tables;
+!output
+Empty set
+!ok
+
+# ==========================================================================
+# test explain
+# ==========================================================================
+
+CREATE TABLE IF NOT EXISTS orders (
+ `user` BIGINT NOT NULL,
+ product VARCHAR(32),
+ amount INT,
+ ts TIMESTAMP(3),
+ ptime AS PROCTIME(),
+ PRIMARY KEY(`user`) NOT ENFORCED,
+ WATERMARK FOR ts AS ts - INTERVAL '1' SECONDS
+) with (
+ 'connector' = 'datagen'
+);
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+CREATE TABLE IF NOT EXISTS orders2 (
+ `user` BIGINT NOT NULL,
+ product VARCHAR(32),
+ amount INT,
+ ts TIMESTAMP(3),
+ PRIMARY KEY(`user`) NOT ENFORCED
+) with (
+ 'connector' = 'blackhole'
+);
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+# test explain plan for select
+explain plan for select `user`, product from orders;
+!output
+== Abstract Syntax Tree ==
+LogicalProject(user=[$0], product=[$1])
++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+ +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+ +- Calc(select=[user, product, ts])
+ +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Optimized Execution Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+ +- Calc(select=[user, product, ts])
+ +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+!ok
+
+# test explain plan for insert
+explain plan for insert into orders2 select `user`, product, amount, ts from orders;
+!output
+== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3])
+ +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+ +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+ +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+ +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+!ok
+
+# test explain select
+explain select `user`, product from orders;
+!output
+== Abstract Syntax Tree ==
+LogicalProject(user=[$0], product=[$1])
++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+ +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+ +- Calc(select=[user, product, ts])
+ +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Optimized Execution Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+ +- Calc(select=[user, product, ts])
+ +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+!ok
+
+# test explain insert
+explain insert into orders2 select `user`, product, amount, ts from orders;
+!output
+== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3])
+ +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+ +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+ +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+ +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+!ok
+
+# test explain insert with json format
+explain json_execution_plan insert into orders2 select `user`, product, amount, ts from orders;
+!output
+== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3])
+ +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+ +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+ +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+ +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Physical Execution Plan ==
+{
+ "nodes" : [ {
+ "id" : ,
+ "type" : "Source: orders[]",
+ "pact" : "Data Source",
+ "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])",
+ "parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "WatermarkAssigner[]",
+ "pact" : "Operator",
+ "contents" : "[]:WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "ConstraintEnforcer[]",
+ "pact" : "Operator",
+ "contents" : "[]:ConstraintEnforcer[NotNullEnforcer(fields=[user])]",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Sink: orders2[]",
+ "pact" : "Data Sink",
+ "contents" : "[]:Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ } ]
+}
+!ok
+
+# test explain select with json format
+explain json_execution_plan select `user`, product from orders;
+!output
+== Abstract Syntax Tree ==
+LogicalProject(user=[$0], product=[$1])
++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+ +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+ +- Calc(select=[user, product, ts])
+ +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Optimized Execution Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+ +- Calc(select=[user, product, ts])
+ +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Physical Execution Plan ==
+{
+ "nodes" : [ {
+ "id" : ,
+ "type" : "Source: orders[]",
+ "pact" : "Data Source",
+ "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])",
+ "parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[user, product, ts])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "WatermarkAssigner[]",
+ "pact" : "Operator",
+ "contents" : "[]:WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[user, product])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ } ]
+}
+!ok
+
+# test explain select with ESTIMATED_COST
+explain estimated_cost select `user`, product from orders;
+!output
+== Abstract Syntax Tree ==
+LogicalProject(user=[$0], product=[$1])
++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+ +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Calc(select=[user, product]): rowcount = 1.0E8, cumulative cost = {4.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+ +- Calc(select=[user, product, ts]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+ +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+
+== Optimized Execution Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+ +- Calc(select=[user, product, ts])
+ +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+!ok
+
+# test explain select with CHANGELOG_MODE
+explain changelog_mode select `user`, product from orders;
+!output
+== Abstract Syntax Tree ==
+LogicalProject(user=[$0], product=[$1])
++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+ +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Calc(select=[user, product], changelogMode=[I])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[I])
+ +- Calc(select=[user, product, ts], changelogMode=[I])
+ +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts], changelogMode=[I])
+
+== Optimized Execution Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+ +- Calc(select=[user, product, ts])
+ +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+!ok
+
+# test explain select with all details
+explain changelog_mode, estimated_cost, json_execution_plan select `user`, product from orders;
+!output
+== Abstract Syntax Tree ==
+LogicalProject(user=[$0], product=[$1])
++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+ +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Calc(select=[user, product], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {4.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+ +- Calc(select=[user, product, ts], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+ +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+
+== Optimized Execution Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+ +- Calc(select=[user, product, ts])
+ +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Physical Execution Plan ==
+{
+ "nodes" : [ {
+ "id" : ,
+ "type" : "Source: orders[]",
+ "pact" : "Data Source",
+ "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])",
+ "parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[user, product, ts])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "WatermarkAssigner[]",
+ "pact" : "Operator",
+ "contents" : "[]:WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[user, product])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ } ]
+}
+!ok
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/view.q b/flink-table/flink-sql-gateway/src/test/resources/sql/view.q
new file mode 100644
index 00000000000..dc20a1cb70f
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/view.q
@@ -0,0 +1,503 @@
+# view.q - CREATE/DROP/SHOW/DESCRIBE VIEW
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# register a base table first
+CREATE TABLE orders (
+ `user` BIGINT NOT NULL,
+ product VARCHAR(32),
+ amount INT,
+ ts TIMESTAMP(3),
+ ptime AS PROCTIME(),
+ PRIMARY KEY(`user`) NOT ENFORCED,
+ WATERMARK FOR ts AS ts - INTERVAL '1' SECONDS
+) with (
+ 'connector' = 'datagen'
+);
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
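+
+# Note (not part of the recorded run): the computed column `ptime` (PROCTIME())
+# and the watermark on `ts` carry over into views defined on this table, which
+# is why DESCRIBE on the views below reports *PROCTIME* and *ROWTIME* column
+# attributes.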
+
+# ===== test temporary view =====
+
+create temporary view v1 as select * from orders;
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+create temporary view v1 as select * from orders;
+!output
+org.apache.flink.table.api.ValidationException: Temporary table '`default_catalog`.`default_database`.`v1`' already exists
+!error
+
+# TODO: warn users that the view already exists
+create temporary view if not exists v1 as select * from orders;
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+# test querying a view with hints
+select * from v1 /*+ OPTIONS('number-of-rows' = '1') */;
+!output
+org.apache.flink.table.api.ValidationException: View '`default_catalog`.`default_database`.`v1`' cannot be enriched with new options. Hints can only be applied to tables.
+!error
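+
+# Note (not part of the recorded run): hints are accepted on tables, e.g.
+# `select * from orders /*+ OPTIONS('number-of-rows' = '1') */;` would be
+# valid here, since the datagen connector supports the 'number-of-rows' option.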
+
+# test creating a view that references another view
+create temporary view if not exists v2 as select * from v1;
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+# test SHOW CREATE VIEW on a temporary view
+show create view v1;
+!output
+CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v1`(`user`, `product`, `amount`, `ts`, `ptime`) as
+SELECT *
+FROM `default_catalog`.`default_database`.`orders`
+!ok
+
+# test SHOW CREATE VIEW on a temporary view that references another view
+show create view v2;
+!output
+CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v2`(`user`, `product`, `amount`, `ts`, `ptime`) as
+SELECT *
+FROM `default_catalog`.`default_database`.`v1`
+!ok
+
+show tables;
+!output
++------------+
+| table name |
++------------+
+| orders |
+| v1 |
+| v2 |
++------------+
+3 rows in set
+!ok
+
+show views;
+!output
++-----------+
+| view name |
++-----------+
+| v1 |
+| v2 |
++-----------+
+2 rows in set
+!ok
+
+# test SHOW CREATE TABLE for views
+show create table v1;
+!output
+org.apache.flink.table.api.TableException: SHOW CREATE TABLE is only supported for tables, but `default_catalog`.`default_database`.`v1` is a view. Please use SHOW CREATE VIEW instead.
+!error
+
+# ===== test permanent view =====
+
+# register a permanent view with the same name as the temporary view
+create view v1 as select * from orders;
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+# test creating a duplicate view
+create view v1 as select * from orders;
+!output
+org.apache.flink.table.catalog.exceptions.TableAlreadyExistException: Table (or view) default_database.v1 already exists in Catalog default_catalog.
+!error
+
+# register another permanent view for the SHOW CREATE VIEW test
+create view permanent_v1 as select * from orders;
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+# test SHOW CREATE VIEW on a permanent view
+show create view permanent_v1;
+!output
+CREATE VIEW `default_catalog`.`default_database`.`permanent_v1`(`user`, `product`, `amount`, `ts`, `ptime`) as
+SELECT *
+FROM `default_catalog`.`default_database`.`orders`
+!ok
+
+# drop the permanent_v1 view
+drop view permanent_v1;
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+# for now we don't distinguish between the temporary v1 and the permanent v1
+show views;
+!output
++-----------+
+| view name |
++-----------+
+| v1 |
+| v2 |
++-----------+
+2 rows in set
+!ok
+
+# test describe view
+describe v1;
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+| user | BIGINT | FALSE | | | |
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | | |
++---------+-----------------------------+-------+-----+--------+-----------+
+5 rows in set
+!ok
+
+# test SHOW COLUMNS
+show columns from v1;
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+| user | BIGINT | FALSE | | | |
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | | |
++---------+-----------------------------+-------+-----+--------+-----------+
+5 rows in set
+!ok
+
+show columns in v1;
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+| user | BIGINT | FALSE | | | |
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | | |
++---------+-----------------------------+-------+-----+--------+-----------+
+5 rows in set
+!ok
+
+show columns from v1 like '%u';
+!output
+Empty set
+!ok
+
+show columns in v1 like '%u';
+!output
+Empty set
+!ok
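+
+# Note (not part of the recorded run): '%u' only matches names ending in 'u',
+# and no column here does, hence the empty sets above.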
+
+show columns from v1 not like '%u';
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+| user | BIGINT | FALSE | | | |
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | | |
++---------+-----------------------------+-------+-----+--------+-----------+
+5 rows in set
+!ok
+
+show columns in v1 not like '%u';
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+| user | BIGINT | FALSE | | | |
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | | |
++---------+-----------------------------+-------+-----+--------+-----------+
+5 rows in set
+!ok
+
+show columns from v1 like '%r';
+!output
++------+--------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++------+--------+-------+-----+--------+-----------+
+| user | BIGINT | FALSE | | | |
++------+--------+-------+-----+--------+-----------+
+1 row in set
+!ok
+
+show columns in v1 like '%r';
+!output
++------+--------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++------+--------+-------+-----+--------+-----------+
+| user | BIGINT | FALSE | | | |
++------+--------+-------+-----+--------+-----------+
+1 row in set
+!ok
+
+show columns from v1 not like '%r';
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | | |
++---------+-----------------------------+-------+-----+--------+-----------+
+4 rows in set
+!ok
+
+show columns in v1 not like '%r';
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | | |
++---------+-----------------------------+-------+-----+--------+-----------+
+4 rows in set
+!ok
+
+show columns from v1 like '%u%';
+!output
++---------+-------------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++---------+-------------+-------+-----+--------+-----------+
+| user | BIGINT | FALSE | | | |
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
++---------+-------------+-------+-----+--------+-----------+
+3 rows in set
+!ok
+
+show columns in v1 like '%u%';
+!output
++---------+-------------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++---------+-------------+-------+-----+--------+-----------+
+| user | BIGINT | FALSE | | | |
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
++---------+-------------+-------+-----+--------+-----------+
+3 rows in set
+!ok
+
+show columns from v1 not like '%u%';
+!output
++-------+-----------------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++-------+-----------------------------+-------+-----+--------+-----------+
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | | |
++-------+-----------------------------+-------+-----+--------+-----------+
+2 rows in set
+!ok
+
+show columns in v1 not like '%u%';
+!output
++-------+-----------------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++-------+-----------------------------+-------+-----+--------+-----------+
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | | |
++-------+-----------------------------+-------+-----+--------+-----------+
+2 rows in set
+!ok
+
+show columns from v1 like 'use_';
+!output
++------+--------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++------+--------+-------+-----+--------+-----------+
+| user | BIGINT | FALSE | | | |
++------+--------+-------+-----+--------+-----------+
+1 row in set
+!ok
+
+show columns in v1 like 'use_';
+!output
++------+--------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++------+--------+-------+-----+--------+-----------+
+| user | BIGINT | FALSE | | | |
++------+--------+-------+-----+--------+-----------+
+1 row in set
+!ok
+
+show columns from v1 not like 'use_';
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | | |
++---------+-----------------------------+-------+-----+--------+-----------+
+4 rows in set
+!ok
+
+show columns in v1 not like 'use_';
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | | |
++---------+-----------------------------+-------+-----+--------+-----------+
+4 rows in set
+!ok
+
+# we can't drop a permanent view while a temporary view with the same name exists
+drop view v1;
+!output
+org.apache.flink.table.api.ValidationException: Temporary view with identifier '`default_catalog`.`default_database`.`v1`' exists. Drop it first before removing the permanent view.
+!error
+
+# although temporary v2 depends on temporary v1, dropping v1 first does not throw an exception
+drop temporary view v1;
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+# now we can drop the permanent view v1
+drop view v1;
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+# test dropping a non-existent view
+drop view non_exist;
+!output
+org.apache.flink.table.api.ValidationException: View with identifier 'default_catalog.default_database.non_exist' does not exist.
+!error
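+
+# Note (not part of the recorded run): the IF EXISTS variant,
+# `drop view if exists non_exist;`, would return OK instead of raising this error.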
+
+# ===== test playing with keyword identifiers =====
+
+create view `mod` as select * from orders;
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+describe `mod`;
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+| user | BIGINT | FALSE | | | |
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | | |
++---------+-----------------------------+-------+-----+--------+-----------+
+5 rows in set
+!ok
+
+desc `mod`;
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+| user | BIGINT | FALSE | | | |
+| product | VARCHAR(32) | TRUE | | | |
+| amount | INT | TRUE | | | |
+| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | | |
++---------+-----------------------------+-------+-----+--------+-----------+
+5 rows in set
+!ok
+
+drop view `mod`;
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+show tables;
+!output
++------------+
+| table name |
++------------+
+| orders |
+| v2 |
++------------+
+2 rows in set
+!ok
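The hunk below adds a public getClientConfiguration() accessor to
MiniClusterExtension. A minimal JUnit 5 usage sketch (test class, field name,
and resource settings are illustrative, not part of this commit):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
    import org.apache.flink.test.junit5.MiniClusterExtension;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.RegisterExtension;

    class ClientConfigurationSketchTest {

        @RegisterExtension
        static final MiniClusterExtension MINI_CLUSTER =
                new MiniClusterExtension(
                        new MiniClusterResourceConfiguration.Builder()
                                .setNumberTaskManagers(1)
                                .setNumberSlotsPerTaskManager(1)
                                .build());

        @Test
        void readClientConfiguration() {
            // The new accessor exposes the client-side configuration of the
            // started MiniCluster, e.g. for building clients that connect to it.
            Configuration clientConf = MINI_CLUSTER.getClientConfiguration();
            // ... use clientConf ...
        }
    }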
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java
index 63af5b3f3a3..90b7517f91c 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -279,6 +280,12 @@ public final class MiniClusterExtension
MiniClusterClient.MiniClusterId.INSTANCE);
}
+ // Utils
+
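+ /**
+ * Returns the configuration to use when creating clients that connect to the
+ * started MiniCluster (delegates to the internal MiniCluster extension).
+ */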
+ public Configuration getClientConfiguration() {
+ return internalMiniClusterExtension.getClientConfiguration();
+ }
+
private static class CloseableParameter<T extends AutoCloseable>
implements ExtensionContext.Store.CloseableResource {
private final T autoCloseable;