You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/06/23 01:37:30 UTC

[flink] 01/02: [FLINK-27766][sql-gateway] Introduce the basic components for the SqlGatewayService

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit da08267c2cb29d98e626a867f8e07cb5b8e7f29a
Author: Shengkai <10...@qq.com>
AuthorDate: Wed Jun 22 19:43:54 2022 +0800

    [FLINK-27766][sql-gateway] Introduce the basic components for the SqlGatewayService
---
 flink-table/flink-sql-gateway-api/pom.xml          |  67 +++++
 .../flink/table/gateway/api/HandleIdentifier.java  |  63 +++++
 .../flink/table/gateway/api/SqlGatewayService.java |  58 +++++
 .../api/config/SqlGatewayServiceConfigOptions.java |  76 ++++++
 .../gateway/api/endpoint/EndpointVersion.java      |  25 ++
 .../gateway/api/session/SessionEnvironment.java    | 122 +++++++++
 .../table/gateway/api/session/SessionHandle.java   |  66 +++++
 .../gateway/api/utils/SqlGatewayException.java     |  39 +++
 .../api/session/SessionEnvironmentTest.java        |  52 ++++
 .../gateway/api/utils/MockedEndpointVersion.java   |  26 ++
 flink-table/flink-sql-gateway/pom.xml              | 103 ++++++++
 .../gateway/service/SqlGatewayServiceImpl.java     |  80 ++++++
 .../gateway/service/context/DefaultContext.java    | 152 +++++++++++
 .../gateway/service/context/SessionContext.java    | 286 +++++++++++++++++++++
 .../table/gateway/service/session/Session.java     |  60 +++++
 .../gateway/service/session/SessionManager.java    | 214 +++++++++++++++
 .../service/utils/SqlExecutionException.java       |  33 +++
 .../table/gateway/service/utils/ThreadUtils.java   |  49 ++++
 .../gateway/service/SqlGatewayServiceITCase.java   |  73 ++++++
 .../service/session/SessionManagerTest.java        | 108 ++++++++
 .../service/utils/SqlGatewayServiceExtension.java  |  89 +++++++
 flink-table/pom.xml                                |   2 +
 tools/ci/stage.sh                                  |   2 +
 23 files changed, 1845 insertions(+)

diff --git a/flink-table/flink-sql-gateway-api/pom.xml b/flink-table/flink-sql-gateway-api/pom.xml
new file mode 100644
index 00000000000..e0639d215e8
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-table</artifactId>
+        <version>1.16-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-sql-gateway-api</artifactId>
+    <name>Flink : Table : SQL Gateway : API</name>
+    <description>
+        This module contains extension points of the SQL Gateway API.
+        It allows for implementing user-defined endpoints with
+        minimal dependencies.
+    </description>
+
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <!-- Table ecosystem -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- Make test classes available to other modules. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/HandleIdentifier.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/HandleIdentifier.java
new file mode 100644
index 00000000000..9b9acbfb72c
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/HandleIdentifier.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * Identifier of a handle, made up of a public id and a secret id.
+ *
+ * <p>Both ids take part in {@link #equals(Object)} and {@link #hashCode()}, while {@link
+ * #toString()} prints the public id only.
+ */
+@PublicEvolving
+public class HandleIdentifier {
+
+    private final UUID publicId;
+    private final UUID secretId;
+
+    /**
+     * Creates an identifier from the two given ids.
+     *
+     * @param publicId the id that is printed by {@link #toString()}.
+     * @param secretId the id that is compared in {@link #equals(Object)} but never printed.
+     */
+    public HandleIdentifier(UUID publicId, UUID secretId) {
+        this.publicId = publicId;
+        this.secretId = secretId;
+    }
+
+    /** Returns the public part of this identifier. */
+    public UUID getPublicId() {
+        return publicId;
+    }
+
+    /**
+     * Returns the secret part of this identifier.
+     *
+     * <p>The secret id is a constructor argument and part of equality, so code that has to
+     * rebuild an equal identifier from its two parts needs to be able to read it back.
+     */
+    public UUID getSecretId() {
+        return secretId;
+    }
+
+    /** Two identifiers are equal only if both the public and the secret id are equal. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof HandleIdentifier)) {
+            return false;
+        }
+        HandleIdentifier that = (HandleIdentifier) o;
+        return Objects.equals(publicId, that.publicId) && Objects.equals(secretId, that.secretId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(publicId, secretId);
+    }
+
+    /** Returns the string form of the public id; the secret id is left out on purpose. */
+    @Override
+    public String toString() {
+        return publicId.toString();
+    }
+}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
new file mode 100644
index 00000000000..933c797f243
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+
+import java.util.Map;
+
+/**
+ * A service of SQL gateway is responsible for handling requests from the endpoints.
+ *
+ * <p>The operations declared here manage sessions: opening a session, closing it and reading its
+ * configuration. Every operation declares {@link SqlGatewayException} as its failure type.
+ */
+@PublicEvolving
+public interface SqlGatewayService {
+
+    // -------------------------------------------------------------------------------------------
+    // Session Management
+    // -------------------------------------------------------------------------------------------
+
+    /**
+     * Open the {@code Session}.
+     *
+     * @param environment Environment to initialize the Session.
+     * @return Returns a handle that is used to identify the Session.
+     * @throws SqlGatewayException if opening the Session fails.
+     */
+    SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;
+
+    /**
+     * Close the {@code Session}.
+     *
+     * @param sessionHandle handle to identify the Session that needs to be closed.
+     * @throws SqlGatewayException if closing the Session fails.
+     */
+    void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;
+
+    /**
+     * Get the current configuration of the {@code Session}.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @return Returns configuration of the session.
+     * @throws SqlGatewayException if the configuration cannot be retrieved.
+     */
+    Map<String, String> getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException;
+}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/config/SqlGatewayServiceConfigOptions.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/config/SqlGatewayServiceConfigOptions.java
new file mode 100644
index 00000000000..eadfb85a7f1
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/config/SqlGatewayServiceConfigOptions.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Config options of the {@link SqlGatewayService}.
+ *
+ * <p>All keys share the {@code sql-gateway.} prefix and fall into two groups: session handling
+ * ({@code sql-gateway.session.*}) and the worker thread pool ({@code sql-gateway.worker.*}).
+ */
+@PublicEvolving
+public class SqlGatewayServiceConfigOptions {
+
+    /**
+     * {@code sql-gateway.session.idle-timeout}: how long a session may stay unaccessed before it
+     * is closed. Defaults to 10 minutes; zero or a negative value disables closing.
+     */
+    public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_IDLE_TIMEOUT =
+            key("sql-gateway.session.idle-timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(10))
+                    .withDescription(
+                            "Timeout interval for closing the session when the session hasn't been accessed during the interval. "
+                                    + "If setting to zero or negative value, the session will not be closed.");
+
+    /**
+     * {@code sql-gateway.session.check-interval}: how often idle sessions are checked for
+     * timeout. Defaults to 1 minute; zero or a negative value disables the check.
+     */
+    public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_CHECK_INTERVAL =
+            key("sql-gateway.session.check-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(1))
+                    .withDescription(
+                            "The check interval for idle session timeout, which can be disabled by setting to zero or negative value.");
+
+    /**
+     * {@code sql-gateway.session.max-num}: upper bound on the number of active sessions. Defaults
+     * to 1000000.
+     */
+    public static final ConfigOption<Integer> SQL_GATEWAY_SESSION_MAX_NUM =
+            key("sql-gateway.session.max-num")
+                    .intType()
+                    .defaultValue(1000000)
+                    .withDescription(
+                            "The maximum number of the active session for sql gateway service.");
+
+    /**
+     * {@code sql-gateway.worker.threads.max}: maximum number of worker threads. Defaults to 500.
+     */
+    public static final ConfigOption<Integer> SQL_GATEWAY_WORKER_THREADS_MAX =
+            key("sql-gateway.worker.threads.max")
+                    .intType()
+                    .defaultValue(500)
+                    .withDescription(
+                            "The maximum number of worker threads for sql gateway service.");
+
+    /** {@code sql-gateway.worker.threads.min}: minimum number of worker threads. Defaults to 5. */
+    public static final ConfigOption<Integer> SQL_GATEWAY_WORKER_THREADS_MIN =
+            key("sql-gateway.worker.threads.min")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The minimum number of worker threads for sql gateway service.");
+
+    /**
+     * {@code sql-gateway.worker.keepalive-time}: how long an idle worker thread beyond the
+     * minimum is kept before it is removed. Defaults to 5 minutes.
+     */
+    public static final ConfigOption<Duration> SQL_GATEWAY_WORKER_KEEPALIVE_TIME =
+            key("sql-gateway.worker.keepalive-time")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(5))
+                    .withDescription(
+                            "Keepalive time for an idle worker thread. When the number of workers exceeds min workers, "
+                                    + "excessive threads are killed after this time interval.");
+}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/endpoint/EndpointVersion.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/endpoint/EndpointVersion.java
new file mode 100644
index 00000000000..7aa3c586c5b
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/endpoint/EndpointVersion.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.endpoint;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Version of the endpoint.
+ *
+ * <p>The interface declares no methods; it only provides a common type for version values.
+ */
+@PublicEvolving
+public interface EndpointVersion {}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionEnvironment.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionEnvironment.java
new file mode 100644
index 00000000000..75f573aae2b
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionEnvironment.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.session;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Environment to initialize the {@code Session}.
+ *
+ * <p>It carries an optional session name, the {@link EndpointVersion} of the endpoint and the
+ * initial session configuration. Instances are created through {@link #newBuilder()}.
+ *
+ * <p>The configuration is copied when the environment is constructed, so later changes to the
+ * source map, or to the {@link Builder} that produced the instance, are not visible through an
+ * environment that has already been built.
+ */
+@PublicEvolving
+public class SessionEnvironment {
+    private final @Nullable String sessionName;
+    private final EndpointVersion version;
+    private final Map<String, String> sessionConfig;
+
+    /**
+     * Creates the environment from its three parts.
+     *
+     * @param sessionName name of the session, may be {@code null}.
+     * @param version version of the endpoint.
+     * @param sessionConfig initial configuration; must not be {@code null}. The entries are
+     *     copied.
+     */
+    @VisibleForTesting
+    SessionEnvironment(
+            @Nullable String sessionName,
+            EndpointVersion version,
+            Map<String, String> sessionConfig) {
+        this.sessionName = sessionName;
+        this.version = version;
+        // Take a snapshot: the builder hands over its own map and may keep mutating it after
+        // build(), which must not leak into an environment that was already created.
+        this.sessionConfig =
+                Collections.unmodifiableMap(new HashMap<>(checkNotNull(sessionConfig)));
+    }
+
+    // -------------------------------------------------------------------------------------------
+    // Getter
+    // -------------------------------------------------------------------------------------------
+
+    /** Returns the session name, or an empty {@link Optional} if none was set. */
+    public Optional<String> getSessionName() {
+        return Optional.ofNullable(sessionName);
+    }
+
+    /** Returns the endpoint version this environment was built with. */
+    public EndpointVersion getSessionEndpointVersion() {
+        return version;
+    }
+
+    /** Returns an unmodifiable view of the session configuration. */
+    public Map<String, String> getSessionConfig() {
+        return sessionConfig;
+    }
+
+    // -------------------------------------------------------------------------------------------
+
+    /** Environments are equal if name, endpoint version and configuration are all equal. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof SessionEnvironment)) {
+            return false;
+        }
+        SessionEnvironment that = (SessionEnvironment) o;
+        return Objects.equals(sessionName, that.sessionName)
+                && Objects.equals(version, that.version)
+                && Objects.equals(sessionConfig, that.sessionConfig);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(sessionName, version, sessionConfig);
+    }
+
+    // -------------------------------------------------------------------------------------------
+    // Builder
+    // -------------------------------------------------------------------------------------------
+
+    /** Creates a new, empty {@link Builder}. */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /** Builder to build the {@link SessionEnvironment}. */
+    public static class Builder {
+        private @Nullable String sessionName;
+        private EndpointVersion version;
+        private final Map<String, String> sessionConfig = new HashMap<>();
+
+        /** Sets the session name; the name is optional and may stay unset. */
+        public Builder setSessionName(String sessionName) {
+            this.sessionName = sessionName;
+            return this;
+        }
+
+        /** Sets the endpoint version; it has to be set before {@link #build()} is called. */
+        public Builder setSessionEndpointVersion(EndpointVersion version) {
+            this.version = version;
+            return this;
+        }
+
+        /**
+         * Adds all entries of the given map to the configuration collected so far. Entries added
+         * by earlier calls are kept; values of keys that are passed again are overwritten.
+         */
+        public Builder addSessionConfig(Map<String, String> sessionConfig) {
+            this.sessionConfig.putAll(sessionConfig);
+            return this;
+        }
+
+        /**
+         * Builds the environment from the values collected so far.
+         *
+         * @throws NullPointerException if no endpoint version has been set.
+         */
+        public SessionEnvironment build() {
+            return new SessionEnvironment(sessionName, checkNotNull(version), sessionConfig);
+        }
+    }
+}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionHandle.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionHandle.java
new file mode 100644
index 00000000000..4a5a2623044
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionHandle.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.session;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.gateway.api.HandleIdentifier;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * Handle that identifies a {@code Session}.
+ *
+ * <p>The handle is a thin wrapper around a {@link HandleIdentifier}; equality, hash code and
+ * string form are all derived from that identifier.
+ */
+@PublicEvolving
+public class SessionHandle {
+
+    private final HandleIdentifier identifier;
+
+    /** Creates a handle whose identifier is built from two freshly generated random UUIDs. */
+    public static SessionHandle create() {
+        UUID publicId = UUID.randomUUID();
+        UUID secretId = UUID.randomUUID();
+        return new SessionHandle(new HandleIdentifier(publicId, secretId));
+    }
+
+    /** Creates a handle that wraps the given identifier. */
+    public SessionHandle(HandleIdentifier identifier) {
+        this.identifier = identifier;
+    }
+
+    /** Returns the wrapped identifier. */
+    public HandleIdentifier getIdentifier() {
+        return identifier;
+    }
+
+    /** Handles are equal exactly when their identifiers are equal. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o instanceof SessionHandle) {
+            SessionHandle other = (SessionHandle) o;
+            return Objects.equals(identifier, other.identifier);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(identifier);
+    }
+
+    /** Returns the string form of the wrapped identifier. */
+    @Override
+    public String toString() {
+        return identifier.toString();
+    }
+}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/utils/SqlGatewayException.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/utils/SqlGatewayException.java
new file mode 100644
index 00000000000..20fc43417b7
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/utils/SqlGatewayException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * General exception for SQL gateway related errors.
+ *
+ * <p>It is an unchecked exception, so callers are not forced to catch it.
+ */
+@PublicEvolving
+public class SqlGatewayException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates an exception with a message and no cause.
+     *
+     * @param message description of the error.
+     */
+    public SqlGatewayException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception with a message and the underlying cause.
+     *
+     * @param message description of the error.
+     * @param e the cause of this exception.
+     */
+    public SqlGatewayException(String message, Throwable e) {
+        super(message, e);
+    }
+
+    /**
+     * Creates an exception that wraps the given cause.
+     *
+     * @param e the cause of this exception.
+     */
+    public SqlGatewayException(Throwable e) {
+        super(e);
+    }
+}
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/session/SessionEnvironmentTest.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/session/SessionEnvironmentTest.java
new file mode 100644
index 00000000000..fe472a885ed
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/session/SessionEnvironmentTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.session;
+
+import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for building a {@link SessionEnvironment}. */
+public class SessionEnvironmentTest {
+
+    /** The builder has to produce the same environment as a direct constructor call. */
+    @Test
+    public void testBuildSessionEnvironment() {
+        final String name = "test";
+        final Map<String, String> config = new HashMap<>();
+        config.put("key1", "value1");
+        config.put("key2", "value2");
+
+        final SessionEnvironment builtEnvironment =
+                SessionEnvironment.newBuilder()
+                        .setSessionName(name)
+                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                        .addSessionConfig(config)
+                        .build();
+        final SessionEnvironment constructedEnvironment =
+                new SessionEnvironment(name, MockedEndpointVersion.V1, config);
+
+        assertEquals(constructedEnvironment, builtEnvironment);
+    }
+}
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedEndpointVersion.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedEndpointVersion.java
new file mode 100644
index 00000000000..d01c079e0d5
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedEndpointVersion.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.utils;
+
+import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
+
+/** Mocked {@link EndpointVersion} for test. */
+public enum MockedEndpointVersion implements EndpointVersion {
+    /** The single mocked version value. */
+    V1;
+}
diff --git a/flink-table/flink-sql-gateway/pom.xml b/flink-table/flink-sql-gateway/pom.xml
new file mode 100644
index 00000000000..9c217e71e17
--- /dev/null
+++ b/flink-table/flink-sql-gateway/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-table</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.16-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-sql-gateway</artifactId>
+    <name>Flink : Table : SQL Gateway</name>
+
+    <dependencies>
+        <!-- Flink client to submit jobs -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- Table ecosystem -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-gateway-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-gateway-api</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- Build flink-sql-gateway jar -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <!-- Exclude all flink-dist files and only contain sql-gateway files -->
+                    <execution>
+                        <id>shade-flink</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <includes combine.children="append">
+                                    <include>org.apache.flink:flink-sql-gateway-api</include>
+                                </includes>
+                            </artifactSet>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
new file mode 100644
index 00000000000..2606f8f7763
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.session.Session;
+import org.apache.flink.table.gateway.service.session.SessionManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/** The implementation of the {@link SqlGatewayService} interface. */
+public class SqlGatewayServiceImpl implements SqlGatewayService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqlGatewayServiceImpl.class);
+
+    private final SessionManager sessionManager;
+
+    public SqlGatewayServiceImpl(SessionManager sessionManager) {
+        this.sessionManager = sessionManager;
+    }
+
+    @Override
+    public SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException {
+        try {
+            return sessionManager.openSession(environment).getSessionHandle();
+        } catch (Throwable e) {
+            LOG.error("Failed to openSession.", e);
+            throw new SqlGatewayException("Failed to openSession.", e);
+        }
+    }
+
+    @Override
+    public void closeSession(SessionHandle sessionHandle) throws SqlGatewayException {
+        try {
+            sessionManager.closeSession(sessionHandle);
+        } catch (Throwable e) {
+            LOG.error("Failed to closeSession.", e);
+            throw new SqlGatewayException("Failed to closeSession.", e);
+        }
+    }
+
+    @Override
+    public Map<String, String> getSessionConfig(SessionHandle sessionHandle)
+            throws SqlGatewayException {
+        try {
+            return getSession(sessionHandle).getSessionConfig();
+        } catch (Throwable e) {
+            LOG.error("Failed to getSessionConfig.", e);
+            throw new SqlGatewayException("Failed to getSessionConfig.", e);
+        }
+    }
+
+    @VisibleForTesting
+    Session getSession(SessionHandle sessionHandle) {
+        return sessionManager.getSession(sessionHandle);
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
new file mode 100644
index 00000000000..1ac63d8bedd
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.context;
+
+import org.apache.flink.client.cli.CliArgsException;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.cli.ProgramOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Context that holds the initial Flink {@link Configuration} of the gateway.
+ *
+ * <p>On construction the configuration is extended with the execution options derived from the
+ * active {@link CustomCommandLine}.
+ */
+public class DefaultContext {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultContext.class);
+
+    private final Configuration flinkConfig;
+
+    /**
+     * Initializes the file system from the given configuration and merges the execution
+     * configuration of the active command line into it.
+     *
+     * <p>Note that {@code flinkConfig} is modified in place and kept by reference.
+     *
+     * @param flinkConfig the base configuration
+     * @param commandLines the command lines to choose the active deployment from
+     * @throws SqlGatewayException if the execution configuration cannot be derived
+     */
+    public DefaultContext(Configuration flinkConfig, List<CustomCommandLine> commandLines) {
+        this.flinkConfig = flinkConfig;
+
+        // Initialize the default file system with the plugins resolved from the configuration.
+        FileSystem.initialize(
+                flinkConfig, PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
+
+        final Options availableOptions = collectCommandLineOptions(commandLines);
+        try {
+            // Parse an empty argument list against all known options.
+            final String[] noArgs = new String[0];
+            final CommandLine parsed = CliFrontendParser.parse(availableOptions, noArgs, true);
+            final List<URL> noDependencies = Collections.emptyList();
+            final Configuration executionConfig =
+                    createExecutionConfig(parsed, availableOptions, commandLines, noDependencies);
+            flinkConfig.addAll(executionConfig);
+        } catch (Exception cause) {
+            throw new SqlGatewayException(
+                    "Could not load available CLI with Environment Deployment entry.", cause);
+        }
+    }
+
+    /** Returns the configuration held by this context (the same instance, not a copy). */
+    public Configuration getFlinkConfig() {
+        return flinkConfig;
+    }
+
+    /** Merges the general and run options of every command line with the run command options. */
+    private Options collectCommandLineOptions(List<CustomCommandLine> commandLines) {
+        final Options fromCommandLines = new Options();
+        commandLines.forEach(
+                cli -> {
+                    cli.addGeneralOptions(fromCommandLines);
+                    cli.addRunOptions(fromCommandLines);
+                });
+        final Options runOptions = CliFrontendParser.getRunCommandOptions();
+        return CliFrontendParser.mergeOptions(runOptions, fromCommandLines);
+    }
+
+    /**
+     * Builds the execution configuration from the active command line and the program options
+     * contained in the parsed command line.
+     */
+    private static Configuration createExecutionConfig(
+            CommandLine commandLine,
+            Options commandLineOptions,
+            List<CustomCommandLine> availableCommandLines,
+            List<URL> dependencies)
+            throws FlinkException {
+        LOG.debug("Available commandline options: {}", commandLineOptions);
+        final List<String> renderedOptions =
+                Stream.of(commandLine.getOptions())
+                        .map(option -> option.getOpt() + "=" + option.getValue())
+                        .collect(Collectors.toList());
+        LOG.debug(
+                "Instantiated commandline args: {}, options: {}",
+                commandLine.getArgList(),
+                renderedOptions);
+
+        final CustomCommandLine active = findActiveCommandLine(availableCommandLines, commandLine);
+        LOG.debug(
+                "Available commandlines: {}, active commandline: {}",
+                availableCommandLines,
+                active);
+
+        final Configuration result = active.toConfiguration(commandLine);
+
+        try {
+            // Apply the program options on top of the command line's configuration.
+            ExecutionConfigAccessor.fromProgramOptions(
+                            ProgramOptions.create(commandLine), dependencies)
+                    .applyToConfiguration(result);
+        } catch (CliArgsException cause) {
+            throw new SqlGatewayException("Invalid deployment run options.", cause);
+        }
+
+        LOG.info("Execution config: {}", result);
+        return result;
+    }
+
+    /** Returns the first command line that reports itself active for the parsed arguments. */
+    private static CustomCommandLine findActiveCommandLine(
+            List<CustomCommandLine> availableCommandLines, CommandLine commandLine) {
+        return availableCommandLines.stream()
+                .filter(candidate -> candidate.isActive(commandLine))
+                .findFirst()
+                .orElseThrow(
+                        () -> new SqlGatewayException("Could not find a matching deployment."));
+    }
+
+    // -------------------------------------------------------------------------------------------
+
+    /**
+     * Creates a {@link DefaultContext} from the configuration directory found in the environment,
+     * with the given entries added on top of the loaded global configuration.
+     */
+    public static DefaultContext load(Map<String, String> config) {
+        // 1. locate the configuration directory
+        final String confDir = CliFrontend.getConfigurationDirectoryFromEnv();
+
+        // 2. load the global configuration and add the given entries
+        final Configuration merged = GlobalConfiguration.loadConfiguration(confDir);
+        final Configuration overrides = Configuration.fromMap(config);
+        merged.addAll(overrides);
+
+        // 3. load the custom command lines
+        final List<CustomCommandLine> customCommandLines =
+                CliFrontend.loadCustomCommandLines(merged, confDir);
+
+        return new DefaultContext(merged, customCommandLines);
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
new file mode 100644
index 00000000000..83bc5758ce0
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.context;
+
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.PlannerFactoryUtil;
+import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.module.ModuleManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Context describing a session, it's mainly used for user to open a new session in the backend. If
+ * client request to open a new session, the backend {@code Executor} will maintain the session
+ * context map until users close it.
+ */
+public class SessionContext {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SessionContext.class);
+
+    private final SessionHandle sessionId;
+    // Stored at creation; not read anywhere inside this class yet.
+    private final EndpointVersion endpointVersion;
+
+    // store all options and use Configuration to build SessionState and TableConfig.
+    private final Configuration sessionConf;
+    private final SessionState sessionState;
+    private final URLClassLoader userClassloader;
+
+    private SessionContext(
+            SessionHandle sessionId,
+            EndpointVersion endpointVersion,
+            Configuration sessionConf,
+            URLClassLoader classLoader,
+            SessionState sessionState) {
+        this.sessionId = sessionId;
+        this.endpointVersion = endpointVersion;
+        this.sessionConf = sessionConf;
+        this.userClassloader = classLoader;
+        this.sessionState = sessionState;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Getter method
+    // --------------------------------------------------------------------------------------------
+
+    /** Returns the handle identifying this session. */
+    public SessionHandle getSessionId() {
+        return this.sessionId;
+    }
+
+    /** Returns the session configuration converted to a map. */
+    public Map<String, String> getConfigMap() {
+        return sessionConf.toMap();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Method to execute commands
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Close resources, e.g. catalogs.
+     *
+     * <p>All catalogs are unregistered first; the user class loader is closed afterwards, even if
+     * unregistering a catalog fails. A failure while unregistering is still propagated to the
+     * caller, an {@link IOException} from closing the class loader is only logged.
+     */
+    public void close() {
+        try {
+            // Iterate over a snapshot of the names: entries are unregistered while looping, which
+            // would be unsafe if listCatalogs() returned a live view of the registered catalogs.
+            for (String name :
+                    new ArrayList<String>(sessionState.catalogManager.listCatalogs())) {
+                sessionState.catalogManager.unregisterCatalog(name, true);
+            }
+        } finally {
+            // Always release the class loader, otherwise a failing catalog would leak it.
+            try {
+                userClassloader.close();
+            } catch (IOException e) {
+                LOG.debug("Error while closing class loader.", e);
+            }
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Utilities
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Creates a session context.
+     *
+     * <p>The effective configuration is a clone of the default context's configuration with
+     * {@code sessionConf} added on top; the default context itself is not modified. A fresh class
+     * loader, catalog manager, module manager and function catalog are created for the session.
+     *
+     * @param defaultContext provides the base configuration
+     * @param sessionId the handle of the new session
+     * @param endpointVersion the endpoint version associated with the session
+     * @param sessionConf session specific options that override the base configuration
+     * @return the new session context
+     */
+    public static SessionContext create(
+            DefaultContext defaultContext,
+            SessionHandle sessionId,
+            EndpointVersion endpointVersion,
+            Configuration sessionConf) {
+        // --------------------------------------------------------------------------------------------------------------
+        // Init config
+        // --------------------------------------------------------------------------------------------------------------
+
+        Configuration configuration = defaultContext.getFlinkConfig().clone();
+        configuration.addAll(sessionConf);
+
+        // --------------------------------------------------------------------------------------------------------------
+        // Init classloader
+        // --------------------------------------------------------------------------------------------------------------
+
+        URLClassLoader classLoader =
+                buildClassLoader(Collections.emptySet(), Collections.emptySet(), configuration);
+
+        // --------------------------------------------------------------------------------------------------------------
+        // Init session state
+        // --------------------------------------------------------------------------------------------------------------
+
+        ModuleManager moduleManager = new ModuleManager();
+
+        final EnvironmentSettings settings = EnvironmentSettings.fromConfiguration(configuration);
+
+        CatalogManager catalogManager =
+                CatalogManager.newBuilder()
+                        // Currently, the classloader is only used by DataTypeFactory.
+                        .classLoader(classLoader)
+                        .config(configuration)
+                        .defaultCatalog(
+                                settings.getBuiltInCatalogName(),
+                                new GenericInMemoryCatalog(
+                                        settings.getBuiltInCatalogName(),
+                                        settings.getBuiltInDatabaseName()))
+                        .build();
+
+        FunctionCatalog functionCatalog =
+                new FunctionCatalog(configuration, catalogManager, moduleManager, classLoader);
+        SessionState sessionState =
+                new SessionState(catalogManager, moduleManager, functionCatalog);
+
+        return new SessionContext(
+                sessionId, endpointVersion, configuration, classLoader, sessionState);
+    }
+
+    /**
+     * Builds the user code class loader from the union of both dependency sets.
+     *
+     * <p>Note that {@code conf} is modified: {@link CoreOptions#CHECK_LEAKED_CLASSLOADER} is
+     * forced to {@code true}.
+     */
+    private static URLClassLoader buildClassLoader(
+            Set<URL> envDependencies, Set<URL> userDependencies, Configuration conf) {
+        Set<URL> newDependencies = new HashSet<>();
+        newDependencies.addAll(envDependencies);
+        newDependencies.addAll(userDependencies);
+
+        // override to use SafetyNetWrapperClassLoader
+        conf.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, true);
+
+        return ClientUtils.buildUserCodeClassLoader(
+                new ArrayList<>(newDependencies),
+                Collections.emptyList(),
+                SessionContext.class.getClassLoader(),
+                conf);
+    }
+
+    // ------------------------------------------------------------------------------------------------------------------
+    // Helpers
+    // ------------------------------------------------------------------------------------------------------------------
+
+    /**
+     * Creates a table environment on top of the state of this session.
+     *
+     * <p>Every call creates a new {@link StreamExecutionEnvironment}, {@link TableConfig} and
+     * {@link Executor}, while the catalog manager, module manager, function catalog and class
+     * loader of the session are shared between all returned environments.
+     */
+    public TableEnvironmentInternal createTableEnvironment() {
+        // checks the value of RUNTIME_MODE
+        final EnvironmentSettings settings =
+                EnvironmentSettings.newInstance().withConfiguration(sessionConf).build();
+
+        TableConfig tableConfig = new TableConfig();
+        tableConfig.addConfiguration(sessionConf);
+
+        StreamExecutionEnvironment streamExecEnv = createStreamExecutionEnvironment();
+
+        final Executor executor = lookupExecutor(streamExecEnv);
+        return createStreamTableEnvironment(
+                streamExecEnv,
+                settings,
+                tableConfig,
+                executor,
+                sessionState.catalogManager,
+                sessionState.moduleManager,
+                sessionState.functionCatalog,
+                userClassloader);
+    }
+
+    /** Creates the planner and wires all given components into a table environment. */
+    private TableEnvironmentInternal createStreamTableEnvironment(
+            StreamExecutionEnvironment env,
+            EnvironmentSettings settings,
+            TableConfig tableConfig,
+            Executor executor,
+            CatalogManager catalogManager,
+            ModuleManager moduleManager,
+            FunctionCatalog functionCatalog,
+            ClassLoader userClassLoader) {
+
+        final Planner planner =
+                PlannerFactoryUtil.createPlanner(
+                        executor,
+                        tableConfig,
+                        userClassLoader,
+                        moduleManager,
+                        catalogManager,
+                        functionCatalog);
+
+        return new StreamTableEnvironmentImpl(
+                catalogManager,
+                moduleManager,
+                functionCatalog,
+                tableConfig,
+                env,
+                planner,
+                executor,
+                settings.isStreamingMode(),
+                userClassLoader);
+    }
+
+    /**
+     * Discovers the default {@link ExecutorFactory} with the user class loader and reflectively
+     * invokes its {@code create(StreamExecutionEnvironment)} method.
+     *
+     * @throws TableException if the factory cannot be found or the executor cannot be created
+     */
+    private Executor lookupExecutor(StreamExecutionEnvironment executionEnvironment) {
+        try {
+            final ExecutorFactory executorFactory =
+                    FactoryUtil.discoverFactory(
+                            userClassloader,
+                            ExecutorFactory.class,
+                            ExecutorFactory.DEFAULT_IDENTIFIER);
+            final Method createMethod =
+                    executorFactory
+                            .getClass()
+                            .getMethod("create", StreamExecutionEnvironment.class);
+
+            return (Executor) createMethod.invoke(executorFactory, executionEnvironment);
+        } catch (Exception e) {
+            throw new TableException(
+                    "Could not instantiate the executor. Make sure a planner module is on the classpath",
+                    e);
+        }
+    }
+
+    private StreamExecutionEnvironment createStreamExecutionEnvironment() {
+        // We do not need separate StreamExecutionEnvironments to build and to submit a Flink
+        // job; instead we just use the StreamExecutionEnvironment#executeAsync(StreamGraph)
+        // method to execute an existing StreamGraph.
+        // This requires the StreamExecutionEnvironment to have a full Flink configuration.
+        return new StreamExecutionEnvironment(new Configuration(sessionConf), userClassloader);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Inner class
+    // --------------------------------------------------------------------------------------------
+
+    /** Holder of the catalog manager, function catalog and module manager of a session. */
+    public static class SessionState {
+
+        public final CatalogManager catalogManager;
+        public final FunctionCatalog functionCatalog;
+        public final ModuleManager moduleManager;
+
+        public SessionState(
+                CatalogManager catalogManager,
+                ModuleManager moduleManager,
+                FunctionCatalog functionCatalog) {
+            this.catalogManager = catalogManager;
+            this.moduleManager = moduleManager;
+            this.functionCatalog = functionCatalog;
+        }
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java
new file mode 100644
index 00000000000..28fc1eb81cf
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.session;
+
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+
+import java.io.Closeable;
+import java.util.Map;
+
+/**
+ * Similar to HTTP Session, which could maintain user identity and store user-specific data during
+ * multiple request/response interactions between a client and the gateway server.
+ *
+ * <p>The session keeps the time of its last access: it is set when the session is created and
+ * refreshed by every call to {@link #touch()}.
+ */
+public class Session implements Closeable {
+
+    private final SessionContext sessionContext;
+
+    // volatile: written by touch() and read by getLastAccessTime() without further locking.
+    private volatile long lastAccessTime;
+
+    /**
+     * Creates a session backed by the given context.
+     *
+     * @param sessionContext the context holding the state of this session
+     */
+    public Session(SessionContext sessionContext) {
+        this.sessionContext = sessionContext;
+        // Creation counts as the first access. Without this the timestamp would stay at 0 until
+        // the first touch(), so any idle check based on getLastAccessTime() would consider a
+        // brand-new session idle since the epoch.
+        this.lastAccessTime = System.currentTimeMillis();
+    }
+
+    /** Marks the session as accessed right now. */
+    public void touch() {
+        this.lastAccessTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Returns the time of the last access, in milliseconds as reported by {@link
+     * System#currentTimeMillis()}.
+     */
+    public long getLastAccessTime() {
+        return lastAccessTime;
+    }
+
+    /** Returns the handle that identifies this session. */
+    public SessionHandle getSessionHandle() {
+        return sessionContext.getSessionId();
+    }
+
+    /** Returns the configuration of this session as a map. */
+    public Map<String, String> getSessionConfig() {
+        return sessionContext.getConfigMap();
+    }
+
+    /** Closes the underlying session context. */
+    @Override
+    public void close() {
+        sessionContext.close();
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java
new file mode 100644
index 00000000000..7d1457bf3d9
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.session;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.utils.ThreadUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL;
+import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT;
+import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM;
+import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_KEEPALIVE_TIME;
+import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX;
+import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MIN;
+
+/**
+ * Manage the lifecycle of the {@code Session}.
+ *
+ * <p>Sessions are kept in a concurrent map. After {@link #start()} a periodic task closes the
+ * sessions that have been idle for longer than the configured idle timeout, provided that both
+ * the check interval and the idle timeout are positive.
+ */
+public class SessionManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SessionManager.class);
+    private static final String OPERATION_POOL_NAME = "sql-gateway-operation-pool";
+
+    private final DefaultContext defaultContext;
+
+    // Both durations are kept in milliseconds.
+    private final long idleTimeout;
+    private final long checkInterval;
+    // A value <= 0 disables the limit, see checkSessionCount().
+    private final int maxSessionCount;
+
+    private final Map<SessionHandle, Session> sessions;
+
+    private ExecutorService operationExecutorService;
+    private ScheduledExecutorService scheduledExecutorService;
+    private ScheduledFuture<?> timeoutCheckerFuture;
+
+    public SessionManager(DefaultContext defaultContext) {
+        this.defaultContext = defaultContext;
+        ReadableConfig conf = defaultContext.getFlinkConfig();
+        this.idleTimeout = conf.get(SQL_GATEWAY_SESSION_IDLE_TIMEOUT).toMillis();
+        this.checkInterval = conf.get(SQL_GATEWAY_SESSION_CHECK_INTERVAL).toMillis();
+        this.maxSessionCount = conf.get(SQL_GATEWAY_SESSION_MAX_NUM);
+        this.sessions = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Starts the periodic idle-session check (only if both the check interval and the idle
+     * timeout are positive) and creates the thread pool for operations.
+     */
+    public void start() {
+        if (checkInterval > 0 && idleTimeout > 0) {
+            scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+            timeoutCheckerFuture =
+                    scheduledExecutorService.scheduleAtFixedRate(
+                            this::closeExpiredSessions,
+                            checkInterval,
+                            checkInterval,
+                            TimeUnit.MILLISECONDS);
+        }
+
+        ReadableConfig conf = defaultContext.getFlinkConfig();
+        operationExecutorService =
+                ThreadUtils.newThreadPool(
+                        conf.get(SQL_GATEWAY_WORKER_THREADS_MIN),
+                        conf.get(SQL_GATEWAY_WORKER_THREADS_MAX),
+                        conf.get(SQL_GATEWAY_WORKER_KEEPALIVE_TIME).toMillis(),
+                        OPERATION_POOL_NAME);
+    }
+
+    /** Cancels the idle-session check and shuts down the executors created by {@link #start()}. */
+    public void stop() {
+        if (scheduledExecutorService != null) {
+            timeoutCheckerFuture.cancel(true);
+            scheduledExecutorService.shutdown();
+        }
+        if (operationExecutorService != null) {
+            operationExecutorService.shutdown();
+        }
+        LOG.info("SessionManager is stopped.");
+    }
+
+    /**
+     * Returns the session registered under the given handle and refreshes its last access time.
+     *
+     * @throws SqlGatewayException if no session is registered under the handle
+     */
+    public Session getSession(SessionHandle sessionHandle) throws SqlGatewayException {
+        Session session = sessions.get(sessionHandle);
+        if (session == null) {
+            String msg = String.format("Session '%s' does not exist.", sessionHandle);
+            LOG.warn(msg);
+            throw new SqlGatewayException(msg);
+        }
+        session.touch();
+        return session;
+    }
+
+    /**
+     * Register the session into the {@link SessionManager}.
+     *
+     * <p>Use synchronized to keep the checkSessionCount and build the Session atomic.
+     *
+     * @throws SqlGatewayException if the maximum number of sessions is already reached
+     */
+    public synchronized Session openSession(SessionEnvironment environment)
+            throws SqlGatewayException {
+        // check session limit
+        checkSessionCount();
+
+        // Draw handles until one is found that is not in use.
+        SessionHandle sessionId;
+        do {
+            sessionId = SessionHandle.create();
+        } while (sessions.containsKey(sessionId));
+
+        SessionContext sessionContext =
+                SessionContext.create(
+                        defaultContext,
+                        sessionId,
+                        environment.getSessionEndpointVersion(),
+                        Configuration.fromMap(environment.getSessionConfig()));
+        Session session = new Session(sessionContext);
+        // Opening counts as an access. Touch the session before it becomes visible to the idle
+        // checker so that a session that was just opened is never treated as expired.
+        session.touch();
+        sessions.put(sessionId, session);
+
+        LOG.info(
+                "Session {} is opened, and the number of current sessions is {}.",
+                session.getSessionHandle(),
+                sessions.size());
+
+        return session;
+    }
+
+    /**
+     * Closes the session registered under the given handle.
+     *
+     * @throws SqlGatewayException if no session is registered under the handle
+     */
+    public void closeSession(SessionHandle sessionId) throws SqlGatewayException {
+        Session session = getSession(sessionId);
+        closeSession(session);
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // Utilities
+    // ------------------------------------------------------------------------------------------
+
+    /** Body of the periodic task: closes every session that exceeded the idle timeout. */
+    private void closeExpiredSessions() {
+        LOG.debug(
+                "Start to cleanup expired sessions, current session count: {}", sessions.size());
+        for (Map.Entry<SessionHandle, Session> entry : sessions.entrySet()) {
+            SessionHandle sessionId = entry.getKey();
+            Session session = entry.getValue();
+            if (isSessionExpired(session)) {
+                LOG.info("Session {} is expired, closing it...", sessionId);
+                try {
+                    closeSession(session);
+                } catch (Exception e) {
+                    // An exception escaping from a task scheduled at a fixed rate suppresses
+                    // all subsequent executions, so a single failing session must neither end
+                    // this sweep nor disable the idle check.
+                    LOG.warn("Failed to close expired session {}.", sessionId, e);
+                }
+            }
+        }
+        LOG.debug(
+                "Removing expired session finished, current session count: {}", sessions.size());
+    }
+
+    private void checkSessionCount() throws SqlGatewayException {
+        if (maxSessionCount <= 0) {
+            return;
+        }
+        if (sessions.size() >= maxSessionCount) {
+            String msg =
+                    String.format(
+                            "Failed to create session, the count of active sessions exceeds the max count: %s",
+                            maxSessionCount);
+            LOG.warn(msg);
+            throw new SqlGatewayException(msg);
+        }
+    }
+
+    private boolean isSessionExpired(Session session) {
+        if (idleTimeout > 0) {
+            return (System.currentTimeMillis() - session.getLastAccessTime()) > idleTimeout;
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Unregisters and closes the given session.
+     *
+     * <p>The session is closed only by the caller that actually removed it from the map, so a
+     * close requested by a client that races with the idle check does not close it twice.
+     */
+    private void closeSession(Session session) {
+        SessionHandle sessionId = session.getSessionHandle();
+        if (!sessions.remove(sessionId, session)) {
+            LOG.debug("Session: {} has already been closed.", sessionId);
+            return;
+        }
+        session.close();
+        LOG.info("Session: {} is closed.", sessionId);
+    }
+
+    @VisibleForTesting
+    boolean isSessionAlive(SessionHandle sessionId) {
+        return sessions.containsKey(sessionId);
+    }
+
+    @VisibleForTesting
+    int currentSessionCount() {
+        return sessions.size();
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/SqlExecutionException.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/SqlExecutionException.java
new file mode 100644
index 00000000000..c13c48622d9
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/SqlExecutionException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.utils;
+
+/**
+ * Exception thrown during the execution.
+ *
+ * <p>This is an unchecked exception (it extends {@link RuntimeException}), so callers are not
+ * forced to declare or catch it.
+ */
+public class SqlExecutionException extends RuntimeException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates the exception with a detail message and no cause.
+     *
+     * @param message the detail message
+     */
+    public SqlExecutionException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates the exception with a detail message and the underlying cause.
+     *
+     * @param message the detail message
+     * @param e the cause of this exception
+     */
+    public SqlExecutionException(String message, Throwable e) {
+        super(message, e);
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/ThreadUtils.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/ThreadUtils.java
new file mode 100644
index 00000000000..7483f870e9f
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/ThreadUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Utils for thread pool executor. */
+public class ThreadUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ThreadUtils.class);
+
+    /**
+     * Creates a {@link ThreadPoolExecutor} backed by a {@link SynchronousQueue}, i.e. submitted
+     * tasks are handed directly to a worker thread instead of being buffered.
+     *
+     * <p>Worker threads are named {@code <threadPoolName>-thread-<n>} so that they can be told
+     * apart in thread dumps and logs. Apart from the name, threads are created by {@link
+     * Executors#defaultThreadFactory()}, exactly as the executor would do without an explicit
+     * factory.
+     *
+     * @param poolSize the core pool size of the executor
+     * @param poolQueueSize passed to the executor as its maximum pool size (despite the name, it
+     *     does not size any queue)
+     * @param keepAliveMs keep-alive time, in milliseconds, passed to the executor
+     * @param threadPoolName name used as prefix of the worker thread names and in the log message
+     * @return the newly created executor
+     * @throws IllegalArgumentException if the executor rejects the sizes or the keep-alive time
+     */
+    public static ThreadPoolExecutor newThreadPool(
+            int poolSize, int poolQueueSize, long keepAliveMs, String threadPoolName) {
+        // Delegate to the default factory so that everything except the thread name stays as it
+        // was when no factory was passed to the executor.
+        final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
+        final AtomicInteger threadCounter = new AtomicInteger();
+        final ThreadFactory namedFactory =
+                runnable -> {
+                    Thread thread = defaultFactory.newThread(runnable);
+                    thread.setName(threadPoolName + "-thread-" + threadCounter.incrementAndGet());
+                    return thread;
+                };
+
+        final ThreadPoolExecutor executor =
+                new ThreadPoolExecutor(
+                        poolSize,
+                        poolQueueSize,
+                        keepAliveMs,
+                        TimeUnit.MILLISECONDS,
+                        new SynchronousQueue<>(),
+                        namedFactory);
+
+        // Log only after the executor exists, so a rejected configuration is not reported as
+        // "created".
+        LOG.info(
+                "Created thread pool {} with core size {}, max size {} and keep alive time {}ms.",
+                threadPoolName,
+                poolSize,
+                poolQueueSize,
+                keepAliveMs);
+
+        return executor;
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
new file mode 100644
index 00000000000..048af7401c8
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service;
+
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
+import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/** ITCase for {@link SqlGatewayServiceImpl}. */
+public class SqlGatewayServiceITCase extends AbstractTestBase {
+
+    @RegisterExtension
+    public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
+            new SqlGatewayServiceExtension();
+
+    private static SqlGatewayServiceImpl service;
+
+    /** Fetches the service instance provided by the registered extension. */
+    @BeforeAll
+    public static void setUp() {
+        service = (SqlGatewayServiceImpl) SQL_GATEWAY_SERVICE_EXTENSION.getService();
+    }
+
+    /** Options supplied when opening a session must show up in that session's config. */
+    @Test
+    public void testOpenSessionWithConfig() {
+        final Map<String, String> expectedOptions = new HashMap<>();
+        expectedOptions.put("key1", "val1");
+        expectedOptions.put("key2", "val2");
+
+        final SessionHandle handle =
+                service.openSession(
+                        SessionEnvironment.newBuilder()
+                                .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                                .addSessionConfig(expectedOptions)
+                                .build());
+        final Map<String, String> sessionConfig = service.getSessionConfig(handle);
+
+        // The session config may contain more entries; only the supplied ones are checked.
+        for (Map.Entry<String, String> expected : expectedOptions.entrySet()) {
+            final String key = expected.getKey();
+            final String value = expected.getValue();
+            final String failureMessage =
+                    String.format("Should contains (%s, %s) in the actual config.", key, value);
+            assertThat(failureMessage, sessionConfig, Matchers.hasEntry(key, value));
+        }
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/session/SessionManagerTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/session/SessionManagerTest.java
new file mode 100644
index 00000000000..ae4ff66cd81
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/session/SessionManagerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.session;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Test for {@link SessionManager}. */
+public class SessionManagerTest {
+
+    private SessionManager sessionManager;
+
+    /**
+     * Starts a {@link SessionManager} configured with a 2 second idle timeout, a 100 millisecond
+     * check interval and a maximum of 3 sessions.
+     */
+    @BeforeEach
+    public void setup() {
+        Configuration conf = new Configuration();
+        conf.set(
+                SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT,
+                Duration.ofSeconds(2));
+        conf.set(
+                SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL,
+                Duration.ofMillis(100));
+        conf.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM, 3);
+        sessionManager =
+                new SessionManager(
+                        new DefaultContext(conf, Collections.singletonList(new DefaultCLI())));
+        sessionManager.start();
+    }
+
+    /** Stops the manager if {@link #setup()} managed to create it. */
+    @AfterEach
+    public void cleanUp() {
+        if (sessionManager != null) {
+            sessionManager.stop();
+        }
+    }
+
+    /**
+     * A session that is accessed once per second (less than the 2 second idle timeout) must stay
+     * available; once it is no longer accessed it must disappear within the 10 second deadline.
+     */
+    @Test
+    public void testIdleSessionCleanup() throws Exception {
+        SessionEnvironment environment =
+                SessionEnvironment.newBuilder()
+                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                        .build();
+        Session session = sessionManager.openSession(environment);
+        SessionHandle sessionId = session.getSessionHandle();
+        for (int i = 0; i < 3; i++) {
+            // should succeed
+            sessionManager.getSession(sessionId);
+            Thread.sleep(1000);
+        }
+        Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
+        while (deadline.hasTimeLeft() && sessionManager.isSessionAlive(sessionId)) {
+            //noinspection BusyWait
+            Thread.sleep(1000);
+        }
+        assertFalse(sessionManager.isSessionAlive(sessionId));
+    }
+
+    /** Opening more sessions than the configured maximum (3) must be rejected. */
+    @Test
+    public void testSessionNumberLimit() {
+        SessionEnvironment environment =
+                SessionEnvironment.newBuilder()
+                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                        .build();
+
+        sessionManager.openSession(environment);
+        sessionManager.openSession(environment);
+        sessionManager.openSession(environment);
+
+        assertEquals(3, sessionManager.currentSessionCount());
+
+        // The String overload of assertThrows only supplies the failure message of the assertion
+        // itself; it does not check the exception's message. Capture the exception and verify
+        // its message explicitly.
+        SqlGatewayException exception =
+                assertThrows(
+                        SqlGatewayException.class, () -> sessionManager.openSession(environment));
+        assertEquals(
+                "Failed to create session, the count of active sessions exceeds the max count: 3",
+                exception.getMessage());
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java
new file mode 100644
index 00000000000..0e5fbcec5ba
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.utils;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.gateway.service.session.SessionManager;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
+
+/** A simple {@link Extension} to be used by tests that require a {@link SqlGatewayService}. */
+public class SqlGatewayServiceExtension implements BeforeAllCallback, AfterAllCallback {
+
+    private SqlGatewayService service;
+    private SessionManager sessionManager;
+    private TemporaryFolder temporaryFolder;
+
+    /**
+     * Creates a temporary conf directory containing an empty {@code flink-conf.yaml}, points
+     * {@code ENV_FLINK_CONF_DIR} at it while the {@link DefaultContext} is loaded, and then
+     * creates and starts the {@link SessionManager} and the service on top of it.
+     *
+     * <p>The environment captured at the start of the method is passed back to {@code
+     * CommonTestUtils.setEnv} in a {@code finally} block, whether or not loading succeeded.
+     */
+    @Override
+    public void beforeAll(ExtensionContext context) throws Exception {
+        final Map<String, String> originalEnv = System.getenv();
+        try {
+            // prepare conf dir
+            temporaryFolder = new TemporaryFolder();
+            temporaryFolder.create();
+            File confFolder = temporaryFolder.newFolder("conf");
+            File confYaml = new File(confFolder, "flink-conf.yaml");
+            if (!confYaml.createNewFile()) {
+                throw new IOException("Can't create testing flink-conf.yaml file.");
+            }
+
+            // adjust the test environment for the purposes of this test
+            Map<String, String> map = new HashMap<>(System.getenv());
+            map.put(ENV_FLINK_CONF_DIR, confFolder.getAbsolutePath());
+            CommonTestUtils.setEnv(map);
+
+            sessionManager = new SessionManager(DefaultContext.load(new HashMap<>()));
+        } finally {
+            CommonTestUtils.setEnv(originalEnv);
+        }
+
+        service = new SqlGatewayServiceImpl(sessionManager);
+        sessionManager.start();
+    }
+
+    /**
+     * Stops the {@link SessionManager} and deletes the temporary conf directory.
+     *
+     * <p>The directory is deleted even if stopping the manager throws, and both resources are
+     * null-checked in case setup did not get far enough to create them.
+     */
+    @Override
+    public void afterAll(ExtensionContext context) throws Exception {
+        try {
+            if (sessionManager != null) {
+                sessionManager.stop();
+            }
+        } finally {
+            if (temporaryFolder != null) {
+                temporaryFolder.delete();
+            }
+        }
+    }
+
+    /** Returns the service created in {@link #beforeAll(ExtensionContext)}. */
+    public SqlGatewayService getService() {
+        return service;
+    }
+
+    /** Returns the session manager created in {@link #beforeAll(ExtensionContext)}. */
+    public SessionManager getSessionManager() {
+        return sessionManager;
+    }
+}
diff --git a/flink-table/pom.xml b/flink-table/pom.xml
index ef391d5b7b3..2ce69f66718 100644
--- a/flink-table/pom.xml
+++ b/flink-table/pom.xml
@@ -43,6 +43,8 @@ under the License.
 		<module>flink-table-planner-loader</module>
 		<module>flink-table-planner-loader-bundle</module>
 		<module>flink-table-runtime</module>
+		<module>flink-sql-gateway-api</module>
+		<module>flink-sql-gateway</module>
 		<module>flink-sql-client</module>
 		<module>flink-sql-parser</module>
 		<module>flink-sql-parser-hive</module>
diff --git a/tools/ci/stage.sh b/tools/ci/stage.sh
index e5c99251531..3ac0d3607eb 100755
--- a/tools/ci/stage.sh
+++ b/tools/ci/stage.sh
@@ -79,6 +79,8 @@ flink-table/flink-table-api-java-bridge,\
 flink-table/flink-table-api-scala-bridge,\
 flink-table/flink-table-api-java-uber,\
 flink-table/flink-sql-client,\
+flink-table/flink-sql-gateway-api,\
+flink-table/flink-sql-gateway,\
 flink-table/flink-table-planner,\
 flink-table/flink-table-planner-loader,\
 flink-table/flink-table-planner-loader-bundle,\