You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/06/23 01:37:30 UTC
[flink] 01/02: [FLINK-27766][sql-gateway] Introduce the basic components for the SqlGatewayService
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit da08267c2cb29d98e626a867f8e07cb5b8e7f29a
Author: Shengkai <10...@qq.com>
AuthorDate: Wed Jun 22 19:43:54 2022 +0800
[FLINK-27766][sql-gateway] Introduce the basic components for the SqlGatewayService
---
flink-table/flink-sql-gateway-api/pom.xml | 67 +++++
.../flink/table/gateway/api/HandleIdentifier.java | 63 +++++
.../flink/table/gateway/api/SqlGatewayService.java | 58 +++++
.../api/config/SqlGatewayServiceConfigOptions.java | 76 ++++++
.../gateway/api/endpoint/EndpointVersion.java | 25 ++
.../gateway/api/session/SessionEnvironment.java | 122 +++++++++
.../table/gateway/api/session/SessionHandle.java | 66 +++++
.../gateway/api/utils/SqlGatewayException.java | 39 +++
.../api/session/SessionEnvironmentTest.java | 52 ++++
.../gateway/api/utils/MockedEndpointVersion.java | 26 ++
flink-table/flink-sql-gateway/pom.xml | 103 ++++++++
.../gateway/service/SqlGatewayServiceImpl.java | 80 ++++++
.../gateway/service/context/DefaultContext.java | 152 +++++++++++
.../gateway/service/context/SessionContext.java | 286 +++++++++++++++++++++
.../table/gateway/service/session/Session.java | 60 +++++
.../gateway/service/session/SessionManager.java | 214 +++++++++++++++
.../service/utils/SqlExecutionException.java | 33 +++
.../table/gateway/service/utils/ThreadUtils.java | 49 ++++
.../gateway/service/SqlGatewayServiceITCase.java | 73 ++++++
.../service/session/SessionManagerTest.java | 108 ++++++++
.../service/utils/SqlGatewayServiceExtension.java | 89 +++++++
flink-table/pom.xml | 2 +
tools/ci/stage.sh | 2 +
23 files changed, 1845 insertions(+)
diff --git a/flink-table/flink-sql-gateway-api/pom.xml b/flink-table/flink-sql-gateway-api/pom.xml
new file mode 100644
index 00000000000..e0639d215e8
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table</artifactId>
+ <version>1.16-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-sql-gateway-api</artifactId>
+ <name>Flink : Table : SQL Gateway : API</name>
+ <description>
+ This module contains extension points of the SQL Gateway API.
+ It allows for implementing user-defined endpoint with
+ minimal dependencies.
+ </description>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <!-- Table ecosystem -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Make test classes available to other modules. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/HandleIdentifier.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/HandleIdentifier.java
new file mode 100644
index 00000000000..9b9acbfb72c
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/HandleIdentifier.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/** Identifiers for Handle. */
+@PublicEvolving
+public class HandleIdentifier {
+
+ private final UUID publicId;
+ private final UUID secretId;
+
+ public HandleIdentifier(UUID publicId, UUID secretId) {
+ this.publicId = publicId;
+ this.secretId = secretId;
+ }
+
+ public UUID getPublicId() {
+ return publicId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof HandleIdentifier)) {
+ return false;
+ }
+ HandleIdentifier that = (HandleIdentifier) o;
+ return Objects.equals(publicId, that.publicId) && Objects.equals(secretId, that.secretId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(publicId, secretId);
+ }
+
+ @Override
+ public String toString() {
+ return publicId.toString();
+ }
+}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
new file mode 100644
index 00000000000..933c797f243
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+
+import java.util.Map;
+
+/** A service of SQL gateway is responsible for handling requests from the endpoints. */
+@PublicEvolving
+public interface SqlGatewayService {
+
+ // -------------------------------------------------------------------------------------------
+ // Session Management
+ // -------------------------------------------------------------------------------------------
+
+ /**
+ * Open the {@code Session}.
+ *
+ * @param environment Environment to initialize the Session.
+ * @return Returns a handle that used to identify the Session.
+ */
+ SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;
+
+ /**
+ * Close the {@code Session}.
+ *
+ * @param sessionHandle handle to identify the Session needs to be closed.
+ */
+ void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;
+
+ /**
+ * Get the current configuration of the {@code Session}.
+ *
+ * @param sessionHandle handle to identify the session.
+ * @return Returns configuration of the session.
+ */
+ Map<String, String> getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException;
+}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/config/SqlGatewayServiceConfigOptions.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/config/SqlGatewayServiceConfigOptions.java
new file mode 100644
index 00000000000..eadfb85a7f1
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/config/SqlGatewayServiceConfigOptions.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** Config options of the {@link SqlGatewayService}. */
+@PublicEvolving
+public class SqlGatewayServiceConfigOptions {
+
+ public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_IDLE_TIMEOUT =
+ key("sql-gateway.session.idle-timeout")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(10))
+ .withDescription(
+ "Timeout interval for closing the session when the session hasn't been accessed during the interval. "
+ + "If setting to zero or negative value, the session will not be closed.");
+
+ public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_CHECK_INTERVAL =
+ key("sql-gateway.session.check-interval")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(1))
+ .withDescription(
+ "The check interval for idle session timeout, which can be disabled by setting to zero or negative value.");
+
+ public static final ConfigOption<Integer> SQL_GATEWAY_SESSION_MAX_NUM =
+ key("sql-gateway.session.max-num")
+ .intType()
+ .defaultValue(1000000)
+ .withDescription(
+ "The maximum number of the active session for sql gateway service.");
+
+ public static final ConfigOption<Integer> SQL_GATEWAY_WORKER_THREADS_MAX =
+ key("sql-gateway.worker.threads.max")
+ .intType()
+ .defaultValue(500)
+ .withDescription(
+ "The maximum number of worker threads for sql gateway service.");
+
+ public static final ConfigOption<Integer> SQL_GATEWAY_WORKER_THREADS_MIN =
+ key("sql-gateway.worker.threads.min")
+ .intType()
+ .defaultValue(5)
+ .withDescription(
+ "The minimum number of worker threads for sql gateway service.");
+
+ public static final ConfigOption<Duration> SQL_GATEWAY_WORKER_KEEPALIVE_TIME =
+ key("sql-gateway.worker.keepalive-time")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(5))
+ .withDescription(
+ "Keepalive time for an idle worker thread. When the number of workers exceeds min workers, "
+ + "excessive threads are killed after this time interval.");
+}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/endpoint/EndpointVersion.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/endpoint/EndpointVersion.java
new file mode 100644
index 00000000000..7aa3c586c5b
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/endpoint/EndpointVersion.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.endpoint;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Version of the endpoint. */
+@PublicEvolving
+public interface EndpointVersion {}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionEnvironment.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionEnvironment.java
new file mode 100644
index 00000000000..75f573aae2b
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionEnvironment.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.session;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Environment to initialize the {@code Session}. */
+@PublicEvolving
+public class SessionEnvironment {
+ private final @Nullable String sessionName;
+ private final EndpointVersion version;
+ private final Map<String, String> sessionConfig;
+
+ @VisibleForTesting
+ SessionEnvironment(
+ @Nullable String sessionName,
+ EndpointVersion version,
+ Map<String, String> sessionConfig) {
+ this.sessionName = sessionName;
+ this.version = version;
+ this.sessionConfig = sessionConfig;
+ }
+
+ // -------------------------------------------------------------------------------------------
+ // Getter
+ // -------------------------------------------------------------------------------------------
+
+ public Optional<String> getSessionName() {
+ return Optional.ofNullable(sessionName);
+ }
+
+ public EndpointVersion getSessionEndpointVersion() {
+ return version;
+ }
+
+ public Map<String, String> getSessionConfig() {
+ return Collections.unmodifiableMap(sessionConfig);
+ }
+
+ // -------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof SessionEnvironment)) {
+ return false;
+ }
+ SessionEnvironment that = (SessionEnvironment) o;
+ return Objects.equals(sessionName, that.sessionName)
+ && Objects.equals(version, that.version)
+ && Objects.equals(sessionConfig, that.sessionConfig);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(sessionName, version, sessionConfig);
+ }
+
+ // -------------------------------------------------------------------------------------------
+ // Builder
+ // -------------------------------------------------------------------------------------------
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /** Builder to build the {@link SessionEnvironment}. */
+ public static class Builder {
+ private @Nullable String sessionName;
+ private EndpointVersion version;
+ private final Map<String, String> sessionConfig = new HashMap<>();
+
+ public Builder setSessionName(String sessionName) {
+ this.sessionName = sessionName;
+ return this;
+ }
+
+ public Builder setSessionEndpointVersion(EndpointVersion version) {
+ this.version = version;
+ return this;
+ }
+
+ public Builder addSessionConfig(Map<String, String> sessionConfig) {
+ this.sessionConfig.putAll(sessionConfig);
+ return this;
+ }
+
+ public SessionEnvironment build() {
+ return new SessionEnvironment(sessionName, checkNotNull(version), sessionConfig);
+ }
+ }
+}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionHandle.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionHandle.java
new file mode 100644
index 00000000000..4a5a2623044
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionHandle.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.session;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.gateway.api.HandleIdentifier;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/** Session Handle that used to identify the Session. */
+@PublicEvolving
+public class SessionHandle {
+
+ private final HandleIdentifier identifier;
+
+ public static SessionHandle create() {
+ return new SessionHandle(new HandleIdentifier(UUID.randomUUID(), UUID.randomUUID()));
+ }
+
+ public SessionHandle(HandleIdentifier identifier) {
+ this.identifier = identifier;
+ }
+
+ public HandleIdentifier getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof SessionHandle)) {
+ return false;
+ }
+ SessionHandle that = (SessionHandle) o;
+ return Objects.equals(identifier, that.identifier);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(identifier);
+ }
+
+ @Override
+ public String toString() {
+ return identifier.toString();
+ }
+}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/utils/SqlGatewayException.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/utils/SqlGatewayException.java
new file mode 100644
index 00000000000..20fc43417b7
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/utils/SqlGatewayException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** General exception for SQL gateway related errors. */
+@PublicEvolving
+public class SqlGatewayException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public SqlGatewayException(String message) {
+ super(message);
+ }
+
+ public SqlGatewayException(String message, Throwable e) {
+ super(message, e);
+ }
+
+ public SqlGatewayException(Throwable e) {
+ super(e);
+ }
+}
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/session/SessionEnvironmentTest.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/session/SessionEnvironmentTest.java
new file mode 100644
index 00000000000..fe472a885ed
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/session/SessionEnvironmentTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.session;
+
+import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Test {@link SessionEnvironment}. */
+public class SessionEnvironmentTest {
+
+ @Test
+ public void testBuildSessionEnvironment() {
+ String sessionName = "test";
+
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("key1", "value1");
+ configMap.put("key2", "value2");
+
+ SessionEnvironment expectedEnvironment =
+ new SessionEnvironment(sessionName, MockedEndpointVersion.V1, configMap);
+
+ SessionEnvironment actualEnvironment =
+ SessionEnvironment.newBuilder()
+ .setSessionName(sessionName)
+ .setSessionEndpointVersion(MockedEndpointVersion.V1)
+ .addSessionConfig(configMap)
+ .build();
+ assertEquals(expectedEnvironment, actualEnvironment);
+ }
+}
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedEndpointVersion.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedEndpointVersion.java
new file mode 100644
index 00000000000..d01c079e0d5
--- /dev/null
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedEndpointVersion.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.utils;
+
+import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
+
+/** Mocked {@link EndpointVersion} for test. */
+public enum MockedEndpointVersion implements EndpointVersion {
+ V1;
+}
diff --git a/flink-table/flink-sql-gateway/pom.xml b/flink-table/flink-sql-gateway/pom.xml
new file mode 100644
index 00000000000..9c217e71e17
--- /dev/null
+++ b/flink-table/flink-sql-gateway/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-table</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.16-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-sql-gateway</artifactId>
+ <name>Flink : Table : SQL Gateway</name>
+
+ <dependencies>
+ <!-- Flink client to submit jobs -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Table ecosystem -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-gateway-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-gateway-api</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Build flink-sql-gateway jar -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <!-- Exclude all flink-dist files and only contain sql-gateway files -->
+ <execution>
+ <id>shade-flink</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <includes combine.children="append">
+ <include>org.apache.flink:flink-sql-gateway-api</include>
+ </includes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
new file mode 100644
index 00000000000..2606f8f7763
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.session.Session;
+import org.apache.flink.table.gateway.service.session.SessionManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/** The implementation of the {@link SqlGatewayService} interface. */
+public class SqlGatewayServiceImpl implements SqlGatewayService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SqlGatewayServiceImpl.class);
+
+ private final SessionManager sessionManager;
+
+ public SqlGatewayServiceImpl(SessionManager sessionManager) {
+ this.sessionManager = sessionManager;
+ }
+
+ @Override
+ public SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException {
+ try {
+ return sessionManager.openSession(environment).getSessionHandle();
+ } catch (Throwable e) {
+ LOG.error("Failed to openSession.", e);
+ throw new SqlGatewayException("Failed to openSession.", e);
+ }
+ }
+
+ @Override
+ public void closeSession(SessionHandle sessionHandle) throws SqlGatewayException {
+ try {
+ sessionManager.closeSession(sessionHandle);
+ } catch (Throwable e) {
+ LOG.error("Failed to closeSession.", e);
+ throw new SqlGatewayException("Failed to closeSession.", e);
+ }
+ }
+
+ @Override
+ public Map<String, String> getSessionConfig(SessionHandle sessionHandle)
+ throws SqlGatewayException {
+ try {
+ return getSession(sessionHandle).getSessionConfig();
+ } catch (Throwable e) {
+ LOG.error("Failed to getSessionConfig.", e);
+ throw new SqlGatewayException("Failed to getSessionConfig.", e);
+ }
+ }
+
+ @VisibleForTesting
+ Session getSession(SessionHandle sessionHandle) {
+ return sessionManager.getSession(sessionHandle);
+ }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
new file mode 100644
index 00000000000..1ac63d8bedd
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.context;
+
+import org.apache.flink.client.cli.CliArgsException;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.cli.ProgramOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** The context memorized initial configuration. */
+public class DefaultContext {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultContext.class);
+
+ private final Configuration flinkConfig;
+
+ public DefaultContext(Configuration flinkConfig, List<CustomCommandLine> commandLines) {
+ this.flinkConfig = flinkConfig;
+ // initialize default file system
+ FileSystem.initialize(
+ flinkConfig, PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
+
+ Options commandLineOptions = collectCommandLineOptions(commandLines);
+ try {
+ CommandLine deploymentCommandLine =
+ CliFrontendParser.parse(commandLineOptions, new String[] {}, true);
+ flinkConfig.addAll(
+ createExecutionConfig(
+ deploymentCommandLine,
+ commandLineOptions,
+ commandLines,
+ Collections.emptyList()));
+ } catch (Exception e) {
+ throw new SqlGatewayException(
+ "Could not load available CLI with Environment Deployment entry.", e);
+ }
+ }
+
+ public Configuration getFlinkConfig() {
+ return flinkConfig;
+ }
+
+ private Options collectCommandLineOptions(List<CustomCommandLine> commandLines) {
+ final Options customOptions = new Options();
+ for (CustomCommandLine customCommandLine : commandLines) {
+ customCommandLine.addGeneralOptions(customOptions);
+ customCommandLine.addRunOptions(customOptions);
+ }
+ return CliFrontendParser.mergeOptions(
+ CliFrontendParser.getRunCommandOptions(), customOptions);
+ }
+
+ private static Configuration createExecutionConfig(
+ CommandLine commandLine,
+ Options commandLineOptions,
+ List<CustomCommandLine> availableCommandLines,
+ List<URL> dependencies)
+ throws FlinkException {
+ LOG.debug("Available commandline options: {}", commandLineOptions);
+ List<String> options =
+ Stream.of(commandLine.getOptions())
+ .map(o -> o.getOpt() + "=" + o.getValue())
+ .collect(Collectors.toList());
+ LOG.debug(
+ "Instantiated commandline args: {}, options: {}",
+ commandLine.getArgList(),
+ options);
+
+ final CustomCommandLine activeCommandLine =
+ findActiveCommandLine(availableCommandLines, commandLine);
+ LOG.debug(
+ "Available commandlines: {}, active commandline: {}",
+ availableCommandLines,
+ activeCommandLine);
+
+ Configuration executionConfig = activeCommandLine.toConfiguration(commandLine);
+
+ try {
+ final ProgramOptions programOptions = ProgramOptions.create(commandLine);
+ final ExecutionConfigAccessor executionConfigAccessor =
+ ExecutionConfigAccessor.fromProgramOptions(programOptions, dependencies);
+ executionConfigAccessor.applyToConfiguration(executionConfig);
+ } catch (CliArgsException e) {
+ throw new SqlGatewayException("Invalid deployment run options.", e);
+ }
+
+ LOG.info("Execution config: {}", executionConfig);
+ return executionConfig;
+ }
+
+ private static CustomCommandLine findActiveCommandLine(
+ List<CustomCommandLine> availableCommandLines, CommandLine commandLine) {
+ for (CustomCommandLine cli : availableCommandLines) {
+ if (cli.isActive(commandLine)) {
+ return cli;
+ }
+ }
+ throw new SqlGatewayException("Could not find a matching deployment.");
+ }
+
+ // -------------------------------------------------------------------------------------------
+
+ public static DefaultContext load(Map<String, String> config) {
+ // 1. find the configuration directory
+ String flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();
+
+ // 2. load the global configuration
+ Configuration configuration = GlobalConfiguration.loadConfiguration(flinkConfigDir);
+ configuration.addAll(Configuration.fromMap(config));
+
+ // 3. load the custom command lines
+ List<CustomCommandLine> commandLines =
+ CliFrontend.loadCustomCommandLines(configuration, flinkConfigDir);
+
+ return new DefaultContext(configuration, commandLines);
+ }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
new file mode 100644
index 00000000000..83bc5758ce0
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.context;
+
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.PlannerFactoryUtil;
+import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.module.ModuleManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Context describing a session, it's mainly used for user to open a new session in the backend. If
+ * client request to open a new session, the backend {@code Executor} will maintain the session
+ * context map util users close it.
+ */
+public class SessionContext {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SessionContext.class);
+
+ private final SessionHandle sessionId;
+ private final EndpointVersion endpointVersion;
+
+ // store all options and use Configuration to build SessionState and TableConfig.
+ private final Configuration sessionConf;
+ private final SessionState sessionState;
+ private final URLClassLoader userClassloader;
+
+ private SessionContext(
+ SessionHandle sessionId,
+ EndpointVersion endpointVersion,
+ Configuration sessionConf,
+ URLClassLoader classLoader,
+ SessionState sessionState) {
+ this.sessionId = sessionId;
+ this.endpointVersion = endpointVersion;
+ this.sessionConf = sessionConf;
+ this.userClassloader = classLoader;
+ this.sessionState = sessionState;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Getter method
+ // --------------------------------------------------------------------------------------------
+
+ public SessionHandle getSessionId() {
+ return this.sessionId;
+ }
+
+ public Map<String, String> getConfigMap() {
+ return sessionConf.toMap();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Method to execute commands
+ // --------------------------------------------------------------------------------------------
+
+ /** Close resources, e.g. catalogs. */
+ public void close() {
+ for (String name : sessionState.catalogManager.listCatalogs()) {
+ sessionState.catalogManager.unregisterCatalog(name, true);
+ }
+ try {
+ userClassloader.close();
+ } catch (IOException e) {
+ LOG.debug("Error while closing class loader.", e);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Utilities
+ // --------------------------------------------------------------------------------------------
+
+ public static SessionContext create(
+ DefaultContext defaultContext,
+ SessionHandle sessionId,
+ EndpointVersion endpointVersion,
+ Configuration sessionConf) {
+ // --------------------------------------------------------------------------------------------------------------
+ // Init config
+ // --------------------------------------------------------------------------------------------------------------
+
+ Configuration configuration = defaultContext.getFlinkConfig().clone();
+ configuration.addAll(sessionConf);
+
+ // --------------------------------------------------------------------------------------------------------------
+ // Init classloader
+ // --------------------------------------------------------------------------------------------------------------
+
+ URLClassLoader classLoader =
+ buildClassLoader(Collections.emptySet(), Collections.emptySet(), configuration);
+
+ // --------------------------------------------------------------------------------------------------------------
+ // Init session state
+ // --------------------------------------------------------------------------------------------------------------
+
+ ModuleManager moduleManager = new ModuleManager();
+
+ final EnvironmentSettings settings = EnvironmentSettings.fromConfiguration(configuration);
+
+ CatalogManager catalogManager =
+ CatalogManager.newBuilder()
+ // Currently, the classloader is only used by DataTypeFactory.
+ .classLoader(classLoader)
+ .config(configuration)
+ .defaultCatalog(
+ settings.getBuiltInCatalogName(),
+ new GenericInMemoryCatalog(
+ settings.getBuiltInCatalogName(),
+ settings.getBuiltInDatabaseName()))
+ .build();
+
+ FunctionCatalog functionCatalog =
+ new FunctionCatalog(configuration, catalogManager, moduleManager, classLoader);
+ SessionState sessionState =
+ new SessionState(catalogManager, moduleManager, functionCatalog);
+
+ return new SessionContext(
+ sessionId, endpointVersion, configuration, classLoader, sessionState);
+ }
+
+ private static URLClassLoader buildClassLoader(
+ Set<URL> envDependencies, Set<URL> userDependencies, Configuration conf) {
+ Set<URL> newDependencies = new HashSet<>();
+ newDependencies.addAll(envDependencies);
+ newDependencies.addAll(userDependencies);
+
+ // override to use SafetyNetWrapperClassLoader
+ conf.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, true);
+
+ return ClientUtils.buildUserCodeClassLoader(
+ new ArrayList<>(newDependencies),
+ Collections.emptyList(),
+ SessionContext.class.getClassLoader(),
+ conf);
+ }
+
+ // ------------------------------------------------------------------------------------------------------------------
+ // Helpers
+ // ------------------------------------------------------------------------------------------------------------------
+
+ public TableEnvironmentInternal createTableEnvironment() {
+ // checks the value of RUNTIME_MODE
+ final EnvironmentSettings settings =
+ EnvironmentSettings.newInstance().withConfiguration(sessionConf).build();
+
+ TableConfig tableConfig = new TableConfig();
+ tableConfig.addConfiguration(sessionConf);
+
+ StreamExecutionEnvironment streamExecEnv = createStreamExecutionEnvironment();
+
+ final Executor executor = lookupExecutor(streamExecEnv);
+ return createStreamTableEnvironment(
+ streamExecEnv,
+ settings,
+ tableConfig,
+ executor,
+ sessionState.catalogManager,
+ sessionState.moduleManager,
+ sessionState.functionCatalog,
+ userClassloader);
+ }
+
+ private TableEnvironmentInternal createStreamTableEnvironment(
+ StreamExecutionEnvironment env,
+ EnvironmentSettings settings,
+ TableConfig tableConfig,
+ Executor executor,
+ CatalogManager catalogManager,
+ ModuleManager moduleManager,
+ FunctionCatalog functionCatalog,
+ ClassLoader userClassLoader) {
+
+ final Planner planner =
+ PlannerFactoryUtil.createPlanner(
+ executor,
+ tableConfig,
+ userClassLoader,
+ moduleManager,
+ catalogManager,
+ functionCatalog);
+
+ return new StreamTableEnvironmentImpl(
+ catalogManager,
+ moduleManager,
+ functionCatalog,
+ tableConfig,
+ env,
+ planner,
+ executor,
+ settings.isStreamingMode(),
+ userClassLoader);
+ }
+
+ private Executor lookupExecutor(StreamExecutionEnvironment executionEnvironment) {
+ try {
+ final ExecutorFactory executorFactory =
+ FactoryUtil.discoverFactory(
+ userClassloader,
+ ExecutorFactory.class,
+ ExecutorFactory.DEFAULT_IDENTIFIER);
+ final Method createMethod =
+ executorFactory
+ .getClass()
+ .getMethod("create", StreamExecutionEnvironment.class);
+
+ return (Executor) createMethod.invoke(executorFactory, executionEnvironment);
+ } catch (Exception e) {
+ throw new TableException(
+ "Could not instantiate the executor. Make sure a planner module is on the classpath",
+ e);
+ }
+ }
+
+ private StreamExecutionEnvironment createStreamExecutionEnvironment() {
+ // We need not different StreamExecutionEnvironments to build and submit flink job,
+ // instead we just use StreamExecutionEnvironment#executeAsync(StreamGraph) method
+ // to execute existing StreamGraph.
+ // This requires StreamExecutionEnvironment to have a full flink configuration.
+ return new StreamExecutionEnvironment(new Configuration(sessionConf), userClassloader);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Inner class
+ // --------------------------------------------------------------------------------------------
+
+ /** session state. */
+ public static class SessionState {
+
+ public final CatalogManager catalogManager;
+ public final FunctionCatalog functionCatalog;
+ public final ModuleManager moduleManager;
+
+ public SessionState(
+ CatalogManager catalogManager,
+ ModuleManager moduleManager,
+ FunctionCatalog functionCatalog) {
+ this.catalogManager = catalogManager;
+ this.moduleManager = moduleManager;
+ this.functionCatalog = functionCatalog;
+ }
+ }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java
new file mode 100644
index 00000000000..28fc1eb81cf
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.session;
+
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+
+import java.io.Closeable;
+import java.util.Map;
+
+/**
+ * Similar to HTTP Session, which could maintain user identity and store user-specific data during
+ * multiple request/response interactions between a client and the gateway server.
+ */
+public class Session implements Closeable {
+
+ private final SessionContext sessionContext;
+ private volatile long lastAccessTime;
+
+ public Session(SessionContext sessionContext) {
+ this.sessionContext = sessionContext;
+ }
+
+ public void touch() {
+ this.lastAccessTime = System.currentTimeMillis();
+ }
+
+ public long getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ public SessionHandle getSessionHandle() {
+ return sessionContext.getSessionId();
+ }
+
+ public Map<String, String> getSessionConfig() {
+ return sessionContext.getConfigMap();
+ }
+
+ @Override
+ public void close() {
+ sessionContext.close();
+ }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java
new file mode 100644
index 00000000000..7d1457bf3d9
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.session;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.utils.ThreadUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL;
+import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT;
+import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM;
+import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_KEEPALIVE_TIME;
+import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX;
+import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MIN;
+
+/** Manage the lifecycle of the {@code Session}. */
+public class SessionManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SessionManager.class);
+ private static final String OPERATION_POOL_NAME = "sql-gateway-operation-pool";
+
+ private final DefaultContext defaultContext;
+
+ private final long idleTimeout;
+ private final long checkInterval;
+ private final int maxSessionCount;
+
+ private final Map<SessionHandle, Session> sessions;
+
+ private ExecutorService operationExecutorService;
+ private ScheduledExecutorService scheduledExecutorService;
+ private ScheduledFuture<?> timeoutCheckerFuture;
+
+ public SessionManager(DefaultContext defaultContext) {
+ this.defaultContext = defaultContext;
+ ReadableConfig conf = defaultContext.getFlinkConfig();
+ this.idleTimeout = conf.get(SQL_GATEWAY_SESSION_IDLE_TIMEOUT).toMillis();
+ this.checkInterval = conf.get(SQL_GATEWAY_SESSION_CHECK_INTERVAL).toMillis();
+ this.maxSessionCount = conf.get(SQL_GATEWAY_SESSION_MAX_NUM);
+ this.sessions = new ConcurrentHashMap<>();
+ }
+
+ public void start() {
+ if (checkInterval > 0 && idleTimeout > 0) {
+ scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ timeoutCheckerFuture =
+ scheduledExecutorService.scheduleAtFixedRate(
+ () -> {
+ LOG.debug(
+ "Start to cleanup expired sessions, current session count: {}",
+ sessions.size());
+ for (Map.Entry<SessionHandle, Session> entry :
+ sessions.entrySet()) {
+ SessionHandle sessionId = entry.getKey();
+ Session session = entry.getValue();
+ if (isSessionExpired(session)) {
+ LOG.info("Session {} is expired, closing it...", sessionId);
+ closeSession(session);
+ }
+ }
+ LOG.debug(
+ "Removing expired session finished, current session count: {}",
+ sessions.size());
+ },
+ checkInterval,
+ checkInterval,
+ TimeUnit.MILLISECONDS);
+ }
+
+ ReadableConfig conf = defaultContext.getFlinkConfig();
+ operationExecutorService =
+ ThreadUtils.newThreadPool(
+ conf.get(SQL_GATEWAY_WORKER_THREADS_MIN),
+ conf.get(SQL_GATEWAY_WORKER_THREADS_MAX),
+ conf.get(SQL_GATEWAY_WORKER_KEEPALIVE_TIME).toMillis(),
+ OPERATION_POOL_NAME);
+ }
+
+ public void stop() {
+ if (scheduledExecutorService != null) {
+ timeoutCheckerFuture.cancel(true);
+ scheduledExecutorService.shutdown();
+ }
+ if (operationExecutorService != null) {
+ operationExecutorService.shutdown();
+ }
+ LOG.info("SessionManager is stopped.");
+ }
+
+ public Session getSession(SessionHandle sessionHandle) throws SqlGatewayException {
+ Session session = sessions.get(sessionHandle);
+ if (session == null) {
+ String msg = String.format("Session '%s' does not exist.", sessionHandle);
+ LOG.warn(msg);
+ throw new SqlGatewayException(msg);
+ }
+ session.touch();
+ return session;
+ }
+
+ /**
+ * Register the session into the {@link SessionManager}.
+ *
+ * <p>Use synchronized to keep the checkSessionCount and build the Session are atomic.
+ */
+ public synchronized Session openSession(SessionEnvironment environment)
+ throws SqlGatewayException {
+ // check session limit
+ checkSessionCount();
+
+ Session session = null;
+ SessionHandle sessionId = null;
+ do {
+ sessionId = SessionHandle.create();
+ } while (sessions.containsKey(sessionId));
+
+ SessionContext sessionContext =
+ SessionContext.create(
+ defaultContext,
+ sessionId,
+ environment.getSessionEndpointVersion(),
+ Configuration.fromMap(environment.getSessionConfig()));
+ session = new Session(sessionContext);
+ sessions.put(sessionId, session);
+
+ LOG.info(
+ "Session {} is opened, and the number of current sessions is {}.",
+ session.getSessionHandle(),
+ sessions.size());
+
+ return session;
+ }
+
+ public void closeSession(SessionHandle sessionId) throws SqlGatewayException {
+ Session session = getSession(sessionId);
+ closeSession(session);
+ }
+
+ // ------------------------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------------------------
+
+ private void checkSessionCount() throws SqlGatewayException {
+ if (maxSessionCount <= 0) {
+ return;
+ }
+ if (sessions.size() >= maxSessionCount) {
+ String msg =
+ String.format(
+ "Failed to create session, the count of active sessions exceeds the max count: %s",
+ maxSessionCount);
+ LOG.warn(msg);
+ throw new SqlGatewayException(msg);
+ }
+ }
+
+ private boolean isSessionExpired(Session session) {
+ if (idleTimeout > 0) {
+ return (System.currentTimeMillis() - session.getLastAccessTime()) > idleTimeout;
+ } else {
+ return false;
+ }
+ }
+
+ private void closeSession(Session session) {
+ SessionHandle sessionId = session.getSessionHandle();
+ sessions.remove(sessionId);
+ session.close();
+ LOG.info("Session: {} is closed.", sessionId);
+ }
+
+ @VisibleForTesting
+ boolean isSessionAlive(SessionHandle sessionId) {
+ return sessions.containsKey(sessionId);
+ }
+
+ @VisibleForTesting
+ int currentSessionCount() {
+ return sessions.size();
+ }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/SqlExecutionException.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/SqlExecutionException.java
new file mode 100644
index 00000000000..c13c48622d9
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/SqlExecutionException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.utils;
+
+/** Exception thrown during the execution. */
+public class SqlExecutionException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ public SqlExecutionException(String message) {
+ super(message);
+ }
+
+ public SqlExecutionException(String message, Throwable e) {
+ super(message, e);
+ }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/ThreadUtils.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/ThreadUtils.java
new file mode 100644
index 00000000000..7483f870e9f
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/ThreadUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/** Utils for thread pool executor. */
+public class ThreadUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ThreadUtils.class);
+
+ public static ThreadPoolExecutor newThreadPool(
+ int poolSize, int poolQueueSize, long keepAliveMs, String threadPoolName) {
+ LOG.info(
+ "Created thread pool {} with core size {}, max size {} and keep alive time {}ms.",
+ threadPoolName,
+ poolSize,
+ poolQueueSize,
+ keepAliveMs);
+
+ return new ThreadPoolExecutor(
+ poolSize,
+ poolQueueSize,
+ keepAliveMs,
+ TimeUnit.MILLISECONDS,
+ new SynchronousQueue<>());
+ }
+}
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
new file mode 100644
index 00000000000..048af7401c8
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service;
+
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
+import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/** ITCase for {@link SqlGatewayServiceImpl}. */
+public class SqlGatewayServiceITCase extends AbstractTestBase {
+
+ @RegisterExtension
+ public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
+ new SqlGatewayServiceExtension();
+
+ private static SqlGatewayServiceImpl service;
+
+ @BeforeAll
+ public static void setUp() {
+ service = (SqlGatewayServiceImpl) SQL_GATEWAY_SERVICE_EXTENSION.getService();
+ }
+
+ @Test
+ public void testOpenSessionWithConfig() {
+ Map<String, String> options = new HashMap<>();
+ options.put("key1", "val1");
+ options.put("key2", "val2");
+ SessionEnvironment environment =
+ SessionEnvironment.newBuilder()
+ .setSessionEndpointVersion(MockedEndpointVersion.V1)
+ .addSessionConfig(options)
+ .build();
+
+ SessionHandle sessionHandle = service.openSession(environment);
+ Map<String, String> actualConfig = service.getSessionConfig(sessionHandle);
+
+ options.forEach(
+ (k, v) ->
+ assertThat(
+ String.format(
+ "Should contains (%s, %s) in the actual config.", k, v),
+ actualConfig,
+ Matchers.hasEntry(k, v)));
+ }
+}
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/session/SessionManagerTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/session/SessionManagerTest.java
new file mode 100644
index 00000000000..ae4ff66cd81
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/session/SessionManagerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.session;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Test for {@link SessionManager}. */
+public class SessionManagerTest {
+
+ private SessionManager sessionManager;
+
+ @BeforeEach
+ public void setup() {
+ Configuration conf = new Configuration();
+ conf.set(
+ SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT,
+ Duration.ofSeconds(2));
+ conf.set(
+ SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL,
+ Duration.ofMillis(100));
+ conf.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM, 3);
+ sessionManager =
+ new SessionManager(
+ new DefaultContext(conf, Collections.singletonList(new DefaultCLI())));
+ sessionManager.start();
+ }
+
+ @AfterEach
+ public void cleanUp() {
+ if (sessionManager != null) {
+ sessionManager.stop();
+ }
+ }
+
+ @Test
+ public void testIdleSessionCleanup() throws Exception {
+ SessionEnvironment environment =
+ SessionEnvironment.newBuilder()
+ .setSessionEndpointVersion(MockedEndpointVersion.V1)
+ .build();
+ Session session = sessionManager.openSession(environment);
+ SessionHandle sessionId = session.getSessionHandle();
+ for (int i = 0; i < 3; i++) {
+ // should success
+ sessionManager.getSession(sessionId);
+ Thread.sleep(1000);
+ }
+ Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
+ while (deadline.hasTimeLeft() && sessionManager.isSessionAlive(sessionId)) {
+ //noinspection BusyWait
+ Thread.sleep(1000);
+ }
+ assertFalse(sessionManager.isSessionAlive(sessionId));
+ }
+
+ @Test
+ public void testSessionNumberLimit() {
+ SessionEnvironment environment =
+ SessionEnvironment.newBuilder()
+ .setSessionEndpointVersion(MockedEndpointVersion.V1)
+ .build();
+
+ sessionManager.openSession(environment);
+ sessionManager.openSession(environment);
+ sessionManager.openSession(environment);
+
+ assertEquals(3, sessionManager.currentSessionCount());
+ assertThrows(
+ SqlGatewayException.class,
+ () -> sessionManager.openSession(environment),
+ "Failed to create session, the count of active sessions exceeds the max count: 3");
+ }
+}
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java
new file mode 100644
index 00000000000..0e5fbcec5ba
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.utils;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.gateway.service.session.SessionManager;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
+
+/** A simple {@link Extension} to be used by tests that require a {@link SqlGatewayService}. */
+public class SqlGatewayServiceExtension implements BeforeAllCallback, AfterAllCallback {
+
+ private SqlGatewayService service;
+ private SessionManager sessionManager;
+ private TemporaryFolder temporaryFolder;
+
+ @Override
+ public void beforeAll(ExtensionContext context) throws Exception {
+ final Map<String, String> originalEnv = System.getenv();
+ try {
+ // prepare conf dir
+ temporaryFolder = new TemporaryFolder();
+ temporaryFolder.create();
+ File confFolder = temporaryFolder.newFolder("conf");
+ File confYaml = new File(confFolder, "flink-conf.yaml");
+ if (!confYaml.createNewFile()) {
+ throw new IOException("Can't create testing flink-conf.yaml file.");
+ }
+
+ // adjust the test environment for the purposes of this test
+ Map<String, String> map = new HashMap<>(System.getenv());
+ map.put(ENV_FLINK_CONF_DIR, confFolder.getAbsolutePath());
+ CommonTestUtils.setEnv(map);
+
+ sessionManager = new SessionManager(DefaultContext.load(new HashMap<>()));
+ } finally {
+ CommonTestUtils.setEnv(originalEnv);
+ }
+
+ service = new SqlGatewayServiceImpl(sessionManager);
+ sessionManager.start();
+ }
+
+ @Override
+ public void afterAll(ExtensionContext context) throws Exception {
+ if (sessionManager != null) {
+ sessionManager.stop();
+ }
+ temporaryFolder.delete();
+ }
+
+ public SqlGatewayService getService() {
+ return service;
+ }
+
+ public SessionManager getSessionManager() {
+ return sessionManager;
+ }
+}
diff --git a/flink-table/pom.xml b/flink-table/pom.xml
index ef391d5b7b3..2ce69f66718 100644
--- a/flink-table/pom.xml
+++ b/flink-table/pom.xml
@@ -43,6 +43,8 @@ under the License.
<module>flink-table-planner-loader</module>
<module>flink-table-planner-loader-bundle</module>
<module>flink-table-runtime</module>
+ <module>flink-sql-gateway-api</module>
+ <module>flink-sql-gateway</module>
<module>flink-sql-client</module>
<module>flink-sql-parser</module>
<module>flink-sql-parser-hive</module>
diff --git a/tools/ci/stage.sh b/tools/ci/stage.sh
index e5c99251531..3ac0d3607eb 100755
--- a/tools/ci/stage.sh
+++ b/tools/ci/stage.sh
@@ -79,6 +79,8 @@ flink-table/flink-table-api-java-bridge,\
flink-table/flink-table-api-scala-bridge,\
flink-table/flink-table-api-java-uber,\
flink-table/flink-sql-client,\
+flink-table/flink-sql-gateway-api,\
+flink-table/flink-sql-gateway,\
flink-table/flink-table-planner,\
flink-table/flink-table-planner-loader,\
flink-table/flink-table-planner-loader-bundle,\