You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/12/08 02:26:25 UTC

[flink] branch master updated: [FLINK-29732][sql-gateway] Support to configureSession in the SqlGatewayService

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e6215d4f7ed [FLINK-29732][sql-gateway] Support to configureSession in the SqlGatewayService
e6215d4f7ed is described below

commit e6215d4f7ede96f1225921f2b0f322c87a5040cd
Author: yuzelin <74...@qq.com>
AuthorDate: Mon Oct 24 00:16:38 2022 +0800

    [FLINK-29732][sql-gateway] Support to configureSession in the SqlGatewayService
    
    This closes #21133
---
 .../flink/table/gateway/api/SqlGatewayService.java |  14 ++
 .../gateway/api/utils/MockedSqlGatewayService.java |   6 +
 flink-table/flink-sql-gateway/pom.xml              |   7 +
 .../gateway/service/SqlGatewayServiceImpl.java     |  28 ++++
 .../service/operation/OperationExecutor.java       |  79 ++++++++--
 .../service/operation/OperationManager.java        |  26 ++--
 .../gateway/service/SqlGatewayServiceITCase.java   | 115 +++++++++++++++
 .../service/operation/OperationManagerTest.java    | 159 +++++++++++++++++++++
 8 files changed, 412 insertions(+), 22 deletions(-)

diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index 9bdc9c751b7..4619a05c194 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -66,6 +66,20 @@ public interface SqlGatewayService {
      */
     void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;
 
+    /**
+     * Uses the statement to configure the Session. Only the following statements are allowed:
+     * SET/RESET/CREATE/DROP/USE/ALTER/LOAD MODULE/UNLOAD MODULE/ADD JAR.
+     *
+     * <p>This call blocks and returns only after the execution finishes.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @param statement the statement used to configure the session.
+     * @param executionTimeoutMs the execution timeout. Use a non-positive value to disable the
+     *     timeout mechanism.
+     */
+    void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs)
+            throws SqlGatewayException;
+
     /**
      * Get the current configuration of the {@code Session}.
      *
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
index a9c6bd77cc7..97cc0c6cf0d 100644
--- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
@@ -54,6 +54,12 @@ public class MockedSqlGatewayService implements SqlGatewayService {
         throw new UnsupportedOperationException();
     }
 
+    public void configureSession( // NOTE(review): implements SqlGatewayService; @Override missing
+            SessionHandle sessionHandle, String statement, long executionTimeoutMs)
+            throws SqlGatewayException {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public Map<String, String> getSessionConfig(SessionHandle sessionHandle)
             throws SqlGatewayException {
diff --git a/flink-table/flink-sql-gateway/pom.xml b/flink-table/flink-sql-gateway/pom.xml
index 94d48d0f4db..91475a184b6 100644
--- a/flink-table/flink-sql-gateway/pom.xml
+++ b/flink-table/flink-sql-gateway/pom.xml
@@ -103,6 +103,13 @@
 			<artifactId>flink-shaded-jackson-module-jsonSchema</artifactId>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-api-java</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
     </dependencies>
 
     <build>
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index 932a0a0c852..4e2573964b7 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.gateway.api.results.TableInfo;
 import org.apache.flink.table.gateway.api.session.SessionEnvironment;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.operation.OperationManager;
 import org.apache.flink.table.gateway.service.session.Session;
 import org.apache.flink.table.gateway.service.session.SessionManager;
 
@@ -79,6 +80,31 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
         }
     }
 
+    @Override
+    public void configureSession(
+            SessionHandle sessionHandle, String statement, long executionTimeoutMs)
+            throws SqlGatewayException {
+        try {
+            if (executionTimeoutMs > 0) {
+                // TODO: support the feature in FLINK-27838
+                throw new UnsupportedOperationException(
+                        "SqlGatewayService doesn't support timeout mechanism now.");
+            }
+
+            OperationManager operationManager = getSession(sessionHandle).getOperationManager();
+            OperationHandle operationHandle =
+                    operationManager.submitOperation(
+                            handle ->
+                                    getSession(sessionHandle)
+                                            .createExecutor()
+                                            .configureSession(handle, statement));
+            operationManager.awaitOperationTermination(operationHandle);
+        } catch (Throwable t) {
+            LOG.error("Failed to configure session.", t);
+            throw new SqlGatewayException("Failed to configure session.", t);
+        }
+    }
+
     @Override
     public Map<String, String> getSessionConfig(SessionHandle sessionHandle)
             throws SqlGatewayException {
@@ -311,6 +337,8 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
         return GatewayInfo.INSTANCE;
     }
 
+    // --------------------------------------------------------------------------------------------
+
     @VisibleForTesting
     public Session getSession(SessionHandle sessionHandle) {
         return sessionManager.getSession(sessionHandle);
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index 569e838c3a2..d39e6975ce1 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -44,22 +44,28 @@ import org.apache.flink.table.gateway.service.result.ResultFetcher;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.apache.flink.table.operations.BeginStatementSetOperation;
 import org.apache.flink.table.operations.EndStatementSetOperation;
+import org.apache.flink.table.operations.LoadModuleOperation;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.operations.StatementSetOperation;
+import org.apache.flink.table.operations.UnloadModuleOperation;
+import org.apache.flink.table.operations.UseOperation;
+import org.apache.flink.table.operations.command.AddJarOperation;
 import org.apache.flink.table.operations.command.ResetOperation;
 import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.operations.ddl.AlterOperation;
+import org.apache.flink.table.operations.ddl.CreateOperation;
+import org.apache.flink.table.operations.ddl.DropOperation;
+import org.apache.flink.util.CollectionUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -84,6 +90,48 @@ public class OperationExecutor {
         this.executionConfig = executionConfig;
     }
 
+    /**
+     * Executes one session-configuring statement (SET/RESET/CREATE/DROP/USE/ALTER/LOAD MODULE/
+     * UNLOAD MODULE/ADD JAR); other operation types raise UnsupportedOperationException.
+     */
+    public ResultFetcher configureSession(OperationHandle handle, String statement) {
+        TableEnvironmentInternal tableEnv = getTableEnvironment();
+        List<Operation> parsedOperations = tableEnv.getParser().parse(statement);
+        // Require exactly one operation: an empty parse result must not reach get(0) below.
+        if (parsedOperations.size() != 1) {
+            throw new UnsupportedOperationException(
+                    "Unsupported SQL statement! Configure session only accepts a single SQL statement.");
+        }
+        Operation op = parsedOperations.get(0);
+
+        if (op instanceof SetOperation) {
+            return callSetOperation(tableEnv, handle, (SetOperation) op);
+        } else if (op instanceof ResetOperation) {
+            return callResetOperation(handle, (ResetOperation) op);
+        } else if (op instanceof CreateOperation
+                || op instanceof DropOperation
+                || op instanceof UseOperation
+                || op instanceof AlterOperation
+                || op instanceof LoadModuleOperation
+                || op instanceof UnloadModuleOperation
+                || op instanceof AddJarOperation) {
+            return callOperation(tableEnv, handle, op);
+        }
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Unsupported statement for configuring session:%s\n"
+                                + "The configureSession API only supports to execute statement of type "
+                                + "CREATE TABLE, DROP TABLE, ALTER TABLE, "
+                                + "CREATE DATABASE, DROP DATABASE, ALTER DATABASE, "
+                                + "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, "
+                                + "CREATE CATALOG, DROP CATALOG, "
+                                + "USE CATALOG, USE [CATALOG.]DATABASE, "
+                                + "CREATE VIEW, DROP VIEW, "
+                                + "LOAD MODULE, UNLOAD MODULE, USE MODULE, "
+                                + "ADD JAR.",
+                        statement));
+    }
+
     public ResultFetcher executeStatement(OperationHandle handle, String statement) {
         // Instantiate the TableEnvironment lazily
         TableEnvironmentInternal tableEnv = getTableEnvironment();
@@ -114,9 +162,7 @@ public class OperationExecutor {
             TableResultInternal result = tableEnv.executeInternal(op);
             return new ResultFetcher(handle, result.getResolvedSchema(), result.collectInternal());
         } else {
-            TableResultInternal result = tableEnv.executeInternal(op);
-            return new ResultFetcher(
-                    handle, result.getResolvedSchema(), collect(result.collectInternal()));
+            return callOperation(tableEnv, handle, op);
         }
     }
 
@@ -228,7 +274,8 @@ public class OperationExecutor {
             return new ResultFetcher(
                     handle,
                     TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
-                    collect(TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+                    CollectionUtil.iteratorToList(
+                            TableResultInternal.TABLE_RESULT_OK.collectInternal()));
         } else if (!setOp.getKey().isPresent() && !setOp.getValue().isPresent()) {
             // show all properties
             Map<String, String> configMap = tableEnv.getConfig().getConfiguration().toMap();
@@ -237,7 +284,7 @@ public class OperationExecutor {
                     ResolvedSchema.of(
                             Column.physical(SET_KEY, DataTypes.STRING()),
                             Column.physical(SET_VALUE, DataTypes.STRING())),
-                    collect(
+                    CollectionUtil.iteratorToList(
                             configMap.entrySet().stream()
                                     .map(
                                             entry ->
@@ -264,7 +311,8 @@ public class OperationExecutor {
         return new ResultFetcher(
                 handle,
                 TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
-                collect(TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+                CollectionUtil.iteratorToList(
+                        TableResultInternal.TABLE_RESULT_OK.collectInternal()));
     }
 
     private ResultFetcher callModifyOperations(
@@ -289,6 +337,15 @@ public class OperationExecutor {
                                                 .toString()))));
     }
 
+    private ResultFetcher callOperation(
+            TableEnvironmentInternal tableEnv, OperationHandle handle, Operation op) {
+        TableResultInternal result = tableEnv.executeInternal(op);
+        return new ResultFetcher(
+                handle,
+                result.getResolvedSchema(),
+                CollectionUtil.iteratorToList(result.collectInternal()));
+    }
+
     private Set<TableInfo> listTables(
             String catalogName, String databaseName, boolean includeViews) {
         CatalogManager catalogManager = sessionContext.getSessionState().catalogManager;
@@ -333,10 +390,4 @@ public class OperationExecutor {
                                                 TableKind.VIEW))
                         .collect(Collectors.toSet()));
     }
-
-    private List<RowData> collect(Iterator<RowData> tableResult) {
-        List<RowData> rows = new ArrayList<>();
-        tableResult.forEachRemaining(rows::add);
-        return rows;
-    }
 }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
index 6ca0a28f3cb..29ef1569736 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
@@ -137,6 +137,10 @@ public class OperationManager {
                 });
     }
 
+    public void awaitOperationTermination(OperationHandle operationHandle) throws Exception {
+        getOperation(operationHandle).awaitTermination();
+    }
+
     /**
      * Get the {@link OperationInfo} of the operation.
      *
@@ -307,15 +311,9 @@ public class OperationManager {
         }
 
         public ResolvedSchema getResultSchema() throws Exception {
-            synchronized (status) {
-                while (!status.get().isTerminalStatus()) {
-                    status.wait();
-                }
-            }
+            awaitTermination();
             OperationStatus current = status.get();
-            if (current == OperationStatus.ERROR) {
-                throw operationError;
-            } else if (current != OperationStatus.FINISHED) {
+            if (current != OperationStatus.FINISHED) {
                 throw new IllegalStateException(
                         String.format(
                                 "The result schema is available when the Operation is in FINISHED state but the current status is %s.",
@@ -328,6 +326,18 @@ public class OperationManager {
             return new OperationInfo(status.get(), operationError);
         }
 
+        /** Blocks until the status is terminal, then rethrows the operation error on ERROR. */
+        public void awaitTermination() throws Exception {
+            synchronized (status) {
+                while (!status.get().isTerminalStatus()) {
+                    status.wait();
+                }
+            }
+            if (status.get() == OperationStatus.ERROR) {
+                throw operationError;
+            }
+        }
+
         private ResultSet fetchResultsInternal(Supplier<ResultSet> results) {
             OperationStatus currentStatus = status.get();
 
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index c8c8513bdb6..996cb74778a 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -20,9 +20,11 @@ package org.apache.flink.table.gateway.service;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
@@ -54,6 +56,8 @@ import org.apache.flink.table.planner.runtime.batch.sql.TestModule;
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
 import org.apache.flink.table.planner.utils.TableFunc0;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
@@ -62,7 +66,9 @@ import org.assertj.core.api.Condition;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.io.File;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -71,6 +77,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.FutureTask;
@@ -84,6 +92,8 @@ import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
 import static org.apache.flink.table.functions.FunctionKind.OTHER;
 import static org.apache.flink.table.functions.FunctionKind.SCALAR;
 import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.PAYLOAD;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
 import static org.apache.flink.types.RowKind.DELETE;
 import static org.apache.flink.types.RowKind.INSERT;
 import static org.apache.flink.types.RowKind.UPDATE_AFTER;
@@ -157,6 +167,83 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
         assertThat(tableEnv.listModules()).contains(moduleName);
     }
 
+    @Test
+    public void testConfigureSessionWithLegalStatement(@TempDir java.nio.file.Path tmpDir)
+            throws Exception {
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+
+        // SET & RESET
+        service.configureSession(sessionHandle, "SET 'key1' = 'value1';", 0);
+        Map<String, String> config = new HashMap<>();
+        config.put("key1", "value1");
+        assertThat(service.getSessionConfig(sessionHandle)).containsAllEntriesOf(config);
+
+        service.configureSession(sessionHandle, "RESET 'key1';", 0);
+        assertThat(service.getSessionConfig(sessionHandle)).doesNotContainEntry("key1", "value1");
+
+        // CREATE & USE & ALTER & DROP
+        service.configureSession(
+                sessionHandle,
+                "CREATE CATALOG mycat with ('type' = 'generic_in_memory', 'default-database' = 'db');",
+                0);
+
+        service.configureSession(sessionHandle, "USE CATALOG mycat;", 0);
+        assertThat(service.getCurrentCatalog(sessionHandle)).isEqualTo("mycat");
+
+        service.configureSession(
+                sessionHandle,
+                "CREATE TABLE db.tbl (score INT) WITH ('connector' = 'datagen');",
+                0);
+
+        Set<TableKind> tableKinds = new HashSet<>();
+        tableKinds.add(TableKind.TABLE);
+        assertThat(service.listTables(sessionHandle, "mycat", "db", tableKinds))
+                .contains(
+                        new TableInfo(ObjectIdentifier.of("mycat", "db", "tbl"), TableKind.TABLE));
+
+        service.configureSession(sessionHandle, "ALTER TABLE db.tbl RENAME TO tbl1;", 0);
+        assertThat(service.listTables(sessionHandle, "mycat", "db", tableKinds))
+                .doesNotContain(
+                        new TableInfo(ObjectIdentifier.of("mycat", "db", "tbl"), TableKind.TABLE))
+                .contains(
+                        new TableInfo(ObjectIdentifier.of("mycat", "db", "tbl1"), TableKind.TABLE));
+
+        service.configureSession(sessionHandle, "USE CATALOG default_catalog;", 0);
+        service.configureSession(sessionHandle, "DROP CATALOG mycat;", 0);
+        assertThat(service.listCatalogs(sessionHandle)).doesNotContain("mycat");
+
+        // LOAD & UNLOAD MODULE
+        validateStatementResult(
+                sessionHandle,
+                "SHOW FULL MODULES",
+                Collections.singletonList(GenericRowData.of(StringData.fromString("core"), true)));
+
+        service.configureSession(sessionHandle, "UNLOAD MODULE core;", 0);
+        validateStatementResult(sessionHandle, "SHOW FULL MODULES", Collections.emptyList());
+
+        service.configureSession(sessionHandle, "LOAD MODULE core;", 0);
+        validateStatementResult(
+                sessionHandle,
+                "SHOW FULL MODULES",
+                Collections.singletonList(GenericRowData.of(StringData.fromString("core"), true)));
+
+        // ADD JAR
+        String udfClassName = GENERATED_LOWER_UDF_CLASS + new Random().nextInt(50);
+        String jarPath =
+                UserClassLoaderJarTestUtils.createJarFile(
+                                new File(tmpDir.toUri()),
+                                "test-add-jar.jar",
+                                udfClassName,
+                                String.format(GENERATED_LOWER_UDF_CODE, udfClassName))
+                        .toURI()
+                        .getPath();
+        service.configureSession(sessionHandle, String.format("ADD JAR '%s';", jarPath), 0);
+        validateStatementResult(
+                sessionHandle,
+                "SHOW JARS",
+                Collections.singletonList(GenericRowData.of(StringData.fromString(jarPath))));
+    }
+
     @Test
     public void testFetchResultsInRunning() throws Exception {
         SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
@@ -711,6 +798,23 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
     // Negative tests
     // --------------------------------------------------------------------------------------------
 
+    @Test
+    void testConfigureSessionWithIllegalStatement() {
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+
+        assertThatThrownBy(() -> service.configureSession(sessionHandle, "SELECT 1;", 0))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                "Unsupported statement for configuring session:SELECT 1;\n"
+                                        + "The configureSession API only supports to execute statement of type "
+                                        + "CREATE TABLE, DROP TABLE, ALTER TABLE, "
+                                        + "CREATE DATABASE, DROP DATABASE, ALTER DATABASE, "
+                                        + "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, "
+                                        + "CREATE CATALOG, DROP CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, "
+                                        + "CREATE VIEW, DROP VIEW, LOAD MODULE, UNLOAD MODULE, USE MODULE, ADD JAR."));
+    }
+
     @Test
     public void testFetchResultsFromCanceledOperation() {
         SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
@@ -899,4 +1003,15 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
 
         return sessionHandle;
     }
+
+    private void validateStatementResult(
+            SessionHandle sessionHandle, String statement, List<RowData> expected) {
+        TableEnvironmentInternal tableEnv =
+                service.getSession(sessionHandle).createExecutor().getTableEnvironment();
+        assertThat(
+                        CollectionUtil.iteratorToList(
+                                ((TableResultInternal) tableEnv.executeSql(statement))
+                                        .collectInternal()))
+                .isEqualTo(expected);
+    }
 }
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java
new file mode 100644
index 00000000000..b0a2b5a6218
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.api.utils.ThreadUtils;
+import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+
+import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.PAYLOAD;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link OperationManager}. */
+public class OperationManagerTest {
+
+    private static final ExecutorService EXECUTOR_SERVICE =
+            ThreadUtils.newThreadPool(5, 500, 60_0000, "operation-manager-test");
+
+    private static OperationManager operationManager;
+    private static ResultSet defaultResultSet;
+
+    private final ThreadFactory threadFactory =
+            new ExecutorThreadFactory(
+                    "SqlGatewayService Test Pool", IgnoreExceptionHandler.INSTANCE);
+
+    @BeforeAll
+    public static void setUp() {
+        operationManager = new OperationManager(EXECUTOR_SERVICE);
+        defaultResultSet =
+                new ResultSet(
+                        PAYLOAD,
+                        1L,
+                        ResolvedSchema.of(Column.physical("id", DataTypes.BIGINT())),
+                        Collections.singletonList(GenericRowData.of(1L)));
+    }
+
+    @AfterAll
+    public static void cleanUp() {
+        EXECUTOR_SERVICE.shutdown();
+        operationManager.close();
+    }
+
+    @Test
+    public void testRunOperationAsynchronously() throws Exception {
+        OperationHandle operationHandle = operationManager.submitOperation(() -> defaultResultSet);
+
+        assertThat(operationManager.getOperationInfo(operationHandle).getStatus())
+                .isNotEqualTo(OperationStatus.ERROR);
+
+        assertThat(operationManager.getOperationResultSchema(operationHandle))
+                .isEqualTo(ResolvedSchema.of(Column.physical("id", DataTypes.BIGINT())));
+
+        assertThat(operationManager.getOperationInfo(operationHandle).getStatus())
+                .isEqualTo(OperationStatus.FINISHED);
+    }
+
+    @Test
+    public void testRunOperationSynchronously() throws Exception {
+        OperationHandle operationHandle = operationManager.submitOperation(() -> defaultResultSet);
+        operationManager.awaitOperationTermination(operationHandle);
+
+        assertThat(operationManager.getOperationInfo(operationHandle).getStatus())
+                .isEqualTo(OperationStatus.FINISHED);
+
+        assertThat(operationManager.fetchResults(operationHandle, 0, Integer.MAX_VALUE))
+                .isEqualTo(defaultResultSet);
+    }
+
+    @Test
+    public void testCancelOperation() throws Exception {
+        CountDownLatch endRunningLatch = new CountDownLatch(1);
+        OperationHandle operationHandle =
+                operationManager.submitOperation(
+                        () -> {
+                            endRunningLatch.await();
+                            return defaultResultSet;
+                        });
+
+        threadFactory.newThread(() -> operationManager.cancelOperation(operationHandle)).start();
+        operationManager.awaitOperationTermination(operationHandle);
+
+        assertThat(operationManager.getOperationInfo(operationHandle).getStatus())
+                .isEqualTo(OperationStatus.CANCELED);
+    }
+
+    @Test
+    public void testCloseOperation() throws Exception {
+        CountDownLatch endRunningLatch = new CountDownLatch(1);
+        OperationHandle operationHandle =
+                operationManager.submitOperation(
+                        () -> {
+                            endRunningLatch.await();
+                            return defaultResultSet;
+                        });
+
+        threadFactory.newThread(() -> operationManager.closeOperation(operationHandle)).start();
+        operationManager.awaitOperationTermination(operationHandle);
+
+        assertThatThrownBy(() -> operationManager.getOperation(operationHandle))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                SqlGatewayException.class,
+                                String.format(
+                                        "Can not find the submitted operation in the OperationManager with the %s.",
+                                        operationHandle)));
+    }
+
+    @Test
+    public void testRunOperationSynchronouslyWithError() {
+        OperationHandle operationHandle =
+                operationManager.submitOperation(
+                        () -> {
+                            throw new SqlExecutionException("Execution error.");
+                        });
+
+        assertThatThrownBy(() -> operationManager.awaitOperationTermination(operationHandle))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                SqlExecutionException.class, "Execution error."));
+
+        assertThat(operationManager.getOperationInfo(operationHandle).getStatus())
+                .isEqualTo(OperationStatus.ERROR);
+    }
+}