You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/07/13 11:45:20 UTC

[flink] branch master updated: [FLINK-27768][sql-gateway] Allow executing sql for the SqlGatewayService (#19846)

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e494c91d1e [FLINK-27768][sql-gateway] Allow executing sql for the SqlGatewayService (#19846)
7e494c91d1e is described below

commit 7e494c91d1e340a1f9438785e2a56242fb5894c8
Author: Shengkai <33...@users.noreply.github.com>
AuthorDate: Wed Jul 13 19:45:11 2022 +0800

    [FLINK-27768][sql-gateway] Allow executing sql for the SqlGatewayService (#19846)
---
 .../src/test/resources/sql_multi/statement_set.q   |  16 -
 .../flink/table/gateway/api/SqlGatewayService.java |  18 +
 .../flink/table/gateway/api/results/ResultSet.java |  19 +-
 .../gateway/api/utils/MockedSqlGatewayService.java |  11 +
 flink-table/flink-sql-gateway/pom.xml              |  19 +
 .../gateway/service/SqlGatewayServiceImpl.java     |  29 +
 .../gateway/service/context/SessionContext.java    |  77 +-
 .../service/operation/OperationExecutor.java       | 178 ++++
 .../service/operation/OperationManager.java        |  50 +-
 .../gateway/service/result/ResultFetcher.java      |  17 +
 .../table/gateway/service/result/ResultStore.java  |   7 +
 .../table/gateway/service/session/Session.java     |   6 +
 .../table/gateway/service/utils/Constants.java     |  27 +
 .../gateway/service/SqlGatewayServiceITCase.java   |  31 +
 .../service/SqlGatewayServiceStatementITCase.java  | 309 +++++++
 .../service/context/SessionContextTest.java        | 148 +++
 .../gateway/service/result/ResultFetcherTest.java  |  56 +-
 .../service/utils/SqlGatewayServiceExtension.java  |  24 +
 .../gateway/service/utils/SqlScriptReader.java     | 124 +++
 .../gateway/service/utils/TestSqlStatement.java    |  42 +
 .../src/test/resources/sql/insert.q                | 182 ++++
 .../src/test/resources/sql}/statement_set.q        | 213 +++--
 .../src/test/resources/sql/table.q                 | 988 +++++++++++++++++++++
 .../src/test/resources/sql/view.q                  | 503 +++++++++++
 .../flink/test/junit5/MiniClusterExtension.java    |   7 +
 25 files changed, 2951 insertions(+), 150 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q b/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
index 3f5d19f043b..bb83b56ed20 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
@@ -1,19 +1,3 @@
-# table.q - EXECUTE/EXPLAIN STATEMENT SET BEGIN ... END
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to you under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
 # statement-set.q - BEGIN STATEMENT SET, END
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index c0bdf01c36e..7e92610fb6e 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.gateway.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
@@ -111,6 +112,23 @@ public interface SqlGatewayService {
     // Statements
     // -------------------------------------------------------------------------------------------
 
+    /**
+     * Execute the submitted statement.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @param statement the SQL to execute.
+     * @param executionTimeoutMs the execution timeout in milliseconds. Use a non-positive value to
+     *     disable the timeout mechanism.
+     * @param executionConfig execution config for the statement.
+     * @return handle to identify the operation.
+     */
+    OperationHandle executeStatement(
+            SessionHandle sessionHandle,
+            String statement,
+            long executionTimeoutMs,
+            Configuration executionConfig)
+            throws SqlGatewayException;
+
     /**
      * Fetch the results from the operation. When maxRows is Integer.MAX_VALUE, it means to fetch
      * all available data.
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java
index 3750a1cee95..c11581eeffc 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.gateway.api.results;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.data.RowData;
 
@@ -72,7 +73,23 @@ public class ResultSet {
         return nextToken;
     }
 
-    /** The schema of the data. */
+    /**
+     * The schema of the data.
+     *
+     * <p>The schema of the DDL, USE, EXPLAIN, SHOW and DESCRIBE align with the schema of the {@link
+     * TableResult#getResolvedSchema()}. The only difference is the schema of the `INSERT`
+     * statement.
+     *
+     * <p>The schema of INSERT:
+     *
+     * <pre>
+     * +-------------+-------------+----------+
+     * | column name | column type | comments |
+     * +-------------+-------------+----------+
+     * |   job id    |    string   |          |
+     * +-------------+-------------+----------+
+     * </pre>
+     */
     public ResolvedSchema getResultSchema() {
         return resultSchema;
     }
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
index 656f353ae44..f962e9c727e 100644
--- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.gateway.api.utils;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationType;
@@ -78,4 +79,14 @@ public class MockedSqlGatewayService implements SqlGatewayService {
             SessionHandle sessionHandle, OperationHandle operationHandle) {
         throw new UnsupportedOperationException();
     }
+
+    @Override
+    public OperationHandle executeStatement(
+            SessionHandle sessionHandle,
+            String statement,
+            long executionTimeoutMs,
+            Configuration executionConfig)
+            throws SqlGatewayException {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/flink-table/flink-sql-gateway/pom.xml b/flink-table/flink-sql-gateway/pom.xml
index 9c217e71e17..7b46860bed1 100644
--- a/flink-table/flink-sql-gateway/pom.xml
+++ b/flink-table/flink-sql-gateway/pom.xml
@@ -72,6 +72,25 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-csv</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index 39e253c020b..fb68828aedf 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.gateway.service;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationType;
@@ -123,6 +124,34 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
         }
     }
 
+    @Override
+    public OperationHandle executeStatement(
+            SessionHandle sessionHandle,
+            String statement,
+            long executionTimeoutMs,
+            Configuration executionConfig)
+            throws SqlGatewayException {
+        try {
+            if (executionTimeoutMs > 0) {
+                // TODO: support the feature in FLINK-27838
+                throw new UnsupportedOperationException(
+                        "SqlGatewayService doesn't support timeout mechanism now.");
+            }
+
+            return getSession(sessionHandle)
+                    .getOperationManager()
+                    .submitOperation(
+                            OperationType.EXECUTE_STATEMENT,
+                            handle ->
+                                    getSession(sessionHandle)
+                                            .createExecutor(executionConfig)
+                                            .executeStatement(handle, statement));
+        } catch (Throwable t) {
+            LOG.error("Failed to execute statement.", t);
+            throw new SqlGatewayException("Failed to execute statement.", t);
+        }
+    }
+
     @Override
     public ResultSet fetchResults(
             SessionHandle sessionHandle, OperationHandle operationHandle, long token, int maxRows)
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
index b2e3921ba07..1361e7b15ab 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.gateway.service.context;
 
 import org.apache.flink.client.ClientUtils;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -27,6 +29,7 @@ import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
@@ -37,7 +40,9 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.PlannerFactoryUtil;
 import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.service.operation.OperationExecutor;
 import org.apache.flink.table.gateway.service.operation.OperationManager;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.apache.flink.table.module.ModuleManager;
 
 import org.slf4j.Logger;
@@ -63,6 +68,7 @@ public class SessionContext {
 
     private static final Logger LOG = LoggerFactory.getLogger(SessionContext.class);
 
+    private final DefaultContext defaultContext;
     private final SessionHandle sessionId;
     private final EndpointVersion endpointVersion;
 
@@ -74,12 +80,14 @@ public class SessionContext {
     private final OperationManager operationManager;
 
     private SessionContext(
+            DefaultContext defaultContext,
             SessionHandle sessionId,
             EndpointVersion endpointVersion,
             Configuration sessionConf,
             URLClassLoader classLoader,
             SessionState sessionState,
             OperationManager operationManager) {
+        this.defaultContext = defaultContext;
         this.sessionId = sessionId;
         this.endpointVersion = endpointVersion;
         this.sessionConf = sessionConf;
@@ -88,8 +96,22 @@ public class SessionContext {
         this.operationManager = operationManager;
     }
 
+    /** Close resources, e.g. catalogs. */
+    public void close() {
+        for (String name : sessionState.catalogManager.listCatalogs()) {
+            sessionState.catalogManager.getCatalog(name).ifPresent(Catalog::close);
+        }
+        try {
+            userClassloader.close();
+        } catch (IOException e) {
+            LOG.debug("Error while closing class loader.", e);
+        }
+
+        operationManager.close();
+    }
+
     // --------------------------------------------------------------------------------------------
-    // Getter method
+    // Getter/Setter
     // --------------------------------------------------------------------------------------------
 
     public SessionHandle getSessionId() {
@@ -100,22 +122,43 @@ public class SessionContext {
         return sessionConf.toMap();
     }
 
-    // --------------------------------------------------------------------------------------------
-    // Method to execute commands
-    // --------------------------------------------------------------------------------------------
+    public void set(String key, String value) {
+        try {
+            // Test whether the key value will influence the creation of the Executor.
+            createOperationExecutor(Configuration.fromMap(Collections.singletonMap(key, value)));
+        } catch (Exception e) {
+            // validation failed: sessionConf is left untouched, so the old value stays in effect
+            throw new SqlExecutionException(
+                    String.format("Failed to set key %s with value %s.", key, value), e);
+        }
+        sessionConf.setString(key, value);
+    }
 
-    /** Close resources, e.g. catalogs. */
-    public void close() {
-        for (String name : sessionState.catalogManager.listCatalogs()) {
-            sessionState.catalogManager.unregisterCatalog(name, true);
+    public synchronized void reset(String key) {
+        Configuration configuration = defaultContext.getFlinkConfig();
+        // If the key exists in the default configuration, reset it to the default value
+        ConfigOption<String> option = ConfigOptions.key(key).stringType().noDefaultValue();
+        if (configuration.contains(option)) {
+            String defaultValue = configuration.get(option);
+            set(key, defaultValue);
+        } else {
+            sessionConf.removeConfig(option);
         }
-        try {
-            userClassloader.close();
-        } catch (IOException e) {
-            LOG.debug("Error while closing class loader.", e);
+    }
+
+    public synchronized void reset() {
+        for (String key : sessionConf.keySet()) {
+            sessionConf.removeConfig(ConfigOptions.key(key).stringType().noDefaultValue());
         }
+        sessionConf.addAll(defaultContext.getFlinkConfig());
+    }
 
-        operationManager.close();
+    // --------------------------------------------------------------------------------------------
+    // Method to execute commands
+    // --------------------------------------------------------------------------------------------
+
+    public OperationExecutor createOperationExecutor(Configuration executionConfig) {
+        return new OperationExecutor(this, executionConfig);
     }
 
     // --------------------------------------------------------------------------------------------
@@ -168,6 +211,7 @@ public class SessionContext {
                 new SessionState(catalogManager, moduleManager, functionCatalog);
 
         return new SessionContext(
+                defaultContext,
                 sessionId,
                 endpointVersion,
                 configuration,
@@ -201,11 +245,12 @@ public class SessionContext {
         final EnvironmentSettings settings =
                 EnvironmentSettings.newInstance().withConfiguration(sessionConf).build();
 
-        TableConfig tableConfig = new TableConfig();
-        tableConfig.addConfiguration(sessionConf);
-
         StreamExecutionEnvironment streamExecEnv = createStreamExecutionEnvironment();
 
+        TableConfig tableConfig = TableConfig.getDefault();
+        tableConfig.setRootConfiguration(defaultContext.getFlinkConfig());
+        tableConfig.addConfiguration(sessionConf);
+
         final Executor executor = lookupExecutor(streamExecEnv);
         return createStreamTableEnvironment(
                 streamExecEnv,
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
new file mode 100644
index 00000000000..bf9bc7cf1e3
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.table.operations.BeginStatementSetOperation;
+import org.apache.flink.table.operations.EndStatementSetOperation;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.StatementSetOperation;
+import org.apache.flink.table.operations.command.ResetOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.gateway.service.utils.Constants.JOB_ID;
+import static org.apache.flink.table.gateway.service.utils.Constants.SET_KEY;
+import static org.apache.flink.table.gateway.service.utils.Constants.SET_VALUE;
+
+/** An executor to execute the {@link Operation}. */
+public class OperationExecutor {
+
+    private final SessionContext sessionContext;
+    private final Configuration executionConfig;
+
+    @VisibleForTesting
+    public OperationExecutor(SessionContext context, Configuration executionConfig) {
+        this.sessionContext = context;
+        this.executionConfig = executionConfig;
+    }
+
+    public ResultFetcher executeStatement(OperationHandle handle, String statement) {
+        // Instantiate the TableEnvironment lazily
+        TableEnvironmentInternal tableEnv = sessionContext.createTableEnvironment();
+        tableEnv.getConfig().getConfiguration().addAll(executionConfig);
+
+        List<Operation> parsedOperations = tableEnv.getParser().parse(statement);
+        if (parsedOperations.size() > 1) {
+            throw new UnsupportedOperationException(
+                    "Unsupported SQL statement! Execute statement only accepts a single SQL statement or "
+                            + "multiple 'INSERT INTO' statements wrapped in a 'STATEMENT SET' block.");
+        }
+        Operation op = parsedOperations.get(0);
+        if (op instanceof SetOperation) {
+            return callSetOperation(tableEnv, handle, (SetOperation) op);
+        } else if (op instanceof ResetOperation) {
+            return callResetOperation(handle, (ResetOperation) op);
+        } else if (op instanceof BeginStatementSetOperation) {
+            // TODO: support statement set in the FLINK-27837
+            throw new UnsupportedOperationException();
+        } else if (op instanceof EndStatementSetOperation) {
+            // TODO: support statement set in the FLINK-27837
+            throw new UnsupportedOperationException();
+        } else if (op instanceof ModifyOperation) {
+            return callModifyOperations(
+                    tableEnv, handle, Collections.singletonList((ModifyOperation) op));
+        } else if (op instanceof StatementSetOperation) {
+            return callModifyOperations(
+                    tableEnv, handle, ((StatementSetOperation) op).getOperations());
+        } else if (op instanceof QueryOperation) {
+            TableResultInternal result = tableEnv.executeInternal(op);
+            return new ResultFetcher(handle, result.getResolvedSchema(), result.collectInternal());
+        } else {
+            TableResultInternal result = tableEnv.executeInternal(op);
+            return new ResultFetcher(
+                    handle, result.getResolvedSchema(), collect(result.collectInternal()));
+        }
+    }
+
+    private ResultFetcher callSetOperation(
+            TableEnvironmentInternal tableEnv, OperationHandle handle, SetOperation setOp) {
+        if (setOp.getKey().isPresent() && setOp.getValue().isPresent()) {
+            // set a property
+            sessionContext.set(setOp.getKey().get().trim(), setOp.getValue().get().trim());
+            return new ResultFetcher(
+                    handle,
+                    TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
+                    collect(TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+        } else if (!setOp.getKey().isPresent() && !setOp.getValue().isPresent()) {
+            // show all properties
+            Map<String, String> configMap = tableEnv.getConfig().getConfiguration().toMap();
+            return new ResultFetcher(
+                    handle,
+                    ResolvedSchema.of(
+                            Column.physical(SET_KEY, DataTypes.STRING()),
+                            Column.physical(SET_VALUE, DataTypes.STRING())),
+                    collect(
+                            configMap.entrySet().stream()
+                                    .map(
+                                            entry ->
+                                                    GenericRowData.of(
+                                                            StringData.fromString(entry.getKey()),
+                                                            StringData.fromString(
+                                                                    entry.getValue())))
+                                    .map(RowData.class::cast)
+                                    .iterator()));
+        } else {
+            // impossible
+            throw new SqlExecutionException("Illegal SetOperation: " + setOp.asSummaryString());
+        }
+    }
+
+    private ResultFetcher callResetOperation(OperationHandle handle, ResetOperation resetOp) {
+        if (resetOp.getKey().isPresent()) {
+            // reset a property
+            sessionContext.reset(resetOp.getKey().get().trim());
+        } else {
+            // reset all properties
+            sessionContext.reset();
+        }
+        return new ResultFetcher(
+                handle,
+                TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
+                collect(TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+    }
+
+    private ResultFetcher callModifyOperations(
+            TableEnvironmentInternal tableEnv,
+            OperationHandle handle,
+            List<ModifyOperation> modifyOperations) {
+        TableResultInternal result = tableEnv.executeInternal(modifyOperations);
+        return new ResultFetcher(
+                handle,
+                ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())),
+                Collections.singletonList(
+                        GenericRowData.of(
+                                StringData.fromString(
+                                        result.getJobClient()
+                                                .orElseThrow(
+                                                        () ->
+                                                                new SqlExecutionException(
+                                                                        String.format(
+                                                                                "Can't get job client for the operation %s.",
+                                                                                handle)))
+                                                .getJobID()
+                                                .toString()))));
+    }
+
+    private List<RowData> collect(Iterator<RowData> tableResult) {
+        List<RowData> rows = new ArrayList<>();
+        tableResult.forEachRemaining(rows::add);
+        return rows;
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
index 878674a0e72..f0aeb8c2af5 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
@@ -18,29 +18,29 @@
 
 package org.apache.flink.table.gateway.service.operation;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.operation.OperationType;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 import org.apache.flink.table.gateway.service.result.ResultFetcher;
-import org.apache.flink.util.CloseableIterator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 /** Manager for the {@link Operation}. */
+@Internal
 public class OperationManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(OperationManager.class);
@@ -58,9 +58,9 @@ public class OperationManager {
     }
 
     /**
-     * Submit the operation to the {@link OperationManager}. The {@link OperationManager} manges the
-     * lifecycle of the {@link Operation}, including register resources, fire the execution and so
-     * on.
+     * Submit the operation to the {@link OperationManager}. The {@link OperationManager} manages
+     * the lifecycle of the {@link Operation}, including register resources, fire the execution and
+     * so on.
      *
      * @param operationType The type of the submitted operation.
      * @param executor Worker to execute.
@@ -75,19 +75,29 @@ public class OperationManager {
                         operationType,
                         () -> {
                             ResultSet resultSet = executor.call();
-                            List<RowData> rows = resultSet.getData();
                             return new ResultFetcher(
-                                    handle,
-                                    resultSet.getResultSchema(),
-                                    CloseableIterator.adapterForIterator(rows.iterator()),
-                                    rows.size());
+                                    handle, resultSet.getResultSchema(), resultSet.getData());
                         });
 
-        writeLock(
-                () -> {
-                    submittedOperations.put(handle, operation);
-                    operation.run(service);
-                });
+        submitOperationInternal(handle, operation);
+        return handle;
+    }
+
+    /**
+     * Submit the operation to the {@link OperationManager}. The {@link OperationManager} manages
+     * the lifecycle of the {@link Operation}, including register resources, fire the execution and
+     * so on.
+     *
+     * @param operationType The type of the submitted operation.
+     * @param fetcherSupplier offer the fetcher to get the results.
+     * @return OperationHandle to fetch the results or check the status.
+     */
+    public OperationHandle submitOperation(
+            OperationType operationType, Function<OperationHandle, ResultFetcher> fetcherSupplier) {
+        OperationHandle handle = OperationHandle.create();
+        Operation operation =
+                new Operation(handle, operationType, () -> fetcherSupplier.apply(handle));
+        submitOperationInternal(handle, operation);
         return handle;
     }
 
@@ -173,6 +183,14 @@ public class OperationManager {
                 });
     }
 
+    /**
+     * Registers the operation under its handle in {@code submittedOperations} and then invokes
+     * {@code operation.run(service)}. Both steps are executed inside {@link #writeLock(Runnable)}.
+     *
+     * @param handle the handle used as key for the operation.
+     * @param operation the operation to register and run.
+     */
+    private void submitOperationInternal(OperationHandle handle, Operation operation) {
+        writeLock(
+                () -> {
+                    submittedOperations.put(handle, operation);
+                    operation.run(service);
+                });
+    }
+
     private void writeLock(Runnable runner) {
         lock.writeLock().lock();
         try {
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
index 145639552dd..81bb11f55a2 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
@@ -47,6 +47,7 @@ import java.util.Optional;
 public class ResultFetcher {
 
     private static final Logger LOG = LoggerFactory.getLogger(ResultFetcher.class);
+    private static final int TABLE_RESULT_MAX_INITIAL_CAPACITY = 5000;
 
     private final OperationHandle operationHandle;
 
@@ -59,6 +60,14 @@ public class ResultFetcher {
     private boolean noMoreResults = false;
 
     public ResultFetcher(
+            OperationHandle operationHandle,
+            ResolvedSchema resultSchema,
+            CloseableIterator<RowData> resultRows) {
+        this(operationHandle, resultSchema, resultRows, TABLE_RESULT_MAX_INITIAL_CAPACITY);
+    }
+
+    @VisibleForTesting
+    ResultFetcher(
             OperationHandle operationHandle,
             ResolvedSchema resultSchema,
             CloseableIterator<RowData> resultRows,
@@ -68,6 +77,14 @@ public class ResultFetcher {
         this.resultStore = new ResultStore(resultRows, maxBufferSize);
     }
 
+    /**
+     * Creates a fetcher for rows that are already available as a list. The rows are added to the
+     * buffered results right away and {@link ResultStore#DUMMY_RESULT_STORE} is used as the store.
+     *
+     * @param operationHandle the operation handle kept by this fetcher.
+     * @param resultSchema the schema kept by this fetcher.
+     * @param rows the rows added to the buffered results.
+     */
+    public ResultFetcher(
+            OperationHandle operationHandle, ResolvedSchema resultSchema, List<RowData> rows) {
+        this.operationHandle = operationHandle;
+        this.resultSchema = resultSchema;
+        this.bufferedResults.addAll(rows);
+        this.resultStore = ResultStore.DUMMY_RESULT_STORE;
+    }
+
     public void close() {
         resultStore.close();
     }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultStore.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultStore.java
index 180da8f9e59..edc180536ff 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultStore.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultStore.java
@@ -37,6 +37,13 @@ public class ResultStore {
 
     private static final Logger LOG = LoggerFactory.getLogger(ResultStore.class);
 
+    /**
+     * Shared store built over an empty iterator with a max buffer size of 0. It is closed in the
+     * static initializer that directly follows this field.
+     */
+    public static final ResultStore DUMMY_RESULT_STORE =
+            new ResultStore(CloseableIterator.adapterForIterator(Collections.emptyIterator()), 0);
+
+    static {
+        // NOTE(review): presumably closed eagerly so that the shared instance does not keep any
+        // resources running; confirm against ResultStore#close.
+        DUMMY_RESULT_STORE.close();
+    }
+
     private final CloseableIterator<RowData> result;
     private final List<RowData> recordsBuffer = new ArrayList<>();
     private final int maxBufferSize;
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java
index 2369aea247e..f2f4ff0f38f 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.table.gateway.service.session;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.operation.OperationExecutor;
 import org.apache.flink.table.gateway.service.operation.OperationManager;
 
 import java.io.Closeable;
@@ -62,6 +64,10 @@ public class Session implements Closeable {
         return sessionContext.getOperationManager();
     }
 
+    /**
+     * Creates an {@link OperationExecutor} by delegating to the session context.
+     *
+     * @param executionConfig the configuration passed on to the session context.
+     * @return the executor created by the session context.
+     */
+    public OperationExecutor createExecutor(Configuration executionConfig) {
+        return sessionContext.createOperationExecutor(executionConfig);
+    }
+
     @Override
     public void close() {
         sessionContext.close();
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
new file mode 100644
index 00000000000..7991c6797af
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.utils;
+
+/** Constants used in the SqlGatewayService. */
+public final class Constants {
+
+    // NOTE(review): these look like column names of statement results (job submission and SET);
+    // confirm against the code that uses them.
+    public static final String JOB_ID = "job id";
+    public static final String SET_KEY = "key";
+    public static final String SET_VALUE = "value";
+
+    /** Not instantiable: this class only holds constants. */
+    private Constants() {}
+}
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index d39c797eb63..c05e6b10167 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.gateway.service;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.Column;
@@ -54,6 +55,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -68,6 +70,7 @@ import static org.apache.flink.types.RowKind.INSERT;
 import static org.apache.flink.types.RowKind.UPDATE_AFTER;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -247,6 +250,34 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
         assertEquals(0, sessionManager.getOperationCount(sessionHandle));
     }
 
+    /**
+     * Runs a bare {@code SET} statement with one extra entry in the configuration argument and
+     * verifies that the entry is part of the returned settings.
+     */
+    @Test
+    public void testExecuteSqlWithConfig() {
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+        String key = "username";
+        String value = "Flink";
+        OperationHandle operationHandle =
+                service.executeStatement(
+                        sessionHandle,
+                        "SET",
+                        -1,
+                        Configuration.fromMap(Collections.singletonMap(key, value)));
+
+        // Page through the results, feeding each returned next-token back in, until it is null.
+        Long token = 0L;
+        List<RowData> settings = new ArrayList<>();
+        while (token != null) {
+            ResultSet result =
+                    service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE);
+            settings.addAll(result.getData());
+            token = result.getNextToken();
+        }
+
+        assertThat(
+                settings,
+                hasItem(
+                        GenericRowData.of(
+                                StringData.fromString(key), StringData.fromString(value))));
+    }
+
     // --------------------------------------------------------------------------------------------
     // Concurrent tests
     // --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java
new file mode 100644
index 00000000000..40a3ec44f34
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
+import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import org.apache.flink.table.gateway.service.utils.SqlScriptReader;
+import org.apache.flink.table.gateway.service.utils.TestSqlStatement;
+import org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
+import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.flink.table.utils.print.PrintStyle;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.PatternFilenameFilter;
+
+import org.apache.calcite.util.Util;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.table.gateway.service.utils.SqlScriptReader.HINT_START_OF_OUTPUT;
+import static org.apache.flink.table.planner.utils.TableTestUtil.replaceNodeIdInOperator;
+import static org.apache.flink.table.planner.utils.TableTestUtil.replaceStreamNodeId;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
+
+/** Test {@link SqlGatewayService}#executeStatement. */
+@Execution(CONCURRENT)
+public class SqlGatewayServiceStatementITCase {
+
+    @RegisterExtension
+    @Order(1)
+    public static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension();
+
+    @RegisterExtension
+    @Order(2)
+    public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
+            new SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration);
+
+    private static SqlGatewayService service;
+
+    private final SessionEnvironment defaultSessionEnvironment =
+            SessionEnvironment.newBuilder()
+                    .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                    .build();
+    private final Map<String, String> replaceVars = new HashMap<>();
+
+    @BeforeAll
+    public static void setUp() {
+        service = SQL_GATEWAY_SERVICE_EXTENSION.getService();
+    }
+
+    /**
+     * Creates fresh {@code streaming} and {@code batch} directories under the per-test temporary
+     * folder and registers their paths as the values of the {@code $VAR_STREAMING_PATH} and
+     * {@code $VAR_BATCH_PATH} placeholders.
+     */
+    @BeforeEach
+    public void before(@TempDir Path temporaryFolder) throws IOException {
+        // initialize new folders for every test, so the vars can be reused by every SQL script
+        replaceVars.put(
+                "$VAR_STREAMING_PATH",
+                Files.createDirectory(temporaryFolder.resolve("streaming")).toFile().getPath());
+        replaceVars.put(
+                "$VAR_BATCH_PATH",
+                Files.createDirectory(temporaryFolder.resolve("batch")).toFile().getPath());
+    }
+
+    /**
+     * Collects the test scripts: every {@code *.q} file located in the same directory as the
+     * {@code sql/table.q} resource, expressed as a path relative to the resource root.
+     *
+     * @return the resource-relative paths of the discovered scripts
+     * @throws Exception if the anchor resource cannot be converted to a file path
+     */
+    public static Stream<String> parameters() throws Exception {
+        String first = "sql/table.q";
+        // Validate the lookup result itself: a missing resource yields a null URL, which the
+        // former check on url.toURI() could never catch before url was dereferenced.
+        URL url =
+                checkNotNull(
+                        SqlGatewayServiceStatementITCase.class.getResource("/" + first),
+                        "Cannot find the test resource: " + first);
+        File firstFile = Paths.get(url.toURI()).toFile();
+        // Length of the absolute prefix in front of "sql/...", used to cut paths back to resources.
+        final int commonPrefixLength = firstFile.getAbsolutePath().length() - first.length();
+        File dir = firstFile.getParentFile();
+        final List<String> paths = new ArrayList<>();
+        final FilenameFilter filter = new PatternFilenameFilter(".*\\.q$");
+        // listFiles may return null; fall back to an empty array in that case.
+        for (File f : Util.first(dir.listFiles(filter), new File[0])) {
+            paths.add(f.getAbsolutePath().substring(commonPrefixLength));
+        }
+        return paths.stream();
+    }
+
+    /**
+     * Runs every statement of the given script and expects the concatenated output to be equal to
+     * the script content after placeholder substitution.
+     */
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void testSqlStatements(String sqlPath) throws Exception {
+        String in = getInputFromPath(sqlPath);
+        List<TestSqlStatement> testSqlStatements = SqlScriptReader.parseSqlScript(in);
+
+        assertThat(String.join("", runStatements(testSqlStatements))).isEqualTo(in);
+    }
+
+    /**
+     * Runs all statements in one freshly opened session and returns, per statement, its comment
+     * and SQL text followed by the printed result. A failing statement contributes an error block
+     * with the class name and message of the root cause instead of a result.
+     *
+     * @param statements the SQL statements to run
+     * @return the stringified results, one entry per statement
+     */
+    private List<String> runStatements(List<TestSqlStatement> statements) {
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+
+        List<String> output = new ArrayList<>();
+        for (TestSqlStatement statement : statements) {
+            StringBuilder builder = new StringBuilder();
+            builder.append(statement.getComment());
+            builder.append(statement.getSql());
+
+            try {
+                builder.append(runSingleStatement(sessionHandle, statement.getSql()));
+            } catch (Throwable t) {
+                Throwable root = getRootCause(t);
+                // An exception may carry no message; guard against the NPE that would otherwise
+                // be thrown from this catch block and hide the real failure.
+                String message = root.getMessage() == null ? "" : root.getMessage().trim();
+                builder.append(
+                        Tag.ERROR.addTag(root.getClass().getName() + ": " + message + "\n"));
+            }
+            output.add(builder.toString());
+        }
+
+        return output;
+    }
+
+    // -------------------------------------------------------------------------------------------
+    // Utility
+    // -------------------------------------------------------------------------------------------
+
+    /** Mark the output type. */
+    enum Tag {
+        INFO("!info"),
+
+        OK("!ok"),
+
+        ERROR("!error");
+
+        // Marker text appended after the content by addTag.
+        private final String tag;
+
+        Tag(String tag) {
+            this.tag = tag;
+        }
+
+        /**
+         * Wraps the content into an output block: the start-of-output hint on its own line, then
+         * the content, then this tag followed by a newline.
+         *
+         * @param content the text to wrap; it is inserted as is
+         * @return the tagged output block
+         */
+        public String addTag(String content) {
+            return HINT_START_OF_OUTPUT + "\n" + content + tag + "\n";
+        }
+    }
+
+    /**
+     * Reads the script at the given classpath location as UTF-8 and substitutes every placeholder
+     * registered in {@code replaceVars} with its value.
+     *
+     * @param sqlPath resource path of the script, relative to the classpath root
+     * @return the script content after placeholder substitution
+     * @throws IOException if the resource cannot be read
+     */
+    private String getInputFromPath(String sqlPath) throws IOException {
+        URL url = SqlGatewayServiceStatementITCase.class.getResource("/" + sqlPath);
+
+        // replace the placeholder with specified value if exists
+        String[] keys = replaceVars.keySet().toArray(new String[0]);
+        // values are looked up per key so that both arrays share the same order
+        String[] values = Arrays.stream(keys).map(replaceVars::get).toArray(String[]::new);
+
+        return StringUtils.replaceEach(
+                IOUtils.toString(checkNotNull(url), StandardCharsets.UTF_8), keys, values);
+    }
+
+    /**
+     * Follows the cause chain of the given throwable down to its end.
+     *
+     * @param e the throwable to start from
+     * @return the last throwable in the cause chain, or {@code e} itself if it has no cause
+     */
+    protected static Throwable getRootCause(Throwable e) {
+        Throwable current = e;
+        for (Throwable next = current.getCause(); next != null; next = current.getCause()) {
+            current = next;
+        }
+        return current;
+    }
+
+    /**
+     * Executes one SQL statement, waits until the operation reaches a terminal status and renders
+     * its result.
+     *
+     * @param sessionHandle the Session that run the statement
+     * @param statement the SQL statement to run
+     * @return the tagged output: an empty info block when the operation has no results, the first
+     *     cell of the first row (after node id replacement) for {@code EXPLAIN} and {@code SHOW
+     *     CREATE} statements, otherwise the rows printed in tableau style
+     * @throws Exception if waiting for the operation or printing the results fails
+     */
+    private String runSingleStatement(SessionHandle sessionHandle, String statement)
+            throws Exception {
+        OperationHandle operationHandle =
+                service.executeStatement(sessionHandle, statement, -1, new Configuration());
+        CommonTestUtils.waitUtil(
+                () ->
+                        service.getOperationInfo(sessionHandle, operationHandle)
+                                .getStatus()
+                                .isTerminalStatus(),
+                Duration.ofSeconds(100),
+                "Failed to wait operation finish.");
+
+        if (!service.getOperationInfo(sessionHandle, operationHandle).isHasResults()) {
+            return Tag.INFO.addTag("");
+        }
+
+        // The content in the result of the `explain` and `show create` statement is large, so it's
+        // more straightforward to just print the content without the table.
+        if (startsWithIgnoreCase(statement, "EXPLAIN")
+                || startsWithIgnoreCase(statement, "SHOW CREATE")) {
+            ResultSet resultSet =
+                    service.fetchResults(sessionHandle, operationHandle, 0, Integer.MAX_VALUE);
+            return Tag.OK.addTag(
+                    replaceStreamNodeId(
+                                    replaceNodeIdInOperator(
+                                            resultSet.getData().get(0).getString(0).toString()))
+                            + "\n");
+        } else {
+            ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+            // Fetched for its schema; the rows themselves are read through RowDataIterator below.
+            ResultSet resultSet =
+                    service.fetchResults(sessionHandle, operationHandle, 0, Integer.MAX_VALUE);
+
+            boolean isStreaming =
+                    Configuration.fromMap(service.getSessionConfig(sessionHandle))
+                            .get(RUNTIME_MODE)
+                            .equals(STREAMING);
+            boolean isQuery = startsWithIgnoreCase(statement, "SELECT");
+
+            PrintStyle style =
+                    PrintStyle.tableauWithDataInferredColumnWidths(
+                            resultSet.getResultSchema(),
+                            new RowDataToStringConverterImpl(
+                                    resultSet.getResultSchema().toPhysicalRowDataType(),
+                                    DateTimeUtils.UTC_ZONE.toZoneId(),
+                                    SqlGatewayServiceStatementITCase.class.getClassLoader(),
+                                    false),
+                            Integer.MAX_VALUE,
+                            true,
+                            isStreaming && isQuery);
+
+            PrintWriter writer = new PrintWriter(outContent);
+            style.print(new RowDataIterator(sessionHandle, operationHandle), writer);
+            return Tag.OK.addTag(outContent.toString());
+        }
+    }
+
+    /**
+     * Checks whether {@code text} starts with {@code prefix}, ignoring case. Unlike comparing
+     * against {@code text.toUpperCase()}, the outcome does not depend on the JVM default locale.
+     */
+    private static boolean startsWithIgnoreCase(String text, String prefix) {
+        return text.regionMatches(true, 0, prefix, 0, prefix.length());
+    }
+
+    /**
+     * Iterator over the rows of an operation. Rows are fetched lazily from the {@code service},
+     * batch by batch, following the next-token returned with each fetched {@link ResultSet}.
+     */
+    private static class RowDataIterator implements Iterator<RowData> {
+
+        private final SessionHandle sessionHandle;
+        private final OperationHandle operationHandle;
+
+        // Token of the next batch to request; null once a fetch returned no next-token.
+        private Long token = 0L;
+        // Rows of the most recently fetched batch that have not been handed out yet.
+        private Iterator<RowData> fetchedRows = Collections.emptyIterator();
+
+        public RowDataIterator(SessionHandle sessionHandle, OperationHandle operationHandle) {
+            this.sessionHandle = sessionHandle;
+            this.operationHandle = operationHandle;
+        }
+
+        @Override
+        public boolean hasNext() {
+            // Keep fetching while the current batch is drained and another batch is announced.
+            while (token != null && !fetchedRows.hasNext()) {
+                ResultSet resultSet =
+                        service.fetchResults(
+                                sessionHandle, operationHandle, token, Integer.MAX_VALUE);
+                token = resultSet.getNextToken();
+                fetchedRows = resultSet.getData().iterator();
+            }
+
+            // Answer from the buffered rows rather than from the token, so that rows delivered
+            // together with a null next-token are not dropped.
+            return fetchedRows.hasNext();
+        }
+
+        @Override
+        public RowData next() {
+            // Load a batch if necessary so that next() also works without a preceding hasNext();
+            // an exhausted iterator still fails with the NoSuchElementException of fetchedRows.
+            hasNext();
+            return fetchedRows.next();
+        }
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
new file mode 100644
index 00000000000..6d78e3474d4
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.context;
+
+import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
+import org.apache.flink.table.gateway.service.utils.ThreadUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.flink.configuration.PipelineOptions.MAX_PARALLELISM;
+import static org.apache.flink.configuration.PipelineOptions.NAME;
+import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE;
+import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test {@link SessionContext}. */
+class SessionContextTest {
+
+    // NOTE(review): 60_0000 is 600000; the digit grouping suggests 60_000 may have been meant.
+    // Confirm the intended value against ThreadUtils#newThreadPool.
+    private static final ExecutorService EXECUTOR_SERVICE =
+            ThreadUtils.newThreadPool(5, 500, 60_0000, "session-context-test");
+    private SessionContext sessionContext;
+
+    /** Creates a fresh {@link SessionContext} before each test. */
+    @BeforeEach
+    public void setup() {
+        sessionContext = createSessionContext();
+    }
+
+    /** Shuts down the executor service shared by all tests of this class. */
+    @AfterAll
+    public static void cleanUp() {
+        EXECUTOR_SERVICE.shutdown();
+    }
+
+    /**
+     * Sets four options on the session context, checks that they are visible in the config map,
+     * then calls {@code reset()} and checks the values the options have afterwards.
+     */
+    @Test
+    public void testSetAndResetOption() {
+        // table config option
+        sessionContext.set(TABLE_SQL_DIALECT.key(), "hive");
+        // runtime config option
+        sessionContext.set(MAX_PARALLELISM.key(), "128");
+        // runtime config option and doesn't have default value
+        sessionContext.set(NAME.key(), "test");
+        // runtime config from flink-conf
+        sessionContext.set(OBJECT_REUSE.key(), "false");
+        assertThat(getConfiguration().get(TABLE_SQL_DIALECT)).isEqualTo("hive");
+        assertThat(getConfiguration().get(MAX_PARALLELISM)).isEqualTo(128);
+        assertThat(getConfiguration().get(NAME)).isEqualTo("test");
+        assertThat(getConfiguration().get(OBJECT_REUSE)).isFalse();
+
+        sessionContext.reset();
+        assertThat(getConfiguration().get(TABLE_SQL_DIALECT)).isEqualTo("default");
+        assertThat(getConfiguration().get(NAME)).isNull();
+        // MAX_PARALLELISM is set to 16 in the Flink configuration built by createSessionContext()
+        assertThat(getConfiguration().get(MAX_PARALLELISM)).isEqualTo(16);
+        assertThat(getConfiguration().getOptional(NAME)).isEmpty();
+        // The value of OBJECT_REUSE in origin configuration is true
+        assertThat(getConfiguration().get(OBJECT_REUSE)).isTrue();
+    }
+
+    /**
+     * Sets four options on the session context and then resets them one key at a time, checking
+     * after each {@code reset(key)} the value the option has afterwards.
+     */
+    @Test
+    public void testSetAndResetKeyInConfigOptions() {
+        // table config option
+        sessionContext.set(TABLE_SQL_DIALECT.key(), "hive");
+        // runtime config option
+        sessionContext.set(MAX_PARALLELISM.key(), "128");
+        // runtime config option and doesn't have default value
+        sessionContext.set(NAME.key(), "test");
+        // runtime config from flink-conf
+        sessionContext.set(OBJECT_REUSE.key(), "false");
+
+        assertThat(getConfiguration().get(TABLE_SQL_DIALECT)).isEqualTo("hive");
+        assertThat(getConfiguration().get(MAX_PARALLELISM)).isEqualTo(128);
+        assertThat(getConfiguration().get(NAME)).isEqualTo("test");
+        assertThat(getConfiguration().get(OBJECT_REUSE)).isFalse();
+
+        sessionContext.reset(TABLE_SQL_DIALECT.key());
+        assertThat(getConfiguration().get(TABLE_SQL_DIALECT)).isEqualTo("default");
+
+        // 16 is the value put into the Flink configuration by createSessionContext()
+        sessionContext.reset(MAX_PARALLELISM.key());
+        assertThat(getConfiguration().get(MAX_PARALLELISM)).isEqualTo(16);
+
+        sessionContext.reset(NAME.key());
+        assertThat(getConfiguration().get(NAME)).isNull();
+
+        // true is the value put into the Flink configuration by createSessionContext()
+        sessionContext.reset(OBJECT_REUSE.key());
+        assertThat(getConfiguration().get(OBJECT_REUSE)).isTrue();
+    }
+
+    /**
+     * Sets two keys that are not config options and checks that resetting one key removes only
+     * that key from the config map.
+     */
+    @Test
+    public void testSetAndResetArbitraryKey() {
+        // other property not in flink-conf
+        sessionContext.set("aa", "11");
+        sessionContext.set("bb", "22");
+
+        assertThat(sessionContext.getConfigMap().get("aa")).isEqualTo("11");
+        assertThat(sessionContext.getConfigMap().get("bb")).isEqualTo("22");
+
+        sessionContext.reset("aa");
+        assertThat(sessionContext.getConfigMap().get("aa")).isNull();
+        assertThat(sessionContext.getConfigMap().get("bb")).isEqualTo("22");
+
+        sessionContext.reset("bb");
+        assertThat(sessionContext.getConfigMap().get("bb")).isNull();
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Builds the session context under test from a Flink configuration with {@code OBJECT_REUSE}
+     * set to true and {@code MAX_PARALLELISM} set to 16. The same configuration object is used for
+     * the {@link DefaultContext} and as the configuration argument of the session.
+     */
+    private SessionContext createSessionContext() {
+        Configuration flinkConfig = new Configuration();
+        flinkConfig.set(OBJECT_REUSE, true);
+        flinkConfig.set(MAX_PARALLELISM, 16);
+        DefaultContext defaultContext =
+                new DefaultContext(flinkConfig, Collections.singletonList(new DefaultCLI()));
+        return SessionContext.create(
+                defaultContext,
+                SessionHandle.create(),
+                MockedEndpointVersion.V1,
+                flinkConfig,
+                EXECUTOR_SERVICE);
+    }
+
+    /** Returns the current config map of the session context wrapped as a typed configuration. */
+    private ReadableConfig getConfiguration() {
+        return Configuration.fromMap(sessionContext.getConfigMap());
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
index dd079ced141..59a128c5cca 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
@@ -172,35 +172,18 @@ public class ResultFetcherTest extends TestLogger {
 
     @Test
     public void testFetchResultInParallel() throws Exception {
-        int bufferSize = data.size() / 2;
         ResultFetcher fetcher =
-                buildResultFetcher(Collections.singletonList(data.iterator()), bufferSize);
-
-        AtomicReference<Boolean> isEqual = new AtomicReference<>(true);
-        int fetchThreadNum = 100;
-        CountDownLatch latch = new CountDownLatch(fetchThreadNum);
-
+                buildResultFetcher(Collections.singletonList(data.iterator()), data.size() / 2);
         CommonTestUtils.waitUtil(
                 () -> fetcher.getResultStore().getBufferedRecordSize() > 0,
                 Duration.ofSeconds(10),
                 "Failed to wait the buffer has data.");
-        List<RowData> firstFetch = fetcher.fetchResults(0, Integer.MAX_VALUE).getData();
-        for (int i = 0; i < fetchThreadNum; i++) {
-            threadFactory
-                    .newThread(
-                            () -> {
-                                ResultSet resultSet = fetcher.fetchResults(0, Integer.MAX_VALUE);
-
-                                if (!firstFetch.equals(resultSet.getData())) {
-                                    isEqual.set(false);
-                                }
-                                latch.countDown();
-                            })
-                    .start();
-        }
+        checkFetchResultInParallel(fetcher);
+    }
 
-        latch.await();
-        assertEquals(true, isEqual.get());
+    @Test
+    public void testFetchResultFromDummyStoreInParallel() throws Exception {
+        checkFetchResultInParallel(new ResultFetcher(OperationHandle.create(), schema, data));
     }
 
     @Test
@@ -221,7 +204,8 @@ public class ResultFetcherTest extends TestLogger {
 
         long testToken = token;
         AtomicReference<Boolean> meetEnd = new AtomicReference<>(false);
-        new Thread(
+        threadFactory
+                .newThread(
                         () -> {
                             // Should meet EOS in the end.
                             long nextToken = testToken;
@@ -349,6 +333,30 @@ public class ResultFetcherTest extends TestLogger {
         assertEquals(data, fetchedRows);
     }
 
+    /** Fetches from 100 threads at once and asserts each result equals an initial fetch. */
+    private void checkFetchResultInParallel(ResultFetcher fetcher) throws Exception {
+        AtomicReference<Boolean> isEqual = new AtomicReference<>(true);
+        int fetchThreadNum = 100;
+        CountDownLatch latch = new CountDownLatch(fetchThreadNum);
+        List<RowData> firstFetch = fetcher.fetchResults(0, Integer.MAX_VALUE).getData();
+        Runnable fetchAndCompare =
+                () -> {
+                    boolean same = false; // stays false if fetchResults throws
+                    try {
+                        ResultSet resultSet = fetcher.fetchResults(0, Integer.MAX_VALUE);
+                        same = firstFetch.equals(resultSet.getData());
+                    } finally {
+                        isEqual.accumulateAndGet(same, Boolean::logicalAnd);
+                        latch.countDown(); // always count down so latch.await() cannot hang
+                    }
+                };
+        for (int i = 0; i < fetchThreadNum; i++) {
+            threadFactory.newThread(fetchAndCompare).start();
+        }
+        latch.await();
+        assertEquals(true, isEqual.get());
+    }
+
     // --------------------------------------------------------------------------------------------
 
     private static class ErrorIterator implements Iterator<RowData> {
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java
index 0e5fbcec5ba..db214e0ca6b 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.table.gateway.service.utils;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
 import org.apache.flink.table.gateway.service.context.DefaultContext;
 import org.apache.flink.table.gateway.service.session.SessionManager;
 
+import org.apache.commons.io.FileUtils;
 import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.BeforeAllCallback;
 import org.junit.jupiter.api.extension.Extension;
@@ -32,8 +34,10 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Supplier;
 
 import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
 
@@ -43,6 +47,15 @@ public class SqlGatewayServiceExtension implements BeforeAllCallback, AfterAllCa
     private SqlGatewayService service;
     private SessionManager sessionManager;
     private TemporaryFolder temporaryFolder;
+    private final Supplier<Configuration> configSupplier;
+
+    public SqlGatewayServiceExtension() {
+        this(Configuration::new); // default: supply a freshly constructed Configuration
+    }
+
+    public SqlGatewayServiceExtension(Supplier<Configuration> configSupplier) {
+        this.configSupplier = configSupplier; // queried when the test flink-conf.yaml is written
+    }
 
     @Override
     public void beforeAll(ExtensionContext context) throws Exception {
@@ -57,6 +70,11 @@ public class SqlGatewayServiceExtension implements BeforeAllCallback, AfterAllCa
                 throw new IOException("Can't create testing flink-conf.yaml file.");
             }
 
+            FileUtils.write(
+                    confYaml,
+                    getFlinkConfContent(configSupplier.get().toMap()),
+                    StandardCharsets.UTF_8);
+
             // adjust the test environment for the purposes of this test
             Map<String, String> map = new HashMap<>(System.getenv());
             map.put(ENV_FLINK_CONF_DIR, confFolder.getAbsolutePath());
@@ -86,4 +104,10 @@ public class SqlGatewayServiceExtension implements BeforeAllCallback, AfterAllCa
     public SessionManager getSessionManager() {
         return sessionManager;
     }
+
+    private String getFlinkConfContent(Map<String, String> flinkConf) {
+        StringBuilder sb = new StringBuilder(); // collects one "key: value" line per entry
+        flinkConf.forEach((k, v) -> sb.append(k).append(": ").append(v).append("\n"));
+        return sb.toString(); // keys and values are written verbatim, without any escaping
+    }
 }
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlScriptReader.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlScriptReader.java
new file mode 100644
index 00000000000..81a3adb55f3
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlScriptReader.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.utils;
+
+import org.apache.flink.table.gateway.service.SqlGatewayServiceStatementITCase;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A utility to read and parse the content of a SQL script. SQL scripts are located under the
+ * "resources/sql" path and follow the "xx.q" file name pattern. They are executed and tested by
+ * {@link SqlGatewayServiceStatementITCase}.
+ */
+public final class SqlScriptReader implements AutoCloseable {
+    public static final String HINT_START_OF_OUTPUT = "!output"; // ends the SQL of a statement
+    private final BufferedReader reader;
+    private String currentLine; // line most recently read; null once the input is exhausted
+
+    public static List<TestSqlStatement> parseSqlScript(String in) {
+        try (SqlScriptReader sqlReader = new SqlScriptReader(in)) {
+            return sqlReader.parseSqlScript();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private SqlScriptReader(String input) {
+        this.reader = new BufferedReader(new StringReader(input));
+    }
+
+    private List<TestSqlStatement> parseSqlScript() throws IOException {
+        List<TestSqlStatement> specs = new ArrayList<>();
+        TestSqlStatement spec;
+        while ((spec = readNext()) != null) {
+            specs.add(spec);
+        }
+        return specs;
+    }
+
+    private void readLine() throws IOException {
+        this.currentLine = reader.readLine();
+    }
+
+    private @Nullable TestSqlStatement readNext() throws IOException {
+        StringBuilder commentLines = new StringBuilder();
+        StringBuilder sqlLines = new StringBuilder();
+        ReadingStatus status = ReadingStatus.BEGINNING;
+        readLine();
+        while (currentLine != null) {
+            switch (status) {
+                case BEGINNING:
+                    if (currentLine.startsWith("#") || currentLine.trim().length() == 0) {
+                        commentLines.append(currentLine).append("\n");
+                        // still in the leading comment/blank lines: keep them and move on
+                        readLine();
+                    } else {
+                        // the current line is neither a comment nor blank: switch state without
+                        // reading ahead so that this line becomes the first line of the SQL
+                        status = ReadingStatus.SQL_STATEMENT;
+                    }
+                    break;
+
+                case SQL_STATEMENT:
+                    if (currentLine.trim().equals(HINT_START_OF_OUTPUT)) {
+                        // SQL statement is finished, the expected result content follows
+                        status = ReadingStatus.RESULT_CONTENT;
+                    } else {
+                        sqlLines.append(currentLine).append("\n");
+                    }
+                    // advance past the line that was just handled
+                    readLine();
+                    break;
+
+                case RESULT_CONTENT:
+                    if (!currentLine.startsWith("!")) {
+                        // skip expected-output lines until a result flag ("!...") is reached
+                        readLine();
+                    } else {
+                        // result flag reached: leave it as the current line and finish
+                        status = ReadingStatus.FINISH;
+                    }
+                    break;
+
+                case FINISH:
+                    return new TestSqlStatement(commentLines.toString(), sqlLines.toString());
+            }
+        }
+        return null; // input exhausted; a statement that was only partially read is discarded
+    }
+
+    @Override
+    public void close() throws Exception {
+        reader.close();
+    }
+
+    private enum ReadingStatus {
+        BEGINNING, // collecting "#" comment lines and blank lines in front of a statement
+        SQL_STATEMENT, // collecting SQL lines until the "!output" hint
+        RESULT_CONTENT, // skipping result lines until a line that starts with "!"
+        FINISH // a complete statement has been read and is returned
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/TestSqlStatement.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/TestSqlStatement.java
new file mode 100644
index 00000000000..9f3535b80a5
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/TestSqlStatement.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.utils;
+
+/**
+ * A structure that describes a SQL statement for testing.
+ *
+ * @see SqlScriptReader
+ */
+public class TestSqlStatement {
+    private final String comment; // text that SqlScriptReader collected before the statement
+    private final String sql; // text of the SQL statement itself
+
+    public TestSqlStatement(String comment, String sql) {
+        this.comment = comment;
+        this.sql = sql;
+    }
+
+    public String getComment() {
+        return comment;
+    }
+
+    public String getSql() {
+        return sql;
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q b/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
new file mode 100644
index 00000000000..2348b1f57a0
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
@@ -0,0 +1,182 @@
+# insert.q - insert
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+SET 'table.dml-sync' = 'true';
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# ==========================================================================
+# test streaming insert
+# ==========================================================================
+
+SET 'execution.runtime-mode' = 'streaming';
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+create table StreamingTable (
+  id int,
+  str string
+) with (
+  'connector' = 'filesystem',
+  'path' = '$VAR_STREAMING_PATH',
+  'format' = 'csv'
+);
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# test only to verify the test job id.
+SET '$internal.pipeline.job-id' = 'e68e7fabddfade4f42910980652582dc';
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
+!output
++----------------------------------+
+|                           job id |
++----------------------------------+
+| e68e7fabddfade4f42910980652582dc |
++----------------------------------+
+1 row in set
+!ok
+
+RESET '$internal.pipeline.job-id';
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+SELECT * FROM StreamingTable;
+!output
++----+----+-------------+
+| op | id |         str |
++----+----+-------------+
+| +I |  1 | Hello World |
+| +I |  2 |          Hi |
+| +I |  2 |          Hi |
+| +I |  3 |       Hello |
+| +I |  3 |       World |
+| +I |  4 |         ADD |
+| +I |  5 |        LINE |
++----+----+-------------+
+7 rows in set
+!ok
+
+# ==========================================================================
+# test batch insert
+# ==========================================================================
+
+SET 'execution.runtime-mode' = 'batch';
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+create table BatchTable (
+  id int,
+  str string
+) with (
+  'connector' = 'filesystem',
+  'path' = '$VAR_BATCH_PATH',
+  'format' = 'csv'
+);
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# test only to verify the test job id.
+SET '$internal.pipeline.job-id' = '29ba2263b9b86bd8a14b91487941bfe7';
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+INSERT INTO BatchTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
+!output
++----------------------------------+
+|                           job id |
++----------------------------------+
+| 29ba2263b9b86bd8a14b91487941bfe7 |
++----------------------------------+
+1 row in set
+!ok
+
+RESET '$internal.pipeline.job-id';
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+SELECT * FROM BatchTable;
+!output
++----+-------------+
+| id |         str |
++----+-------------+
+|  1 | Hello World |
+|  2 |          Hi |
+|  2 |          Hi |
+|  3 |       Hello |
+|  3 |       World |
+|  4 |         ADD |
+|  5 |        LINE |
++----+-------------+
+7 rows in set
+!ok
diff --git a/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q b/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
similarity index 73%
copy from flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
copy to flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
index 3f5d19f043b..9f95973c831 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
@@ -1,19 +1,3 @@
-# table.q - EXECUTE/EXPLAIN STATEMENT SET BEGIN ... END
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to you under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
 # statement-set.q - BEGIN STATEMENT SET, END
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
@@ -31,15 +15,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-SET 'sql-client.execution.result-mode' = 'tableau';
-!output
-[INFO] Session property has been set.
-!info
-
 SET 'table.dml-sync' = 'true';
 !output
-[INFO] Session property has been set.
-!info
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
 
 create table src (
   id int,
@@ -48,8 +32,13 @@ create table src (
   'connector' = 'values'
 );
 !output
-[INFO] Execute statement succeed.
-!info
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
 
 # ==========================================================================
 # test statement set with streaming insert
@@ -57,8 +46,13 @@ create table src (
 
 SET 'execution.runtime-mode' = 'streaming';
 !output
-[INFO] Session property has been set.
-!info
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
 
 create table StreamingTable (
   id int,
@@ -69,8 +63,14 @@ create table StreamingTable (
   'format' = 'csv'
 );
 !output
-[INFO] Execute statement succeed.
-!info
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
 
 create table StreamingTable2 (
   id int,
@@ -81,8 +81,13 @@ create table StreamingTable2 (
   'format' = 'csv'
 );
 !output
-[INFO] Execute statement succeed.
-!info
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
 
 EXPLAIN STATEMENT SET BEGIN
 INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
@@ -113,7 +118,17 @@ Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EX
 
 Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1])
 +- Reused(reference_id=[1])
+!ok
 
+# test only to verify the test job id.
+SET '$internal.pipeline.job-id' = 'a5513ca0a886c6c9bafaf3acac43bfa5';
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
 !ok
 
 EXECUTE STATEMENT SET BEGIN
@@ -121,63 +136,72 @@ INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'),
 INSERT INTO StreamingTable2 SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
 END;
 !output
-[INFO] Submitting SQL update statement to the cluster...
-[INFO] Execute statement in sync mode. Please wait for the execution finish...
-[INFO] Complete execution of the SQL update statement.
-!info
++----------------------------------+
+|                           job id |
++----------------------------------+
+| a5513ca0a886c6c9bafaf3acac43bfa5 |
++----------------------------------+
+1 row in set
+!ok
+
+RESET '$internal.pipeline.job-id';
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
 
 SELECT * FROM StreamingTable;
 !output
-+----+-------------+--------------------------------+
-| op |          id |                            str |
-+----+-------------+--------------------------------+
-| +I |           1 |                    Hello World |
-| +I |           2 |                             Hi |
-| +I |           2 |                             Hi |
-| +I |           3 |                          Hello |
-| +I |           3 |                          World |
-| +I |           4 |                            ADD |
-| +I |           5 |                           LINE |
-+----+-------------+--------------------------------+
-Received a total of 7 rows
++----+----+-------------+
+| op | id |         str |
++----+----+-------------+
+| +I |  1 | Hello World |
+| +I |  2 |          Hi |
+| +I |  2 |          Hi |
+| +I |  3 |       Hello |
+| +I |  3 |       World |
+| +I |  4 |         ADD |
+| +I |  5 |        LINE |
++----+----+-------------+
+7 rows in set
 !ok
 
 SELECT * FROM StreamingTable2;
 !output
-+----+-------------+--------------------------------+
-| op |          id |                            str |
-+----+-------------+--------------------------------+
-| +I |           1 |                    Hello World |
-| +I |           2 |                             Hi |
-| +I |           2 |                             Hi |
-| +I |           3 |                          Hello |
-| +I |           3 |                          World |
-| +I |           4 |                            ADD |
-| +I |           5 |                           LINE |
-+----+-------------+--------------------------------+
-Received a total of 7 rows
++----+----+-------------+
+| op | id |         str |
++----+----+-------------+
+| +I |  1 | Hello World |
+| +I |  2 |          Hi |
+| +I |  2 |          Hi |
+| +I |  3 |       Hello |
+| +I |  3 |       World |
+| +I |  4 |         ADD |
+| +I |  5 |        LINE |
++----+----+-------------+
+7 rows in set
 !ok
 
 EXPLAIN STATEMENT SET BEGIN
 END;
 !output
-[ERROR] Could not execute SQL statement. Reason:
 org.apache.flink.sql.parser.impl.ParseException: Encountered "END" at line 2, column 1.
 Was expecting one of:
     "INSERT" ...
     "UPSERT" ...
-
 !error
 
 EXECUTE STATEMENT SET BEGIN
 END;
 !output
-[ERROR] Could not execute SQL statement. Reason:
 org.apache.flink.sql.parser.impl.ParseException: Encountered "END" at line 2, column 1.
 Was expecting one of:
     "INSERT" ...
     "UPSERT" ...
-
 !error
 
 # ==========================================================================
@@ -186,8 +210,13 @@ Was expecting one of:
 
 SET 'execution.runtime-mode' = 'batch';
 !output
-[INFO] Session property has been set.
-!info
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
 
 create table BatchTable (
 id int,
@@ -198,8 +227,14 @@ str string
 'format' = 'csv'
 );
 !output
-[INFO] Execute statement succeed.
-!info
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
 
 create table BatchTable2 (
 id int,
@@ -210,8 +245,13 @@ str string
 'format' = 'csv'
 );
 !output
-[INFO] Execute statement succeed.
-!info
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
 
 EXPLAIN STATEMENT SET
 BEGIN
@@ -243,7 +283,17 @@ Sink(table=[default_catalog.default_database.BatchTable], fields=[EXPR$0, EXPR$1
 
 Sink(table=[default_catalog.default_database.BatchTable2], fields=[EXPR$0, EXPR$1])
 +- Reused(reference_id=[1])
+!ok
 
+# test only to verify the test job id.
+SET '$internal.pipeline.job-id' = '2e2dc0a5a6315296062ba81eba340668';
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
 !ok
 
 EXECUTE STATEMENT SET
@@ -252,10 +302,23 @@ INSERT INTO BatchTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2,
 INSERT INTO BatchTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
 END;
 !output
-[INFO] Submitting SQL update statement to the cluster...
-[INFO] Execute statement in sync mode. Please wait for the execution finish...
-[INFO] Complete execution of the SQL update statement.
-!info
++----------------------------------+
+|                           job id |
++----------------------------------+
+| 2e2dc0a5a6315296062ba81eba340668 |
++----------------------------------+
+1 row in set
+!ok
+
+RESET '$internal.pipeline.job-id';
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
 
 SELECT * FROM BatchTable;
 !output
@@ -288,21 +351,17 @@ Empty set
 EXPLAIN STATEMENT SET BEGIN
 END;
 !output
-[ERROR] Could not execute SQL statement. Reason:
 org.apache.flink.sql.parser.impl.ParseException: Encountered "END" at line 2, column 1.
 Was expecting one of:
     "INSERT" ...
     "UPSERT" ...
-
 !error
 
 EXECUTE STATEMENT SET BEGIN
 END;
 !output
-[ERROR] Could not execute SQL statement. Reason:
 org.apache.flink.sql.parser.impl.ParseException: Encountered "END" at line 2, column 1.
 Was expecting one of:
     "INSERT" ...
     "UPSERT" ...
-
 !error
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/table.q b/flink-table/flink-sql-gateway/src/test/resources/sql/table.q
new file mode 100644
index 00000000000..821f679fa36
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/table.q
@@ -0,0 +1,988 @@
+# table.q - CREATE/DROP/SHOW/ALTER/DESCRIBE TABLE
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# ==========================================================================
+# validation test
+# ==========================================================================
+
+create table tbl(a int, b as invalid_function());
+!output
+org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature invalid_function()
+!error
+
+drop table non_exist;
+!output
+org.apache.flink.table.api.ValidationException: Table with identifier 'default_catalog.default_database.non_exist' does not exist.
+!error
+
+describe non_exist;
+!output
+org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.non_exist' doesn't exist
+!error
+
+desc non_exist;
+!output
+org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.non_exist' doesn't exist
+!error
+
+alter table non_exist rename to non_exist2;
+!output
+org.apache.flink.table.api.ValidationException: Table `default_catalog`.`default_database`.`non_exist` doesn't exist or is a temporary table.
+!error
+
+# ==========================================================================
+# test create table
+# ==========================================================================
+
+# test create a table with computed column, primary key, watermark
+CREATE TABLE IF NOT EXISTS orders (
+ `user` BIGINT NOT NULl,
+ product VARCHAR(32),
+ amount INT,
+ ts TIMESTAMP(3),
+ ptime AS PROCTIME(),
+ PRIMARY KEY(`user`) NOT ENFORCED,
+ WATERMARK FOR ts AS ts - INTERVAL '1' SECONDS
+) with (
+ 'connector' = 'datagen'
+);
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# test SHOW TABLES
+show tables;
+!output
++------------+
+| table name |
++------------+
+|     orders |
++------------+
+1 row in set
+!ok
+
+# test SHOW CREATE TABLE
+show create table orders;
+!output
+CREATE TABLE `default_catalog`.`default_database`.`orders` (
+  `user` BIGINT NOT NULL,
+  `product` VARCHAR(32),
+  `amount` INT,
+  `ts` TIMESTAMP(3),
+  `ptime` AS PROCTIME(),
+  WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
+  CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED
+) WITH (
+  'connector' = 'datagen'
+)
+!ok
+
+# test SHOW COLUMNS
+show columns from orders;
+!output
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+|    name |                        type |  null |       key |        extras |                  watermark |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+|    user |                      BIGINT | FALSE | PRI(user) |               |                            |
+| product |                 VARCHAR(32) |  TRUE |           |               |                            |
+|  amount |                         INT |  TRUE |           |               |                            |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |           |               | `ts` - INTERVAL '1' SECOND |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |           | AS PROCTIME() |                            |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+5 rows in set
+!ok
+
+show columns in orders;
+!output
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+|    name |                        type |  null |       key |        extras |                  watermark |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+|    user |                      BIGINT | FALSE | PRI(user) |               |                            |
+| product |                 VARCHAR(32) |  TRUE |           |               |                            |
+|  amount |                         INT |  TRUE |           |               |                            |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |           |               | `ts` - INTERVAL '1' SECOND |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |           | AS PROCTIME() |                            |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+5 rows in set
+!ok
+
+show columns from orders like '%u';
+!output
+Empty set
+!ok
+
+show columns in orders like '%u';
+!output
+Empty set
+!ok
+
+show columns from orders not like '%u';
+!output
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+|    name |                        type |  null |       key |        extras |                  watermark |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+|    user |                      BIGINT | FALSE | PRI(user) |               |                            |
+| product |                 VARCHAR(32) |  TRUE |           |               |                            |
+|  amount |                         INT |  TRUE |           |               |                            |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |           |               | `ts` - INTERVAL '1' SECOND |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |           | AS PROCTIME() |                            |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+5 rows in set
+!ok
+
+show columns in orders not like '%u';
+!output
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+|    name |                        type |  null |       key |        extras |                  watermark |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+|    user |                      BIGINT | FALSE | PRI(user) |               |                            |
+| product |                 VARCHAR(32) |  TRUE |           |               |                            |
+|  amount |                         INT |  TRUE |           |               |                            |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |           |               | `ts` - INTERVAL '1' SECOND |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |           | AS PROCTIME() |                            |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+5 rows in set
+!ok
+
+show columns from orders like '%r';
+!output
++------+--------+-------+-----------+--------+-----------+
+| name |   type |  null |       key | extras | watermark |
++------+--------+-------+-----------+--------+-----------+
+| user | BIGINT | FALSE | PRI(user) |        |           |
++------+--------+-------+-----------+--------+-----------+
+1 row in set
+!ok
+
+show columns in orders like '%r';
+!output
++------+--------+-------+-----------+--------+-----------+
+| name |   type |  null |       key | extras | watermark |
++------+--------+-------+-----------+--------+-----------+
+| user | BIGINT | FALSE | PRI(user) |        |           |
++------+--------+-------+-----------+--------+-----------+
+1 row in set
+!ok
+
+show columns from orders not like  '%r';
+!output
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+|    name |                        type |  null | key |        extras |                  watermark |
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+| product |                 VARCHAR(32) |  TRUE |     |               |                            |
+|  amount |                         INT |  TRUE |     |               |                            |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |     |               | `ts` - INTERVAL '1' SECOND |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |                            |
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+4 rows in set
+!ok
+
+show columns in orders not like  '%r';
+!output
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+|    name |                        type |  null | key |        extras |                  watermark |
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+| product |                 VARCHAR(32) |  TRUE |     |               |                            |
+|  amount |                         INT |  TRUE |     |               |                            |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |     |               | `ts` - INTERVAL '1' SECOND |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |                            |
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+4 rows in set
+!ok
+
+show columns from orders like '%u%';
+!output
++---------+-------------+-------+-----------+--------+-----------+
+|    name |        type |  null |       key | extras | watermark |
++---------+-------------+-------+-----------+--------+-----------+
+|    user |      BIGINT | FALSE | PRI(user) |        |           |
+| product | VARCHAR(32) |  TRUE |           |        |           |
+|  amount |         INT |  TRUE |           |        |           |
++---------+-------------+-------+-----------+--------+-----------+
+3 rows in set
+!ok
+
+show columns in orders like '%u%';
+!output
++---------+-------------+-------+-----------+--------+-----------+
+|    name |        type |  null |       key | extras | watermark |
++---------+-------------+-------+-----------+--------+-----------+
+|    user |      BIGINT | FALSE | PRI(user) |        |           |
+| product | VARCHAR(32) |  TRUE |           |        |           |
+|  amount |         INT |  TRUE |           |        |           |
++---------+-------------+-------+-----------+--------+-----------+
+3 rows in set
+!ok
+
+show columns from orders not like '%u%';
+!output
++-------+-----------------------------+-------+-----+---------------+----------------------------+
+|  name |                        type |  null | key |        extras |                  watermark |
++-------+-----------------------------+-------+-----+---------------+----------------------------+
+|    ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |     |               | `ts` - INTERVAL '1' SECOND |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |                            |
++-------+-----------------------------+-------+-----+---------------+----------------------------+
+2 rows in set
+!ok
+
+show columns in orders not like '%u%';
+!output
++-------+-----------------------------+-------+-----+---------------+----------------------------+
+|  name |                        type |  null | key |        extras |                  watermark |
++-------+-----------------------------+-------+-----+---------------+----------------------------+
+|    ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |     |               | `ts` - INTERVAL '1' SECOND |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |                            |
++-------+-----------------------------+-------+-----+---------------+----------------------------+
+2 rows in set
+!ok
+
+show columns from orders like 'use_';
+!output
++------+--------+-------+-----------+--------+-----------+
+| name |   type |  null |       key | extras | watermark |
++------+--------+-------+-----------+--------+-----------+
+| user | BIGINT | FALSE | PRI(user) |        |           |
++------+--------+-------+-----------+--------+-----------+
+1 row in set
+!ok
+
+show columns in orders like 'use_';
+!output
++------+--------+-------+-----------+--------+-----------+
+| name |   type |  null |       key | extras | watermark |
++------+--------+-------+-----------+--------+-----------+
+| user | BIGINT | FALSE | PRI(user) |        |           |
++------+--------+-------+-----------+--------+-----------+
+1 row in set
+!ok
+
+show columns from orders not like 'use_';
+!output
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+|    name |                        type |  null | key |        extras |                  watermark |
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+| product |                 VARCHAR(32) |  TRUE |     |               |                            |
+|  amount |                         INT |  TRUE |     |               |                            |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |     |               | `ts` - INTERVAL '1' SECOND |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |                            |
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+4 rows in set
+!ok
+
+show columns in orders not like 'use_';
+!output
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+|    name |                        type |  null | key |        extras |                  watermark |
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+| product |                 VARCHAR(32) |  TRUE |     |               |                            |
+|  amount |                         INT |  TRUE |     |               |                            |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |     |               | `ts` - INTERVAL '1' SECOND |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |                            |
++---------+-----------------------------+-------+-----+---------------+----------------------------+
+4 rows in set
+!ok
+
+# ==========================================================================
+# test alter table rename
+# ==========================================================================
+
+alter table orders rename to orders2;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# ==========================================================================
+# test alter table set
+# ==========================================================================
+
+# test alter table properties
+alter table orders2 set ('connector' = 'kafka', 'scan.startup.mode' = 'earliest-offset');
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# verify table options using SHOW CREATE TABLE
+show create table orders2;
+!output
+CREATE TABLE `default_catalog`.`default_database`.`orders2` (
+  `user` BIGINT NOT NULL,
+  `product` VARCHAR(32),
+  `amount` INT,
+  `ts` TIMESTAMP(3),
+  `ptime` AS PROCTIME(),
+  WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
+  CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED
+) WITH (
+  'connector' = 'kafka',
+  'scan.startup.mode' = 'earliest-offset'
+)
+!ok
+
+# change connector to 'datagen' without removing 'scan.startup.mode', leaving an invalid option to be fixed later
+alter table orders2 set ('connector' = 'datagen');
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# verify table options are problematic
+show create table orders2;
+!output
+CREATE TABLE `default_catalog`.`default_database`.`orders2` (
+  `user` BIGINT NOT NULL,
+  `product` VARCHAR(32),
+  `amount` INT,
+  `ts` TIMESTAMP(3),
+  `ptime` AS PROCTIME(),
+  WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
+  CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED
+) WITH (
+  'connector' = 'datagen',
+  'scan.startup.mode' = 'earliest-offset'
+)
+!ok
+
+# test SHOW CREATE VIEW for tables
+show create view orders2;
+!output
+org.apache.flink.table.api.TableException: SHOW CREATE VIEW is only supported for views, but `default_catalog`.`default_database`.`orders2` is a table. Please use SHOW CREATE TABLE instead.
+!error
+
+# test explain plan to verify the table source cannot be created
+explain plan for select * from orders2;
+!output
+org.apache.flink.table.api.ValidationException: Unsupported options found for 'datagen'.
+
+Unsupported options:
+
+scan.startup.mode
+
+Supported options:
+
+connector
+fields.amount.kind
+fields.amount.max
+fields.amount.min
+fields.product.kind
+fields.product.length
+fields.ts.kind
+fields.ts.max-past
+fields.user.kind
+fields.user.max
+fields.user.min
+number-of-rows
+rows-per-second
+!error
+
+# ==========================================================================
+# test alter table reset
+# ==========================================================================
+
+# test alter table reset to remove invalid key
+alter table orders2 reset ('scan.startup.mode');
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# verify table options using SHOW CREATE TABLE
+show create table orders2;
+!output
+CREATE TABLE `default_catalog`.`default_database`.`orders2` (
+  `user` BIGINT NOT NULL,
+  `product` VARCHAR(32),
+  `amount` INT,
+  `ts` TIMESTAMP(3),
+  `ptime` AS PROCTIME(),
+  WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
+  CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED
+) WITH (
+  'connector' = 'datagen'
+)
+!ok
+
+# test alter table reset empty key
+alter table orders2 reset ();
+!output
+org.apache.flink.table.api.ValidationException: ALTER TABLE RESET does not support empty key
+!error
+
+# ==========================================================================
+# test describe table
+# ==========================================================================
+
+describe orders2;
+!output
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+|    name |                        type |  null |       key |        extras |                  watermark |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+|    user |                      BIGINT | FALSE | PRI(user) |               |                            |
+| product |                 VARCHAR(32) |  TRUE |           |               |                            |
+|  amount |                         INT |  TRUE |           |               |                            |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |           |               | `ts` - INTERVAL '1' SECOND |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |           | AS PROCTIME() |                            |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+5 rows in set
+!ok
+
+# test desc table
+desc orders2;
+!output
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+|    name |                        type |  null |       key |        extras |                  watermark |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+|    user |                      BIGINT | FALSE | PRI(user) |               |                            |
+| product |                 VARCHAR(32) |  TRUE |           |               |                            |
+|  amount |                         INT |  TRUE |           |               |                            |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |           |               | `ts` - INTERVAL '1' SECOND |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |           | AS PROCTIME() |                            |
++---------+-----------------------------+-------+-----------+---------------+----------------------------+
+5 rows in set
+!ok
+
+# ==========================================================================
+# test drop table
+# ==========================================================================
+
+drop table orders2;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# verify table is dropped
+show tables;
+!output
+Empty set
+!ok
+
+# ==========================================================================
+# test temporary table
+# ==========================================================================
+
+create temporary table tbl1 (
+ `user` BIGINT NOT NULl,
+ product VARCHAR(32),
+ amount INT
+) with (
+ 'connector' = 'datagen'
+);
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# TODO: warn users that the table already exists
+create temporary table if not exists tbl1 (
+   `user` BIGINT NOT NULl,
+   product VARCHAR(32),
+   amount INT
+) with (
+ 'connector' = 'datagen'
+);
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# list permanent and temporary tables together
+show tables;
+!output
++------------+
+| table name |
++------------+
+|       tbl1 |
++------------+
+1 row in set
+!ok
+
+# SHOW CREATE TABLE for temporary table
+show create table tbl1;
+!output
+CREATE TEMPORARY TABLE `default_catalog`.`default_database`.`tbl1` (
+  `user` BIGINT NOT NULL,
+  `product` VARCHAR(32),
+  `amount` INT
+) WITH (
+  'connector' = 'datagen'
+)
+!ok
+
+drop temporary table tbl1;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# ==========================================================================
+# test playing with keyword identifiers
+# ==========================================================================
+
+create table `mod` (`table` string, `database` string) with ('connector' = 'values');
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+describe `mod`;
+!output
++----------+--------+------+-----+--------+-----------+
+|     name |   type | null | key | extras | watermark |
++----------+--------+------+-----+--------+-----------+
+|    table | STRING | TRUE |     |        |           |
+| database | STRING | TRUE |     |        |           |
++----------+--------+------+-----+--------+-----------+
+2 rows in set
+!ok
+
+desc `mod`;
+!output
++----------+--------+------+-----+--------+-----------+
+|     name |   type | null | key | extras | watermark |
++----------+--------+------+-----+--------+-----------+
+|    table | STRING | TRUE |     |        |           |
+| database | STRING | TRUE |     |        |           |
++----------+--------+------+-----+--------+-----------+
+2 rows in set
+!ok
+
+drop table `mod`;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+show tables;
+!output
+Empty set
+!ok
+
+# ==========================================================================
+# test explain
+# ==========================================================================
+
+CREATE TABLE IF NOT EXISTS orders (
+ `user` BIGINT NOT NULl,
+ product VARCHAR(32),
+ amount INT,
+ ts TIMESTAMP(3),
+ ptime AS PROCTIME(),
+ PRIMARY KEY(`user`) NOT ENFORCED,
+ WATERMARK FOR ts AS ts - INTERVAL '1' SECONDS
+) with (
+ 'connector' = 'datagen'
+);
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+CREATE TABLE IF NOT EXISTS orders2 (
+ `user` BIGINT NOT NULl,
+ product VARCHAR(32),
+ amount INT,
+ ts TIMESTAMP(3),
+ PRIMARY KEY(`user`) NOT ENFORCED
+) with (
+ 'connector' = 'blackhole'
+);
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# test explain plan for select
+explain plan for select `user`, product from orders;
+!output
+== Abstract Syntax Tree ==
+LogicalProject(user=[$0], product=[$1])
++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+   +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+   +- Calc(select=[user, product, ts])
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Optimized Execution Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+   +- Calc(select=[user, product, ts])
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+!ok
+
+# test explain plan for insert
+explain plan for insert into orders2 select `user`, product, amount, ts from orders;
+!output
+== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3])
+   +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+      +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+         +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+   +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+   +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+!ok
+
+# test explain select
+explain select `user`, product from orders;
+!output
+== Abstract Syntax Tree ==
+LogicalProject(user=[$0], product=[$1])
++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+   +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+   +- Calc(select=[user, product, ts])
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Optimized Execution Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+   +- Calc(select=[user, product, ts])
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+!ok
+
+# test explain insert
+explain insert into orders2 select `user`, product, amount, ts from orders;
+!output
+== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3])
+   +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+      +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+         +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+   +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+   +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+!ok
+
+# test explain insert with json format
+explain json_execution_plan insert into orders2 select `user`, product, amount, ts from orders;
+!output
+== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3])
+   +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+      +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+         +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+   +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+   +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: orders[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "WatermarkAssigner[]",
+    "pact" : "Operator",
+    "contents" : "[]:WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "ConstraintEnforcer[]",
+    "pact" : "Operator",
+    "contents" : "[]:ConstraintEnforcer[NotNullEnforcer(fields=[user])]",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: orders2[]",
+    "pact" : "Data Sink",
+    "contents" : "[]:Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}
+!ok
+
+# test explain select with json format
+explain json_execution_plan select `user`, product from orders;
+!output
+== Abstract Syntax Tree ==
+LogicalProject(user=[$0], product=[$1])
++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+   +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+   +- Calc(select=[user, product, ts])
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Optimized Execution Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+   +- Calc(select=[user, product, ts])
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: orders[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[user, product, ts])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "WatermarkAssigner[]",
+    "pact" : "Operator",
+    "contents" : "[]:WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[user, product])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}
+!ok
+
+# test explain select with ESTIMATED_COST
+explain estimated_cost select `user`, product from orders;
+!output
+== Abstract Syntax Tree ==
+LogicalProject(user=[$0], product=[$1])
++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+   +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Calc(select=[user, product]): rowcount = 1.0E8, cumulative cost = {4.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+   +- Calc(select=[user, product, ts]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+
+== Optimized Execution Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+   +- Calc(select=[user, product, ts])
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+!ok
+
+# test explain select with CHANGELOG_MODE
+explain changelog_mode select `user`, product from orders;
+!output
+== Abstract Syntax Tree ==
+LogicalProject(user=[$0], product=[$1])
++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+   +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Calc(select=[user, product], changelogMode=[I])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[I])
+   +- Calc(select=[user, product, ts], changelogMode=[I])
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts], changelogMode=[I])
+
+== Optimized Execution Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+   +- Calc(select=[user, product, ts])
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+!ok
+
+# test explain select with all details
+explain changelog_mode, estimated_cost, json_execution_plan select `user`, product from orders;
+!output
+== Abstract Syntax Tree ==
+LogicalProject(user=[$0], product=[$1])
++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+   +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Calc(select=[user, product], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {4.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+   +- Calc(select=[user, product, ts], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+
+== Optimized Execution Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+   +- Calc(select=[user, product, ts])
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: orders[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[user, product, ts])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "WatermarkAssigner[]",
+    "pact" : "Operator",
+    "contents" : "[]:WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[user, product])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}
+!ok
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/view.q b/flink-table/flink-sql-gateway/src/test/resources/sql/view.q
new file mode 100644
index 00000000000..dc20a1cb70f
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/view.q
@@ -0,0 +1,503 @@
+# view.q - CREATE/DROP/SHOW/DESCRIBE VIEW
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# register a base table first
+CREATE TABLE orders (
+ `user` BIGINT NOT NULl,
+ product VARCHAR(32),
+ amount INT,
+ ts TIMESTAMP(3),
+ ptime AS PROCTIME(),
+ PRIMARY KEY(`user`) NOT ENFORCED,
+ WATERMARK FOR ts AS ts - INTERVAL '1' SECONDS
+) with (
+ 'connector' = 'datagen'
+);
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# ==== test temporary view =====
+
+create temporary view v1 as select * from orders;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+create temporary view v1 as select * from orders;
+!output
+org.apache.flink.table.api.ValidationException: Temporary table '`default_catalog`.`default_database`.`v1`' already exists
+!error
+
+# TODO: warn users that the view already exists
+create temporary view if not exists v1 as select * from orders;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# test query a view with hint
+select * from v1 /*+ OPTIONS('number-of-rows' = '1') */;
+!output
+org.apache.flink.table.api.ValidationException: View '`default_catalog`.`default_database`.`v1`' cannot be enriched with new options. Hints can only be applied to tables.
+!error
+
+# test creating a view that references another view
+create temporary view if not exists v2 as select * from v1;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# test show create a temporary view
+show create view v1;
+!output
+CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v1`(`user`, `product`, `amount`, `ts`, `ptime`) as
+SELECT *
+FROM `default_catalog`.`default_database`.`orders`
+!ok
+
+# test show create for a temporary view that references another view
+show create view v2;
+!output
+CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v2`(`user`, `product`, `amount`, `ts`, `ptime`) as
+SELECT *
+FROM `default_catalog`.`default_database`.`v1`
+!ok
+
+show tables;
+!output
++------------+
+| table name |
++------------+
+|     orders |
+|         v1 |
+|         v2 |
++------------+
+3 rows in set
+!ok
+
+show views;
+!output
++-----------+
+| view name |
++-----------+
+|        v1 |
+|        v2 |
++-----------+
+2 rows in set
+!ok
+
+# test SHOW CREATE TABLE for views
+show create table v1;
+!output
+org.apache.flink.table.api.TableException: SHOW CREATE TABLE is only supported for tables, but `default_catalog`.`default_database`.`v1` is a view. Please use SHOW CREATE VIEW instead.
+!error
+
+# ==== test permanent view =====
+
+# register a permanent view with the same name as the temporary view
+create view v1 as select * from orders;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# test create duplicate view
+create view v1 as select * from orders;
+!output
+org.apache.flink.table.catalog.exceptions.TableAlreadyExistException: Table (or view) default_database.v1 already exists in Catalog default_catalog.
+!error
+
+# create a permanent view to be used by the show create test below
+create view permanent_v1 as select * from orders;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# test show create a permanent view
+show create view permanent_v1;
+!output
+CREATE VIEW `default_catalog`.`default_database`.`permanent_v1`(`user`, `product`, `amount`, `ts`, `ptime`) as
+SELECT *
+FROM `default_catalog`.`default_database`.`orders`
+!ok
+
+# remove permanent_v1 view
+drop view permanent_v1;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# we don't distinguish between the temporary v1 and the permanent v1 for now
+show views;
+!output
++-----------+
+| view name |
++-----------+
+|        v1 |
+|        v2 |
++-----------+
+2 rows in set
+!ok
+
+# test describe view
+describe v1;
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+|    name |                        type |  null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+|    user |                      BIGINT | FALSE |     |        |           |
+| product |                 VARCHAR(32) |  TRUE |     |        |           |
+|  amount |                         INT |  TRUE |     |        |           |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |     |        |           |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     |        |           |
++---------+-----------------------------+-------+-----+--------+-----------+
+5 rows in set
+!ok
+
+# test SHOW COLUMNS
+show columns from v1;
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+|    name |                        type |  null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+|    user |                      BIGINT | FALSE |     |        |           |
+| product |                 VARCHAR(32) |  TRUE |     |        |           |
+|  amount |                         INT |  TRUE |     |        |           |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |     |        |           |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     |        |           |
++---------+-----------------------------+-------+-----+--------+-----------+
+5 rows in set
+!ok
+
+show columns in v1;
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+|    name |                        type |  null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+|    user |                      BIGINT | FALSE |     |        |           |
+| product |                 VARCHAR(32) |  TRUE |     |        |           |
+|  amount |                         INT |  TRUE |     |        |           |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |     |        |           |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     |        |           |
++---------+-----------------------------+-------+-----+--------+-----------+
+5 rows in set
+!ok
+
+show columns from v1 like '%u';
+!output
+Empty set
+!ok
+
+show columns in v1 like '%u';
+!output
+Empty set
+!ok
+
+show columns from v1 not like '%u';
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+|    name |                        type |  null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+|    user |                      BIGINT | FALSE |     |        |           |
+| product |                 VARCHAR(32) |  TRUE |     |        |           |
+|  amount |                         INT |  TRUE |     |        |           |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |     |        |           |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     |        |           |
++---------+-----------------------------+-------+-----+--------+-----------+
+5 rows in set
+!ok
+
+show columns in v1 not like '%u';
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+|    name |                        type |  null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+|    user |                      BIGINT | FALSE |     |        |           |
+| product |                 VARCHAR(32) |  TRUE |     |        |           |
+|  amount |                         INT |  TRUE |     |        |           |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |     |        |           |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     |        |           |
++---------+-----------------------------+-------+-----+--------+-----------+
+5 rows in set
+!ok
+
+show columns from v1 like '%r';
+!output
++------+--------+-------+-----+--------+-----------+
+| name |   type |  null | key | extras | watermark |
++------+--------+-------+-----+--------+-----------+
+| user | BIGINT | FALSE |     |        |           |
++------+--------+-------+-----+--------+-----------+
+1 row in set
+!ok
+
+show columns in v1 like '%r';
+!output
++------+--------+-------+-----+--------+-----------+
+| name |   type |  null | key | extras | watermark |
++------+--------+-------+-----+--------+-----------+
+| user | BIGINT | FALSE |     |        |           |
++------+--------+-------+-----+--------+-----------+
+1 row in set
+!ok
+
+show columns from v1 not like  '%r';
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+|    name |                        type |  null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+| product |                 VARCHAR(32) |  TRUE |     |        |           |
+|  amount |                         INT |  TRUE |     |        |           |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |     |        |           |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     |        |           |
++---------+-----------------------------+-------+-----+--------+-----------+
+4 rows in set
+!ok
+
+show columns in v1 not like  '%r';
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+|    name |                        type |  null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+| product |                 VARCHAR(32) |  TRUE |     |        |           |
+|  amount |                         INT |  TRUE |     |        |           |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |     |        |           |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     |        |           |
++---------+-----------------------------+-------+-----+--------+-----------+
+4 rows in set
+!ok
+
+show columns from v1 like '%u%';
+!output
++---------+-------------+-------+-----+--------+-----------+
+|    name |        type |  null | key | extras | watermark |
++---------+-------------+-------+-----+--------+-----------+
+|    user |      BIGINT | FALSE |     |        |           |
+| product | VARCHAR(32) |  TRUE |     |        |           |
+|  amount |         INT |  TRUE |     |        |           |
++---------+-------------+-------+-----+--------+-----------+
+3 rows in set
+!ok
+
+show columns in v1 like '%u%';
+!output
++---------+-------------+-------+-----+--------+-----------+
+|    name |        type |  null | key | extras | watermark |
++---------+-------------+-------+-----+--------+-----------+
+|    user |      BIGINT | FALSE |     |        |           |
+| product | VARCHAR(32) |  TRUE |     |        |           |
+|  amount |         INT |  TRUE |     |        |           |
++---------+-------------+-------+-----+--------+-----------+
+3 rows in set
+!ok
+
+show columns from v1 not like '%u%';
+!output
++-------+-----------------------------+-------+-----+--------+-----------+
+|  name |                        type |  null | key | extras | watermark |
++-------+-----------------------------+-------+-----+--------+-----------+
+|    ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |     |        |           |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     |        |           |
++-------+-----------------------------+-------+-----+--------+-----------+
+2 rows in set
+!ok
+
+show columns in v1 not like '%u%';
+!output
++-------+-----------------------------+-------+-----+--------+-----------+
+|  name |                        type |  null | key | extras | watermark |
++-------+-----------------------------+-------+-----+--------+-----------+
+|    ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |     |        |           |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     |        |           |
++-------+-----------------------------+-------+-----+--------+-----------+
+2 rows in set
+!ok
+
+show columns from v1 like 'use_';
+!output
++------+--------+-------+-----+--------+-----------+
+| name |   type |  null | key | extras | watermark |
++------+--------+-------+-----+--------+-----------+
+| user | BIGINT | FALSE |     |        |           |
++------+--------+-------+-----+--------+-----------+
+1 row in set
+!ok
+
+show columns in v1 like 'use_';
+!output
++------+--------+-------+-----+--------+-----------+
+| name |   type |  null | key | extras | watermark |
++------+--------+-------+-----+--------+-----------+
+| user | BIGINT | FALSE |     |        |           |
++------+--------+-------+-----+--------+-----------+
+1 row in set
+!ok
+
+show columns from v1 not like 'use_';
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+|    name |                        type |  null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+| product |                 VARCHAR(32) |  TRUE |     |        |           |
+|  amount |                         INT |  TRUE |     |        |           |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |     |        |           |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     |        |           |
++---------+-----------------------------+-------+-----+--------+-----------+
+4 rows in set
+!ok
+
+show columns in v1 not like 'use_';
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+|    name |                        type |  null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+| product |                 VARCHAR(32) |  TRUE |     |        |           |
+|  amount |                         INT |  TRUE |     |        |           |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |     |        |           |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     |        |           |
++---------+-----------------------------+-------+-----+--------+-----------+
+4 rows in set
+!ok
+
+# we can't drop the permanent view if there is a temporary view with the same name
+drop view v1;
+!output
+org.apache.flink.table.api.ValidationException: Temporary view with identifier '`default_catalog`.`default_database`.`v1`' exists. Drop it first before removing the permanent view.
+!error
+
+# although temporary v2 depends on temporary v1, dropping v1 first does not throw an exception
+drop temporary view v1;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# now we can drop permanent view v1
+drop view v1;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+# test dropping a non-existent view
+drop view non_exist;
+!output
+org.apache.flink.table.api.ValidationException: View with identifier 'default_catalog.default_database.non_exist' does not exist.
+!error
+
+# ===== test playing with keyword identifiers =====
+
+create view `mod` as select * from orders;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+describe `mod`;
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+|    name |                        type |  null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+|    user |                      BIGINT | FALSE |     |        |           |
+| product |                 VARCHAR(32) |  TRUE |     |        |           |
+|  amount |                         INT |  TRUE |     |        |           |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |     |        |           |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     |        |           |
++---------+-----------------------------+-------+-----+--------+-----------+
+5 rows in set
+!ok
+
+desc `mod`;
+!output
++---------+-----------------------------+-------+-----+--------+-----------+
+|    name |                        type |  null | key | extras | watermark |
++---------+-----------------------------+-------+-----+--------+-----------+
+|    user |                      BIGINT | FALSE |     |        |           |
+| product |                 VARCHAR(32) |  TRUE |     |        |           |
+|  amount |                         INT |  TRUE |     |        |           |
+|      ts |      TIMESTAMP(3) *ROWTIME* |  TRUE |     |        |           |
+|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     |        |           |
++---------+-----------------------------+-------+-----+--------+-----------+
+5 rows in set
+!ok
+
+drop view `mod`;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+show tables;
+!output
++------------+
+| table name |
++------------+
+|     orders |
+|         v2 |
++------------+
+2 rows in set
+!ok
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java
index 63af5b3f3a3..90b7517f91c 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.MiniClusterClient;
 import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -279,6 +280,12 @@ public final class MiniClusterExtension
                 MiniClusterClient.MiniClusterId.INSTANCE);
     }
 
+    // Utils
+
+    public Configuration getClientConfiguration() {
+        return internalMiniClusterExtension.getClientConfiguration();
+    }
+
     private static class CloseableParameter<T extends AutoCloseable>
             implements ExtensionContext.Store.CloseableResource {
         private final T autoCloseable;