Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/08 07:39:42 UTC

[GitHub] [flink] SteNicholas commented on a diff in pull request #19823: [FLINK-27766][sql-gateway] Introduce the framework of the SqlGatewayService

SteNicholas commented on code in PR #19823:
URL: https://github.com/apache/flink/pull/19823#discussion_r891966311


##########
flink-table/flink-sql-gateway-common/pom.xml:
##########
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-table</artifactId>
+        <version>1.16-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-sql-gateway-common</artifactId>
+    <name>Flink : Table : SQL Gateway Common</name>
+    <description>
+        This module contains the basic API about the SqlGateway.

Review Comment:
   ```suggestion
           This module contains extension points of the SQL Gateway API.
   ```



##########
flink-table/flink-sql-gateway-common/pom.xml:
##########
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-table</artifactId>
+        <version>1.16-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-sql-gateway-common</artifactId>
+    <name>Flink : Table : SQL Gateway Common</name>

Review Comment:
   ```suggestion
       <name>Flink : Table : SQL Gateway : Common</name>
   ```



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/SqlGatewayService.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.gateway.common.operation.OperationHandle;
+import org.apache.flink.table.gateway.common.operation.OperationType;
+import org.apache.flink.table.gateway.common.results.OperationInfo;
+import org.apache.flink.table.gateway.common.results.ResultSet;
+import org.apache.flink.table.gateway.common.session.SessionEnvironment;
+import org.apache.flink.table.gateway.common.session.SessionHandle;
+import org.apache.flink.table.gateway.common.utils.SqlGatewayException;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * The definition for SqlGatewayService. The SqlGatewayService is the core to process the request
+ * from the endpoints.
+ */
+@PublicEvolving
+public interface SqlGatewayService {
+
+    // -------------------------------------------------------------------------------------------
+    // Session Management
+    // -------------------------------------------------------------------------------------------
+
+    SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;

Review Comment:
   Please add Javadoc for the interface methods.



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/config/SqlGatewayServiceConfigOptions.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.gateway.common.SqlGatewayService;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** Config options for {@link SqlGatewayService}. */

Review Comment:
   ```suggestion
   /** Config options of the {@link SqlGatewayService}. */
   ```



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/endpoint/EndpointVersion.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common.endpoint;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Version to identify the Endpoint. */

Review Comment:
   ```suggestion
   /** Version of the endpoint. */
   ```



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/operation/OperationType.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common.operation;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** The Operation Type. */
+@PublicEvolving
+public enum OperationType {

Review Comment:
   Does `OperationType` have enum values such as `GET_TYPE_INFO`, `GET_CATALOGS`, `GET_SCHEMAS`, `GET_TABLES`, `GET_TABLE_TYPES`, `GET_COLUMNS`, and `GET_FUNCTIONS`?



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/results/ResultSet.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common.results;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** The collection of the results. */
+@PublicEvolving
+public class ResultSet {
+
+    private final ResultType resultType;
+
+    private final Long nextToken;
+
+    private final ResolvedSchema resultSchema;
+    private final List<RowData> data;
+
+    public ResultSet(
+            ResultType resultType,
+            Long nextToken,
+            ResolvedSchema resultSchema,
+            List<RowData> data) {
+        this.nextToken = nextToken;
+        this.resultType = resultType;
+        this.resultSchema = resultSchema;
+        this.data = data;
+    }
+
+    public ResultType getResultType() {
+        return resultType;
+    }
+
+    public Long getNextToken() {
+        return nextToken;
+    }
+
+    public ResolvedSchema getResultSchema() {
+        return resultSchema;
+    }
+
+    public List<RowData> getData() {
+        return data;
+    }
+
+    @Override
+    public String toString() {
+        return String.format(
+                "ResultSet{\n"
+                        + "  resultType=%s,\n"
+                        + "  resultSchema=%s,\n"
+                        + "  data=[%s]\n"
+                        + "}",
+                resultType,
+                resultSchema.toString(),
+                data.stream().map(Object::toString).collect(Collectors.joining(",")));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof ResultSet)) {
+            return false;
+        }
+        ResultSet resultSet = (ResultSet) o;
+        return resultType == resultSet.resultType
+                && Objects.equals(nextToken, resultSet.nextToken)
+                && Objects.equals(resultSchema, resultSet.resultSchema)
+                && Objects.equals(data, resultSet.data);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(resultType, nextToken, resultSchema, data);
+    }
+
+    /** Describe the kind of the ResultSet. */

Review Comment:
   ```suggestion
       /** Describe the kind of the result. */
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.context;
+
+import org.apache.flink.client.cli.CliArgsException;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.cli.ProgramOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.table.gateway.common.utils.SqlGatewayException;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** The context memorized initial configuration. */
+public class DefaultContext {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultContext.class);
+
+    private final Configuration flinkConfig;
+
+    public DefaultContext(Configuration flinkConfig, List<CustomCommandLine> commandLines) {
+        this.flinkConfig = flinkConfig;
+        // initialize default file system
+        FileSystem.initialize(
+                flinkConfig, PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
+
+        Options commandLineOptions = collectCommandLineOptions(commandLines);
+        try {
+            CommandLine deploymentCommandLine =
+                    CliFrontendParser.parse(commandLineOptions, new String[] {}, true);
+            flinkConfig.addAll(
+                    createExecutionConfig(
+                            deploymentCommandLine,
+                            commandLineOptions,
+                            commandLines,
+                            Collections.emptyList()));
+        } catch (Exception e) {
+            throw new SqlGatewayException(

Review Comment:
   Log the `commandLineOptions`.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/Operation.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.table.gateway.common.operation.OperationHandle;
+import org.apache.flink.table.gateway.common.operation.OperationStatus;
+import org.apache.flink.table.gateway.common.operation.OperationType;
+import org.apache.flink.table.gateway.common.results.OperationInfo;
+import org.apache.flink.table.gateway.common.results.ResultSet;
+import org.apache.flink.table.gateway.common.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.result.ExecutionResult;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+/** Entity that contains the resource to run and the execution results. */
+public class Operation {

Review Comment:
   Can `Operation` run into a deadlock?
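
The body of `Operation` is not visible in this hunk, so the following is only a sketch of one general hazard to check for, given that the class imports `ReentrantReadWriteLock`: a thread that holds the read lock cannot upgrade to the write lock, and an untimed `writeLock().lock()` in that position would block forever. `LockUpgradeSketch` and its methods are hypothetical names used for illustration and are not part of the PR.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class; it does not reproduce the Operation class from the PR.
class LockUpgradeSketch {

    /** Tries to take the write lock while the calling thread still holds the read lock. */
    static boolean tryUpgrade(long timeoutMillis) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        lock.readLock().lock();
        try {
            // An untimed writeLock().lock() here would never return;
            // the timed variant lets us observe the failure instead.
            boolean acquired = lock.writeLock().tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
            if (acquired) {
                lock.writeLock().unlock();
            }
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Takes the read lock while holding the write lock (downgrading), which is permitted. */
    static boolean tryDowngrade() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        lock.writeLock().lock();
        try {
            boolean acquired = lock.readLock().tryLock();
            if (acquired) {
                lock.readLock().unlock();
            }
            return acquired;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("upgrade acquired: " + tryUpgrade(100));   // false
        System.out.println("downgrade acquired: " + tryDowngrade()); // true
    }
}
```

If any method in `Operation` reads state under the read lock and then calls into a method that takes the write lock on the same thread, it would hit the first case.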



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/HandleIdentifier.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/** Identifiers for Handle. */

Review Comment:
   ```suggestion
   /** Identifier for {@link OperationHandle}. */
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ExecutionResult.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.result;
+
+import org.apache.flink.table.gateway.common.results.ResultSet;
+
+import static org.apache.flink.table.gateway.service.utils.Constants.FETCH_ALL;
+
+/** Describe the execution results. */
+public class ExecutionResult {
+
+    private final ResultFetcher resultFetcher;
+
+    public ExecutionResult(ResultFetcher fetcher) {
+        this.resultFetcher = fetcher;
+    }
+
+    public ResultSet fetchResults(long token, int maxRows) {
+        if (maxRows != FETCH_ALL && maxRows <= 0) {
+            throw new IllegalArgumentException(

Review Comment:
   ```suggestion
               throw new SqlGatewayException(
   ```
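
A minimal sketch of what the suggested change could look like. `SqlGatewayExceptionSketch` is a stand-in for the real `SqlGatewayException`, which is assumed to be unchecked because `DefaultContext` throws it from a constructor without a `throws` clause. The value of `FETCH_ALL` and the error message are hypothetical, since neither is shown in the diff.

```java
// Stand-in for org.apache.flink.table.gateway.common.utils.SqlGatewayException,
// assumed here to be an unchecked exception.
class SqlGatewayExceptionSketch extends RuntimeException {
    SqlGatewayExceptionSketch(String message) {
        super(message);
    }
}

class FetchArgumentsSketch {

    // Hypothetical sentinel; the real value of Constants.FETCH_ALL is not shown in the diff.
    static final int FETCH_ALL = -1;

    /** Same condition as in ExecutionResult#fetchResults, but with the gateway's own exception type. */
    static void checkMaxRows(int maxRows) {
        if (maxRows != FETCH_ALL && maxRows <= 0) {
            throw new SqlGatewayExceptionSketch(
                    String.format(
                            "The max rows must be positive or FETCH_ALL, but was %d.", maxRows));
        }
    }
}
```

Using one exception type for argument errors means endpoints only need to translate a single class into an error response.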



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/SqlGatewayService.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.gateway.common.operation.OperationHandle;
+import org.apache.flink.table.gateway.common.operation.OperationType;
+import org.apache.flink.table.gateway.common.results.OperationInfo;
+import org.apache.flink.table.gateway.common.results.ResultSet;
+import org.apache.flink.table.gateway.common.session.SessionEnvironment;
+import org.apache.flink.table.gateway.common.session.SessionHandle;
+import org.apache.flink.table.gateway.common.utils.SqlGatewayException;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * The definition for SqlGatewayService. The SqlGatewayService is the core to process the request

Review Comment:
   ```suggestion
    * The SQL Gateway service is responsible for handling requests from the endpoints.
   ```



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/HandleIdentifier.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/** Identifiers for Handle. */
+@PublicEvolving
+public class HandleIdentifier {
+
+    private final UUID publicId;
+    private final UUID secretId;

Review Comment:
   Please add a getter for `secretId`.
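
A sketch of the class with both getters, for illustration only. The two `UUID` fields come from the diff; the constructor, `equals`, `hashCode`, and the decision to keep `secretId` out of `toString` are assumptions, and `HandleIdentifierSketch` is a hypothetical name.

```java
import java.util.Objects;
import java.util.UUID;

// Sketch only: mirrors the two fields shown in the diff; everything else is assumed.
class HandleIdentifierSketch {

    private final UUID publicId;
    private final UUID secretId;

    HandleIdentifierSketch(UUID publicId, UUID secretId) {
        this.publicId = Objects.requireNonNull(publicId, "publicId");
        this.secretId = Objects.requireNonNull(secretId, "secretId");
    }

    UUID getPublicId() {
        return publicId;
    }

    // The getter requested in the review.
    UUID getSecretId() {
        return secretId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof HandleIdentifierSketch)) {
            return false;
        }
        HandleIdentifierSketch that = (HandleIdentifierSketch) o;
        return publicId.equals(that.publicId) && secretId.equals(that.secretId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(publicId, secretId);
    }

    // Design choice of this sketch: only the public id is printed, so the secret stays out of logs.
    @Override
    public String toString() {
        return publicId.toString();
    }
}
```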



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/config/SqlGatewayServiceConfigOptions.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.gateway.common.SqlGatewayService;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** Config options for {@link SqlGatewayService}. */
+@PublicEvolving
+public class SqlGatewayServiceConfigOptions {
+
+    public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_IDLE_TIMEOUT =
+            key("sql-gateway.session.idle-timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(5))

Review Comment:
   Why is the default value of `sql.gateway.session.idle.timeout` 5 minutes? And what exactly is disabled when the value is zero or negative?
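
For context on the second question: `SessionManager#start` in this PR only schedules the cleanup task when `checkInterval > 0 && idleTimeout > 0`, so a zero or negative value appears to switch off the periodic idle-session check. The sketch below restates that guard in isolation. `isExpired` and its parameters are hypothetical, since the body of `isSessionExpired` is not shown in the diff.

```java
import java.time.Duration;

// Hypothetical helper class that isolates the guard used in SessionManager#start.
class IdleTimeoutSketch {

    /** Same condition as in the PR: both values must be positive for the checker to be scheduled. */
    static boolean isTimeoutCheckEnabled(Duration idleTimeout, Duration checkInterval) {
        return checkInterval.toMillis() > 0 && idleTimeout.toMillis() > 0;
    }

    /** Assumed expiry rule: a session expires once it has been idle longer than the timeout. */
    static boolean isExpired(long lastAccessMillis, long nowMillis, Duration idleTimeout) {
        long timeoutMillis = idleTimeout.toMillis();
        return timeoutMillis > 0 && nowMillis - lastAccessMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofMinutes(1);
        System.out.println(isTimeoutCheckEnabled(Duration.ofMinutes(5), interval)); // true
        System.out.println(isTimeoutCheckEnabled(Duration.ZERO, interval));         // false
        System.out.println(isTimeoutCheckEnabled(Duration.ofMinutes(-1), interval)); // false
    }
}
```

Stating this behavior in the option's description would let users see that zero or a negative value means sessions are never closed for being idle.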



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/config/SqlGatewayServiceConfigOptions.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.gateway.common.SqlGatewayService;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** Config options for {@link SqlGatewayService}. */
+@PublicEvolving
+public class SqlGatewayServiceConfigOptions {
+
+    public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_IDLE_TIMEOUT =
+            key("sql-gateway.session.idle-timeout")

Review Comment:
   ```suggestion
               key("sql.gateway.session.idle.timeout")
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.session;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.gateway.common.session.SessionEnvironment;
+import org.apache.flink.table.gateway.common.session.SessionHandle;
+import org.apache.flink.table.gateway.common.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.utils.Constants;
+import org.apache.flink.table.gateway.service.utils.ThreadUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL;
+import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT;
+import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM;
+import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_KEEPALIVE_TIME;
+import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX;
+import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MIN;
+
+/** Manage the lifecycle of the {@code Session}. */
+public class SessionManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SessionManager.class);
+
+    private final DefaultContext defaultContext;
+
+    private final long idleTimeout;
+    private final long checkInterval;
+    private final int maxSessionCount;
+
+    private final Map<SessionHandle, Session> sessions;
+
+    private ExecutorService operationExecutorService;
+    private ScheduledExecutorService scheduledExecutorService;
+    private ScheduledFuture<?> timeoutCheckerFuture;
+
+    public SessionManager(DefaultContext defaultContext) {
+        this.defaultContext = defaultContext;
+        ReadableConfig conf = defaultContext.getFlinkConfig();
+        this.idleTimeout = conf.get(SQL_GATEWAY_SESSION_IDLE_TIMEOUT).toMillis();
+        this.checkInterval = conf.get(SQL_GATEWAY_SESSION_CHECK_INTERVAL).toMillis();
+        this.maxSessionCount = conf.get(SQL_GATEWAY_SESSION_MAX_NUM);
+        this.sessions = new ConcurrentHashMap<>();
+    }
+
+    public void start() {
+        if (checkInterval > 0 && idleTimeout > 0) {
+            scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+            timeoutCheckerFuture =
+                    scheduledExecutorService.scheduleAtFixedRate(
+                            () -> {
+                                LOG.debug(
+                                        "Start to cleanup expired sessions, current session count: {}",
+                                        sessions.size());
+                                for (Map.Entry<SessionHandle, Session> entry :
+                                        sessions.entrySet()) {
+                                    SessionHandle sessionId = entry.getKey();
+                                    Session session = entry.getValue();
+                                    if (isSessionExpired(session)) {
+                                        LOG.info("Session {} is expired, close it...", sessionId);
+                                        closeSession(session);
+                                    }
+                                }
+                                LOG.debug(
+                                        "Removing expired session finished, current session count: {}",
+                                        sessions.size());
+                            },
+                            checkInterval,
+                            checkInterval,
+                            TimeUnit.MILLISECONDS);
+        }
+
+        ReadableConfig conf = defaultContext.getFlinkConfig();
+        operationExecutorService =
+                ThreadUtils.newDaemonQueuedThreadPool(
+                        conf.get(SQL_GATEWAY_WORKER_THREADS_MIN),
+                        conf.get(SQL_GATEWAY_WORKER_THREADS_MAX),
+                        conf.get(SQL_GATEWAY_WORKER_KEEPALIVE_TIME).toMillis(),
+                        Constants.OPERATION_POOL_NAME);
+    }
+
+    public void stop() {
+        if (scheduledExecutorService != null) {
+            timeoutCheckerFuture.cancel(true);
+            scheduledExecutorService.shutdown();
+        }
+        if (operationExecutorService != null) {
+            operationExecutorService.shutdown();
+        }
+        LOG.info("SessionManager is stopped.");
+    }
+
+    public Session getSession(SessionHandle sessionHandle) throws SqlGatewayException {
+        Session session = sessions.get(sessionHandle);
+        if (session == null) {
+            String msg = String.format("Session '%s' does not exist.", sessionHandle);
+            LOG.warn(msg);
+            throw new SqlGatewayException(msg);
+        }
+        session.touch();
+        return session;
+    }
+
+    /**
+     * Register the session into the {@link SessionManager}.
+     *
+     * <p>Use synchronized to keep the checkSessionCount and build the Session are atomic.
+     */
+    public synchronized Session openSession(SessionEnvironment environment)
+            throws SqlGatewayException {
+        // check session limit
+        checkSessionCount();
+
+        Session session = null;
+        while (true) {
+            SessionHandle sessionId = SessionHandle.create();
+            SessionContext sessionContext =
+                    SessionContext.create(
+                            defaultContext,
+                            sessionId,
+                            environment.getSessionEndpointVersion(),
+                            Configuration.fromMap(environment.getSessionConfig()),
+                            operationExecutorService);
+
+            session = new Session(sessionContext);
+            // only put when the session id is not exist to avoid conflicts.
+            if (sessions.putIfAbsent(sessionId, session) == null) {

Review Comment:
   This could first check whether the session id already exists, and only create the `Session` when it does not.
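
   A minimal sketch of that ordering, assuming simplified stand-in types: `SessionRegistrySketch` and its nested `Session` are hypothetical and are not the PR's `SessionManager` or `Session`. Like `openSession` in the diff, the method is synchronized, so the existence check and the insertion cannot interleave with another open call.

   ```java
   import java.util.Map;
   import java.util.UUID;
   import java.util.concurrent.ConcurrentHashMap;

   /** Hypothetical stand-in for SessionManager; not the PR's actual class. */
   final class SessionRegistrySketch {

       /** Hypothetical stand-in for Session; it only carries its id. */
       static final class Session {
           final UUID id;

           Session(UUID id) {
               this.id = id;
           }
       }

       private final Map<UUID, Session> sessions = new ConcurrentHashMap<>();

       /** Synchronized so that the check and the put below act as one step for all callers. */
       synchronized Session open() {
           UUID id = UUID.randomUUID();
           // Check the id first, so no Session is built for an id that is already taken.
           while (sessions.containsKey(id)) {
               id = UUID.randomUUID();
           }
           Session session = new Session(id);
           sessions.put(id, session);
           return session;
       }

       int size() {
           return sessions.size();
       }

       public static void main(String[] args) {
           SessionRegistrySketch registry = new SessionRegistrySketch();
           Session first = registry.open();
           Session second = registry.open();
           System.out.println("distinct ids: " + !first.id.equals(second.id));
       }
   }
   ```

   Compared with the loop in the diff, this avoids constructing a `SessionContext` and `Session` that would be thrown away on an id collision, at the cost of relying on the method-level lock rather than on `putIfAbsent` alone.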



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/HandleIdentifier.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/** Identifiers for Handle. */
+@PublicEvolving
+public class HandleIdentifier {
+
+    private final UUID publicId;
+    private final UUID secretId;
+
+    public HandleIdentifier(UUID publicId, UUID secretId) {
+        this.publicId = publicId;
+        this.secretId = secretId;
+    }
+
+    public UUID getPublicId() {
+        return publicId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof HandleIdentifier)) {
+            return false;
+        }
+        HandleIdentifier that = (HandleIdentifier) o;
+        return Objects.equals(publicId, that.publicId) && Objects.equals(secretId, that.secretId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(publicId, secretId);
+    }
+
+    @Override
+    public String toString() {
+        return publicId.toString();

Review Comment:
   Why not add the `toString` of the `secretId`?



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/config/SqlGatewayServiceConfigOptions.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.gateway.common.SqlGatewayService;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** Config options for {@link SqlGatewayService}. */
+@PublicEvolving
+public class SqlGatewayServiceConfigOptions {
+
+    public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_IDLE_TIMEOUT =
+            key("sql-gateway.session.idle-timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(5))
+                    .withDescription(
+                            "Session will be closed when it's not accessed for this duration, which can be disabled by setting to zero or negative value.");
+
+    public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_CHECK_INTERVAL =
+            key("sql-gateway.session.check-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(1))
+                    .withDescription(
+                            "The check interval for session timeout, which can be disabled by setting to zero or negative value.");
+
+    public static final ConfigOption<Integer> SQL_GATEWAY_SESSION_MAX_NUM =
+            key("sql-gateway.session.max-num")
+                    .intType()
+                    .defaultValue(1000000)
+                    .withDescription(
+                            "The check interval for session timeout, which can be disabled by setting to zero or negative value.");
+
+    public static final ConfigOption<Integer> SQL_GATEWAY_WORKER_THREADS_MAX =
+            key("sql-gateway.worker.threads.max")
+                    .intType()
+                    .defaultValue(500)
+                    .withDescription("Maximum number of worker threads for gateway GRPC server.");
+
+    public static final ConfigOption<Integer> SQL_GATEWAY_WORKER_THREADS_MIN =
+            key("sql-gateway.worker.threads.min")
+                    .intType()
+                    .defaultValue(5)

Review Comment:
   Why is the default value of the `sql.gateway.worker.threads.min.num` 5?



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/config/SqlGatewayServiceConfigOptions.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.gateway.common.SqlGatewayService;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** Config options for {@link SqlGatewayService}. */
+@PublicEvolving
+public class SqlGatewayServiceConfigOptions {
+
+    public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_IDLE_TIMEOUT =
+            key("sql-gateway.session.idle-timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(5))
+                    .withDescription(
+                            "Session will be closed when it's not accessed for this duration, which can be disabled by setting to zero or negative value.");
+
+    public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_CHECK_INTERVAL =
+            key("sql-gateway.session.check-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(1))
+                    .withDescription(
+                            "The check interval for session timeout, which can be disabled by setting to zero or negative value.");
+
+    public static final ConfigOption<Integer> SQL_GATEWAY_SESSION_MAX_NUM =
+            key("sql-gateway.session.max-num")
+                    .intType()
+                    .defaultValue(1000000)
+                    .withDescription(
+                            "The check interval for session timeout, which can be disabled by setting to zero or negative value.");
+
+    public static final ConfigOption<Integer> SQL_GATEWAY_WORKER_THREADS_MAX =
+            key("sql-gateway.worker.threads.max")

Review Comment:
   ```suggestion
               key("sql.gateway.worker.threads.max.num")
   ```



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/operation/OperationHandle.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common.operation;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.gateway.common.HandleIdentifier;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/** {@link OperationHandle} to index the {@code Operation}. */
+@PublicEvolving
+public class OperationHandle {

Review Comment:
   What is the `OperationHandle` used for? IMO, the `OperationHandle` is the same as `HandleIdentifier`.



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/results/OperationInfo.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common.results;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.gateway.common.operation.OperationStatus;
+import org.apache.flink.table.gateway.common.operation.OperationType;
+
+import java.util.Objects;
+
+/** Info to describe the {@code Operation}. */

Review Comment:
   ```suggestion
   /** Information of the {@code Operation}. */
   ```



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/operation/OperationStatus.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common.operation;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/** Status to describe the {@code Operation}. */
+@PublicEvolving
+public enum OperationStatus {
+    INITIALIZED(false),
+
+    PENDING(false),
+
+    RUNNING(false),
+
+    FINISHED(true),
+
+    CANCELED(true),
+
+    CLOSED(true),
+
+    ERROR(true),
+
+    TIMEOUT(true);
+
+    private final boolean isTerminalStatus;

Review Comment:
   Is the `isTerminalStatus` field necessary here? IMO, this could be a util method that returns whether a status is terminal.
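
   One way such a util method might look, as a sketch only. `TerminalStatusSketch`, its nested `Status` enum, and `isTerminal` are hypothetical names; the constants and the terminal/non-terminal split are copied from the `OperationStatus` enum in the diff.

   ```java
   import java.util.EnumSet;
   import java.util.Set;

   /** Hypothetical helper; the enum mirrors the constants of OperationStatus in the diff. */
   final class TerminalStatusSketch {

       /** Same constants as in the diff, but without the boolean constructor argument. */
       enum Status {
           INITIALIZED,
           PENDING,
           RUNNING,
           FINISHED,
           CANCELED,
           CLOSED,
           ERROR,
           TIMEOUT
       }

       /** The statuses that the diff marks with {@code true}. */
       private static final Set<Status> TERMINAL =
               EnumSet.of(
                       Status.FINISHED,
                       Status.CANCELED,
                       Status.CLOSED,
                       Status.ERROR,
                       Status.TIMEOUT);

       /** Returns whether no further transition is expected from the given status. */
       static boolean isTerminal(Status status) {
           return TERMINAL.contains(status);
       }

       public static void main(String[] args) {
           System.out.println("RUNNING terminal: " + isTerminal(Status.RUNNING));
           System.out.println("FINISHED terminal: " + isTerminal(Status.FINISHED));
       }
   }
   ```

   The per-constant boolean in the diff keeps the flag next to each constant, while the set-based helper keeps the enum declaration bare; both encode the same information.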



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultStore.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.result;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.util.CloseableIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** A result store which stores and buffers results. */
+public class ResultStore {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ResultStore.class);
+
+    private final CloseableIterator<RowData> result;
+    private final List<RowData> recordsBuffer = new ArrayList<>();
+    private final int maxBufferSize;
+
+    private final Object resultLock = new Object();
+    private final AtomicReference<SqlExecutionException> executionException =
+            new AtomicReference<>();
+    private final ResultRetrievalThread retrievalThread = new ResultRetrievalThread();
+
+    public ResultStore(CloseableIterator<RowData> result, int maxBufferSize) {
+        this.result = result;
+        this.maxBufferSize = maxBufferSize;
+        this.retrievalThread.start();
+    }
+
+    public void close() {
+        retrievalThread.isRunning = false;
+        retrievalThread.interrupt();
+
+        try {
+            result.close();
+        } catch (Exception e) {
+            LOG.error("Failed to close the ResultStore. Ignore the error.", e);
+        }
+    }
+
+    public Optional<List<RowData>> retrieveRecords() {
+        synchronized (resultLock) {
+            // retrieval thread is alive return a record if available
+            // but the program must not have failed
+            if (isRetrieving() && executionException.get() == null) {
+                if (recordsBuffer.isEmpty()) {
+                    return Optional.of(Collections.emptyList());
+                } else {
+                    final List<RowData> change = new ArrayList<>(recordsBuffer);
+                    recordsBuffer.clear();
+                    resultLock.notify();
+                    return Optional.of(change);
+                }
+            }
+            // retrieval thread is dead but there is still a record to be delivered
+            else if (!isRetrieving() && !recordsBuffer.isEmpty()) {
+                final List<RowData> change = new ArrayList<>(recordsBuffer);
+                recordsBuffer.clear();
+                return Optional.of(change);
+            }
+            // no results can be returned anymore
+            else {
+                return handleMissingResult();
+            }
+        }
+    }
+
+    private boolean isRetrieving() {
+        return retrievalThread.isRunning;
+    }
+
+    private Optional<List<RowData>> handleMissingResult() {
+        if (executionException.get() != null) {
+            throw executionException.get();
+        }
+
+        // we assume that a bounded job finished
+        return Optional.empty();

Review Comment:
   Why does this assume that a bounded job has finished?



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/results/ResultSet.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common.results;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** The collection of the results. */
+@PublicEvolving
+public class ResultSet {
+
+    private final ResultType resultType;
+
+    private final Long nextToken;
+
+    private final ResolvedSchema resultSchema;
+    private final List<RowData> data;
+
+    public ResultSet(
+            ResultType resultType,
+            Long nextToken,
+            ResolvedSchema resultSchema,
+            List<RowData> data) {
+        this.nextToken = nextToken;
+        this.resultType = resultType;
+        this.resultSchema = resultSchema;
+        this.data = data;
+    }
+
+    public ResultType getResultType() {
+        return resultType;
+    }
+
+    public Long getNextToken() {
+        return nextToken;
+    }
+
+    public ResolvedSchema getResultSchema() {
+        return resultSchema;
+    }
+
+    public List<RowData> getData() {
+        return data;
+    }
+
+    @Override
+    public String toString() {
+        return String.format(
+                "ResultSet{\n"
+                        + "  resultType=%s,\n"
+                        + "  resultSchema=%s,\n"
+                        + "  data=[%s]\n"
+                        + "}",
+                resultType,
+                resultSchema.toString(),
+                data.stream().map(Object::toString).collect(Collectors.joining(",")));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof ResultSet)) {
+            return false;
+        }
+        ResultSet resultSet = (ResultSet) o;
+        return resultType == resultSet.resultType
+                && Objects.equals(nextToken, resultSet.nextToken)
+                && Objects.equals(resultSchema, resultSet.resultSchema)
+                && Objects.equals(data, resultSet.data);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(resultType, nextToken, resultSchema, data);
+    }
+
+    /** Describe the kind of the ResultSet. */
+    public enum ResultType {
+        PAYLOAD,

Review Comment:
   Please add a comment describing `PAYLOAD`.



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/operation/OperationStatus.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common.operation;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/** Status to describe the {@code Operation}. */

Review Comment:
   Could you please describe the state machine of the `Operation` here?
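
   For illustration, a state machine for these statuses could be written down as a transition table like the one below. The transitions are an assumption made for this sketch and are not taken from the PR; `OperationStateMachineSketch`, `Status`, and `isValidTransition` are hypothetical names.

   ```java
   import java.util.EnumMap;
   import java.util.EnumSet;
   import java.util.Map;
   import java.util.Set;

   /** Hypothetical transition table; the allowed transitions are assumed, not from the PR. */
   final class OperationStateMachineSketch {

       enum Status {
           INITIALIZED,
           PENDING,
           RUNNING,
           FINISHED,
           CANCELED,
           CLOSED,
           ERROR,
           TIMEOUT
       }

       private static final Map<Status, Set<Status>> TRANSITIONS = new EnumMap<>(Status.class);

       static {
           // Assumed happy path: INITIALIZED -> PENDING -> RUNNING -> FINISHED -> CLOSED.
           TRANSITIONS.put(
                   Status.INITIALIZED,
                   EnumSet.of(Status.PENDING, Status.CANCELED, Status.CLOSED, Status.TIMEOUT, Status.ERROR));
           TRANSITIONS.put(
                   Status.PENDING,
                   EnumSet.of(Status.RUNNING, Status.CANCELED, Status.CLOSED, Status.TIMEOUT, Status.ERROR));
           TRANSITIONS.put(
                   Status.RUNNING,
                   EnumSet.of(Status.FINISHED, Status.CANCELED, Status.CLOSED, Status.TIMEOUT, Status.ERROR));
           // Assumed: every other terminal status may only move on to CLOSED.
           for (Status s : EnumSet.of(Status.FINISHED, Status.CANCELED, Status.ERROR, Status.TIMEOUT)) {
               TRANSITIONS.put(s, EnumSet.of(Status.CLOSED));
           }
           // CLOSED has no outgoing transitions.
           TRANSITIONS.put(Status.CLOSED, EnumSet.noneOf(Status.class));
       }

       /** Returns whether moving from one status to the other is allowed by the table above. */
       static boolean isValidTransition(Status from, Status to) {
           return TRANSITIONS.get(from).contains(to);
       }

       public static void main(String[] args) {
           System.out.println(isValidTransition(Status.PENDING, Status.RUNNING));
           System.out.println(isValidTransition(Status.CLOSED, Status.RUNNING));
       }
   }
   ```

   A Javadoc on the enum could describe the same table in prose or as a small diagram, so that readers do not have to infer the allowed transitions from the code.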



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/results/ResultSet.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common.results;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** The collection of the results. */
+@PublicEvolving
+public class ResultSet {
+
+    private final ResultType resultType;
+
+    private final Long nextToken;
+
+    private final ResolvedSchema resultSchema;
+    private final List<RowData> data;
+
+    public ResultSet(
+            ResultType resultType,
+            Long nextToken,
+            ResolvedSchema resultSchema,
+            List<RowData> data) {
+        this.nextToken = nextToken;
+        this.resultType = resultType;
+        this.resultSchema = resultSchema;
+        this.data = data;
+    }
+
+    public ResultType getResultType() {
+        return resultType;
+    }
+
+    public Long getNextToken() {
+        return nextToken;
+    }
+
+    public ResolvedSchema getResultSchema() {
+        return resultSchema;
+    }
+
+    public List<RowData> getData() {
+        return data;
+    }
+
+    @Override
+    public String toString() {
+        return String.format(
+                "ResultSet{\n"
+                        + "  resultType=%s,\n"
+                        + "  resultSchema=%s,\n"
+                        + "  data=[%s]\n"
+                        + "}",
+                resultType,
+                resultSchema.toString(),
+                data.stream().map(Object::toString).collect(Collectors.joining(",")));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof ResultSet)) {
+            return false;
+        }
+        ResultSet resultSet = (ResultSet) o;
+        return resultType == resultSet.resultType
+                && Objects.equals(nextToken, resultSet.nextToken)
+                && Objects.equals(resultSchema, resultSet.resultSchema)
+                && Objects.equals(data, resultSet.data);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(resultType, nextToken, resultSchema, data);
+    }
+
+    /** Describe the kind of the ResultSet. */
+    public enum ResultType {
+        PAYLOAD,
+
+        EOS

Review Comment:
   Ditto.
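
   A sketch of what the requested comments on both constants might look like. The Javadoc wording reflects an assumed reading of the names (`EOS` as "end of stream") and is not taken from the PR; `ResultTypeSketch` and `hasMore` are hypothetical and exist only to make the example runnable.

   ```java
   /** Hypothetical wrapper around a documented copy of the ResultType enum. */
   final class ResultTypeSketch {

       /** Describe the kind of the ResultSet. */
       enum ResultType {
           /** Assumed meaning: the result carries data, and more results may follow. */
           PAYLOAD,

           /** Assumed meaning: end of stream, so all results have been consumed. */
           EOS
       }

       /** Returns whether a client should keep fetching after seeing this type. */
       static boolean hasMore(ResultType type) {
           return type == ResultType.PAYLOAD;
       }

       public static void main(String[] args) {
           System.out.println("PAYLOAD has more: " + hasMore(ResultType.PAYLOAD));
           System.out.println("EOS has more: " + hasMore(ResultType.EOS));
       }
   }
   ```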



##########
flink-table/flink-sql-gateway/pom.xml:
##########
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-table</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.16-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-sql-gateway</artifactId>
+    <name>Flink : Table : SQL Gateway</name>
+
+    <dependencies>
+        <!-- Flink client to submit jobs -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- Table ecosystem -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-gateway-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-gateway-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+    <plugins>
+    <!-- Build flink-table-sqlserver jar -->

Review Comment:
   Why does this build flink-table-sqlserver jar?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service;
+
+import org.apache.flink.table.gateway.common.SqlGatewayService;
+import org.apache.flink.table.gateway.common.operation.OperationHandle;
+import org.apache.flink.table.gateway.common.operation.OperationType;
+import org.apache.flink.table.gateway.common.results.OperationInfo;
+import org.apache.flink.table.gateway.common.results.ResultSet;
+import org.apache.flink.table.gateway.common.session.SessionEnvironment;
+import org.apache.flink.table.gateway.common.session.SessionHandle;
+import org.apache.flink.table.gateway.common.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.session.SessionManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/** The implementation for the {@link SqlGatewayService}. */
+public class SqlGatewayServiceImpl implements SqlGatewayService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqlGatewayServiceImpl.class);
+
+    private final SessionManager sessionManager;
+
+    public SqlGatewayServiceImpl(SessionManager sessionManager) {
+        this.sessionManager = sessionManager;
+    }
+
+    @Override
+    public SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException {
+        try {
+            return sessionManager.openSession(environment).getSessionHandle();
+        } catch (Throwable e) {
+            LOG.error("Failed to openSession.", e);

Review Comment:
   Why not handle this with a try/catch inside `SessionManager` instead?
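
For reference, the pattern being questioned can be sketched as below. This is a minimal illustration, not the PR's code: `GatewayExceptionSketch`, `SessionManagerSketch` and `GatewayServiceSketch` are hypothetical stand-ins, and the rethrow after the catch is an assumption, since the quoted hunk stops at the `LOG.error` line.

```java
import java.util.UUID;

/** Hypothetical stand-in for the gateway's single exception type. */
class GatewayExceptionSketch extends RuntimeException {
    GatewayExceptionSketch(String message, Throwable cause) {
        super(message, cause);
    }
}

/** Hypothetical session manager that may fail with an arbitrary runtime error. */
class SessionManagerSketch {
    private final boolean failing;

    SessionManagerSketch(boolean failing) {
        this.failing = failing;
    }

    UUID openSession() {
        if (failing) {
            throw new IllegalStateException("simulated failure");
        }
        return UUID.randomUUID();
    }
}

/** Service facade that converts every failure into one exception type at the boundary. */
class GatewayServiceSketch {
    private final SessionManagerSketch sessionManager;

    GatewayServiceSketch(SessionManagerSketch sessionManager) {
        this.sessionManager = sessionManager;
    }

    UUID openSession() {
        try {
            return sessionManager.openSession();
        } catch (Throwable t) {
            // Assumption: the service wraps and rethrows; the original cause is preserved.
            throw new GatewayExceptionSketch("Failed to openSession.", t);
        }
    }
}
```

With the catch at the service boundary, the manager can throw whatever it likes and callers still see a single exception type. Moving the catch into the manager would instead require every manager method to do the wrapping itself.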



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/session/SessionEnvironment.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common.session;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.gateway.common.endpoint.EndpointVersion;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Environment to initialize the {@code Session}. */

Review Comment:
   ```suggestion
   /** Environment of the {@code Session}. */
   ```



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/utils/SqlGatewayException.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** General exception for SQL gateway related errors. */

Review Comment:
   ```suggestion
   /**
    * General Exception for all errors during SQL gateway serving.
    *
    * <p>This exception indicates that an internal error occurred or that a feature is not supported
    * yet. Usually, this exception does not indicate a fault of the user.
    */
   ```



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/session/SessionHandle.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common.session;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.gateway.common.HandleIdentifier;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/** Session Handle that used to identify the Session. */

Review Comment:
   What is the difference between the `SessionHandle` and `OperationHandle`?



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/results/ResultSet.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common.results;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** The collection of the results. */
+@PublicEvolving
+public class ResultSet {
+
+    private final ResultType resultType;
+
+    private final Long nextToken;
+
+    private final ResolvedSchema resultSchema;
+    private final List<RowData> data;
+
+    public ResultSet(
+            ResultType resultType,
+            Long nextToken,
+            ResolvedSchema resultSchema,
+            List<RowData> data) {
+        this.nextToken = nextToken;
+        this.resultType = resultType;
+        this.resultSchema = resultSchema;
+        this.data = data;
+    }
+
+    public ResultType getResultType() {
+        return resultType;
+    }
+
+    public Long getNextToken() {
+        return nextToken;
+    }
+
+    public ResolvedSchema getResultSchema() {
+        return resultSchema;
+    }
+
+    public List<RowData> getData() {
+        return data;
+    }
+
+    @Override
+    public String toString() {
+        return String.format(
+                "ResultSet{\n"

Review Comment:
   The `toString` method omits `nextToken`.
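
A minimal sketch of a `toString` that includes the token, assuming the same four fields as the quoted class. `ResultSetSketch` is a hypothetical simplification: the schema and the rows are plain strings here rather than `ResolvedSchema` and `RowData`, and the exact output layout is illustrative only.

```java
import java.util.List;

/** Simplified, hypothetical stand-in for ResultSet; only the field names mirror the diff. */
class ResultSetSketch {
    private final String resultType;
    private final Long nextToken; // boxed Long, so it may be null
    private final String resultSchema;
    private final List<String> data;

    ResultSetSketch(String resultType, Long nextToken, String resultSchema, List<String> data) {
        this.resultType = resultType;
        this.nextToken = nextToken;
        this.resultSchema = resultSchema;
        this.data = data;
    }

    @Override
    public String toString() {
        // %s renders a null nextToken as "null", so no extra null check is needed.
        return String.format(
                "ResultSet{\n  resultType=%s,\n  nextToken=%s,\n  resultSchema=%s,\n  data=%s\n}",
                resultType, nextToken, resultSchema, data);
    }
}
```

Printing the token makes log output sufficient to tell which page of results a given `ResultSet` represents.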



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.context;
+
+import org.apache.flink.client.cli.CliArgsException;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.cli.ProgramOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.table.gateway.common.utils.SqlGatewayException;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** The context memorized initial configuration. */
+public class DefaultContext {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultContext.class);
+
+    private final Configuration flinkConfig;
+
+    public DefaultContext(Configuration flinkConfig, List<CustomCommandLine> commandLines) {
+        this.flinkConfig = flinkConfig;
+        // initialize default file system
+        FileSystem.initialize(
+                flinkConfig, PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
+
+        Options commandLineOptions = collectCommandLineOptions(commandLines);
+        try {
+            CommandLine deploymentCommandLine =
+                    CliFrontendParser.parse(commandLineOptions, new String[] {}, true);
+            flinkConfig.addAll(
+                    createExecutionConfig(
+                            deploymentCommandLine,
+                            commandLineOptions,
+                            commandLines,
+                            Collections.emptyList()));
+        } catch (Exception e) {
+            throw new SqlGatewayException(
+                    "Could not load available CLI with Environment Deployment entry.", e);
+        }
+    }
+
+    public Configuration getFlinkConfig() {
+        return flinkConfig;
+    }
+
+    private Options collectCommandLineOptions(List<CustomCommandLine> commandLines) {
+        final Options customOptions = new Options();
+        for (CustomCommandLine customCommandLine : commandLines) {
+            customCommandLine.addGeneralOptions(customOptions);
+            customCommandLine.addRunOptions(customOptions);
+        }
+        return CliFrontendParser.mergeOptions(
+                CliFrontendParser.getRunCommandOptions(), customOptions);
+    }
+
+    private static Configuration createExecutionConfig(
+            CommandLine commandLine,
+            Options commandLineOptions,
+            List<CustomCommandLine> availableCommandLines,
+            List<URL> dependencies)
+            throws FlinkException {
+        LOG.debug("Available commandline options: {}", commandLineOptions);
+        List<String> options =
+                Stream.of(commandLine.getOptions())
+                        .map(o -> o.getOpt() + "=" + o.getValue())
+                        .collect(Collectors.toList());
+        LOG.debug(
+                "Instantiated commandline args: {}, options: {}",
+                commandLine.getArgList(),
+                options);
+
+        final CustomCommandLine activeCommandLine =
+                findActiveCommandLine(availableCommandLines, commandLine);
+        LOG.debug(
+                "Available commandlines: {}, active commandline: {}",
+                availableCommandLines,
+                activeCommandLine);
+
+        Configuration executionConfig = activeCommandLine.toConfiguration(commandLine);
+
+        try {
+            final ProgramOptions programOptions = ProgramOptions.create(commandLine);
+            final ExecutionConfigAccessor executionConfigAccessor =
+                    ExecutionConfigAccessor.fromProgramOptions(programOptions, dependencies);
+            executionConfigAccessor.applyToConfiguration(executionConfig);
+        } catch (CliArgsException e) {
+            throw new SqlGatewayException("Invalid deployment run options.", e);
+        }
+
+        LOG.info("Executor config: {}", executionConfig);
+        return executionConfig;
+    }
+
+    private static CustomCommandLine findActiveCommandLine(
+            List<CustomCommandLine> availableCommandLines, CommandLine commandLine) {
+        for (CustomCommandLine cli : availableCommandLines) {
+            if (cli.isActive(commandLine)) {
+                return cli;
+            }
+        }
+        throw new SqlGatewayException("Could not find a matching deployment.");

Review Comment:
   How should "a matching deployment" be understood here?
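
As quoted, `findActiveCommandLine` returns the first candidate whose `isActive` check passes and throws if none does. The sketch below reproduces only that selection logic. `CandidateSketch` and `ActiveCommandLineSketch` are hypothetical names, and the parsed command line is reduced to a set of option names. Note that the quoted constructor parses an empty argument array, so which candidate counts as "matching" depends entirely on each candidate's own `isActive` implementation.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

/** Hypothetical candidate: a name plus a predicate standing in for isActive(commandLine). */
class CandidateSketch {
    final String name;
    final Predicate<Set<String>> isActive;

    CandidateSketch(String name, Predicate<Set<String>> isActive) {
        this.name = name;
        this.isActive = isActive;
    }
}

class ActiveCommandLineSketch {
    /** Returns the first active candidate in list order; throws if none is active. */
    static CandidateSketch findActive(List<CandidateSketch> candidates, Set<String> parsedOptions) {
        for (CandidateSketch candidate : candidates) {
            if (candidate.isActive.test(parsedOptions)) {
                return candidate;
            }
        }
        throw new IllegalStateException("Could not find a matching deployment.");
    }
}
```

Because the first match wins, list order matters: a catch-all candidate placed first would shadow every more specific one after it.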



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.context;
+
+import org.apache.flink.client.cli.CliArgsException;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.cli.ProgramOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.table.gateway.common.utils.SqlGatewayException;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** The context memorized initial configuration. */
+public class DefaultContext {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultContext.class);
+
+    private final Configuration flinkConfig;
+
+    public DefaultContext(Configuration flinkConfig, List<CustomCommandLine> commandLines) {
+        this.flinkConfig = flinkConfig;
+        // initialize default file system
+        FileSystem.initialize(
+                flinkConfig, PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
+
+        Options commandLineOptions = collectCommandLineOptions(commandLines);
+        try {
+            CommandLine deploymentCommandLine =
+                    CliFrontendParser.parse(commandLineOptions, new String[] {}, true);
+            flinkConfig.addAll(
+                    createExecutionConfig(
+                            deploymentCommandLine,
+                            commandLineOptions,
+                            commandLines,
+                            Collections.emptyList()));
+        } catch (Exception e) {
+            throw new SqlGatewayException(
+                    "Could not load available CLI with Environment Deployment entry.", e);
+        }
+    }
+
+    public Configuration getFlinkConfig() {
+        return flinkConfig;
+    }
+
+    private Options collectCommandLineOptions(List<CustomCommandLine> commandLines) {
+        final Options customOptions = new Options();
+        for (CustomCommandLine customCommandLine : commandLines) {
+            customCommandLine.addGeneralOptions(customOptions);
+            customCommandLine.addRunOptions(customOptions);
+        }
+        return CliFrontendParser.mergeOptions(
+                CliFrontendParser.getRunCommandOptions(), customOptions);
+    }
+
+    private static Configuration createExecutionConfig(
+            CommandLine commandLine,
+            Options commandLineOptions,
+            List<CustomCommandLine> availableCommandLines,
+            List<URL> dependencies)
+            throws FlinkException {
+        LOG.debug("Available commandline options: {}", commandLineOptions);
+        List<String> options =
+                Stream.of(commandLine.getOptions())
+                        .map(o -> o.getOpt() + "=" + o.getValue())
+                        .collect(Collectors.toList());
+        LOG.debug(
+                "Instantiated commandline args: {}, options: {}",
+                commandLine.getArgList(),
+                options);
+
+        final CustomCommandLine activeCommandLine =
+                findActiveCommandLine(availableCommandLines, commandLine);
+        LOG.debug(
+                "Available commandlines: {}, active commandline: {}",
+                availableCommandLines,
+                activeCommandLine);
+
+        Configuration executionConfig = activeCommandLine.toConfiguration(commandLine);
+
+        try {
+            final ProgramOptions programOptions = ProgramOptions.create(commandLine);
+            final ExecutionConfigAccessor executionConfigAccessor =
+                    ExecutionConfigAccessor.fromProgramOptions(programOptions, dependencies);
+            executionConfigAccessor.applyToConfiguration(executionConfig);
+        } catch (CliArgsException e) {
+            throw new SqlGatewayException("Invalid deployment run options.", e);
+        }
+
+        LOG.info("Executor config: {}", executionConfig);

Review Comment:
   ```suggestion
           LOG.info("Execution config: {}", executionConfig);
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service;
+
+import org.apache.flink.table.gateway.common.SqlGatewayService;
+import org.apache.flink.table.gateway.common.operation.OperationHandle;
+import org.apache.flink.table.gateway.common.operation.OperationType;
+import org.apache.flink.table.gateway.common.results.OperationInfo;
+import org.apache.flink.table.gateway.common.results.ResultSet;
+import org.apache.flink.table.gateway.common.session.SessionEnvironment;
+import org.apache.flink.table.gateway.common.session.SessionHandle;
+import org.apache.flink.table.gateway.common.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.session.SessionManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/** The implementation for the {@link SqlGatewayService}. */

Review Comment:
   ```suggestion
   /** The implementation of the {@link SqlGatewayService} interface. */
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.common.operation.OperationHandle;
+import org.apache.flink.table.gateway.common.operation.OperationType;
+import org.apache.flink.table.gateway.common.results.OperationInfo;
+import org.apache.flink.table.gateway.common.results.ResultSet;
+import org.apache.flink.table.gateway.common.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.result.ExecutionResult;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.util.CloseableIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+/** Manager for the {@link Operation}. */
+public class OperationManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OperationManager.class);
+
+    private final Map<OperationHandle, Operation> submittedOperations;
+    private final ExecutorService service;
+
+    public OperationManager(ExecutorService service) {
+        this.service = service;
+        submittedOperations = new ConcurrentHashMap<>();
+    }
+
+    public OperationHandle submitOperation(
+            OperationType operationType, Callable<ResultSet> executor) {
+        OperationHandle handle = OperationHandle.create();
+        Operation operation =
+                new Operation(
+                        handle,
+                        operationType,
+                        () -> {
+                            ResultSet resultSet = executor.call();
+                            List<RowData> rows = resultSet.getData();
+                            return new ExecutionResult(
+                                    new ResultFetcher(
+                                            handle,
+                                            resultSet.getResultSchema(),
+                                            CloseableIterator.adapterForIterator(rows.iterator()),
+                                            rows.size()));
+                        });
+        submittedOperations.put(handle, operation);
+        operation.run(service);
+        return handle;
+    }
+
+    public void cancelOperation(OperationHandle operationHandle) {
+        getOperation(operationHandle).cancel();
+    }
+
+    public void closeOperation(OperationHandle operationHandle) {
+        Operation opToRemove = submittedOperations.remove(operationHandle);
+        if (opToRemove != null) {
+            opToRemove.close();
+        }
+    }
+
+    public OperationInfo getOperationInfo(OperationHandle operationHandle) {
+        return getOperation(operationHandle).getOperationInfo();
+    }
+
+    public ResultSet fetchResults(OperationHandle operationHandle, long token, int maxRows) {
+        return getOperation(operationHandle).fetchResults(token, maxRows);
+    }
+
+    public void close() {
+        for (OperationHandle handle : submittedOperations.keySet()) {
+            closeOperation(handle);
+        }
+        LOG.info("Closes the Operation Manager.");
+    }
+
+    @VisibleForTesting
+    public int getOperationCount() {
+        return submittedOperations.size();
+    }
+
+    private Operation getOperation(OperationHandle operationHandle) {
+        Operation operation = submittedOperations.get(operationHandle);
+        if (operation == null) {
+            throw new SqlGatewayException(
+                    String.format(
+                            "Can not find the operation in the OperationManager with the %s.",

Review Comment:
   ```suggestion
                               "Can not find the submitted operation in the OperationManager with the %s.",
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.context;
+
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.PlannerFactoryUtil;
+import org.apache.flink.table.gateway.common.endpoint.EndpointVersion;
+import org.apache.flink.table.gateway.common.session.SessionHandle;
+import org.apache.flink.table.gateway.service.operation.OperationManager;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.util.TemporaryClassLoaderContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Context describing a session, it's mainly used for user to open a new session in the backend. If
+ * client request to open a new session, the backend {@code Executor} will maintain the session
+ * context map util users close it.
+ */
+public class SessionContext {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SessionContext.class);
+
+    private final SessionHandle sessionId;
+    private final EndpointVersion endpointVersion;
+
+    // store all options and use Configuration to build SessionState and TableConfig.
+    private final Configuration sessionConf;
+    private final SessionState sessionState;
+    private final URLClassLoader userClassloader;
+
+    private final OperationManager operationManager;
+
+    private SessionContext(
+            SessionHandle sessionId,
+            EndpointVersion endpointVersion,
+            Configuration sessionConf,
+            URLClassLoader classLoader,
+            SessionState sessionState,
+            OperationManager operationManager) {
+        this.sessionId = sessionId;
+        this.endpointVersion = endpointVersion;
+        this.sessionConf = sessionConf;
+        this.userClassloader = classLoader;
+        this.sessionState = sessionState;
+        this.operationManager = operationManager;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Getter method
+    // --------------------------------------------------------------------------------------------
+
+    public SessionHandle getSessionId() {
+        return this.sessionId;
+    }
+
+    public Map<String, String> getConfigMap() {
+        return sessionConf.toMap();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Method to execute commands
+    // --------------------------------------------------------------------------------------------
+
+    /** Close resources, e.g. catalogs. */
+    public void close() {
+        try (TemporaryClassLoaderContext ignored =
+                TemporaryClassLoaderContext.of(userClassloader)) {
+            for (String name : sessionState.catalogManager.listCatalogs()) {
+                sessionState.catalogManager.unregisterCatalog(name, true);
+            }
+        }
+        try {
+            userClassloader.close();
+        } catch (IOException e) {
+            LOG.debug("Error while closing class loader.", e);
+        }
+
+        operationManager.close();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Utilities
+    // --------------------------------------------------------------------------------------------
+
+    public static SessionContext create(
+            DefaultContext defaultContext,
+            SessionHandle sessionId,
+            EndpointVersion endpointVersion,
+            Configuration sessionConf,
+            ExecutorService operationExecutorService) {
+        // --------------------------------------------------------------------------------------------------------------
+        // Init config
+        // --------------------------------------------------------------------------------------------------------------
+
+        Configuration configuration = defaultContext.getFlinkConfig().clone();
+        configuration.addAll(sessionConf);
+
+        // --------------------------------------------------------------------------------------------------------------
+        // Init classloader
+        // --------------------------------------------------------------------------------------------------------------
+
+        URLClassLoader classLoader =
+                buildClassLoader(Collections.emptySet(), Collections.emptySet(), configuration);

Review Comment:
   Why are `envDependencies` and `userDependencies` both passed as empty sets?
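
For context on the question above, here is a minimal sketch of how two dependency sets could feed a `URLClassLoader`. The helper is hypothetical: it is not the PR's `buildClassLoader` (which also takes a `Configuration`), and the jar paths are made up for illustration. With both sets empty, the resulting loader simply delegates everything to its parent.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

class DependencyClassLoaderSketch {

    /** Hypothetical helper: merges both dependency sets and wraps them in a URLClassLoader. */
    static URLClassLoader buildClassLoader(
            Set<URL> envDependencies, Set<URL> userDependencies, ClassLoader parent) {
        // LinkedHashSet keeps insertion order and drops duplicates across the two sets.
        Set<URL> all = new LinkedHashSet<>(envDependencies);
        all.addAll(userDependencies);
        return new URLClassLoader(all.toArray(new URL[0]), parent);
    }

    public static void main(String[] args) throws Exception {
        ClassLoader parent = Thread.currentThread().getContextClassLoader();

        // Illustrative jar locations; they do not need to exist to construct the loader.
        Set<URL> env = Collections.singleton(Paths.get("lib", "env-dep.jar").toUri().toURL());
        Set<URL> user = Collections.singleton(Paths.get("usrlib", "udf.jar").toUri().toURL());

        try (URLClassLoader loader = buildClassLoader(env, user, parent)) {
            System.out.println("URLs with dependencies: " + loader.getURLs().length);
        }

        // The case the review comment asks about: no extra URLs, parent-only lookup.
        try (URLClassLoader loader =
                buildClassLoader(Collections.emptySet(), Collections.emptySet(), parent)) {
            System.out.println("URLs with empty sets: " + loader.getURLs().length);
        }
    }
}
```

The try-with-resources blocks mirror what `SessionContext#close` does by hand: a `URLClassLoader` holds open jar files and should be closed with the session.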



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.context;
+
+import org.apache.flink.client.cli.CliArgsException;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.cli.ProgramOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.table.gateway.common.utils.SqlGatewayException;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** The context that memorizes the initial configuration. */
+public class DefaultContext {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultContext.class);
+
+    private final Configuration flinkConfig;
+
+    public DefaultContext(Configuration flinkConfig, List<CustomCommandLine> commandLines) {
+        this.flinkConfig = flinkConfig;
+        // initialize default file system
+        FileSystem.initialize(
+                flinkConfig, PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
+
+        Options commandLineOptions = collectCommandLineOptions(commandLines);
+        try {
+            CommandLine deploymentCommandLine =
+                    CliFrontendParser.parse(commandLineOptions, new String[] {}, true);
+            flinkConfig.addAll(
+                    createExecutionConfig(
+                            deploymentCommandLine,
+                            commandLineOptions,
+                            commandLines,
+                            Collections.emptyList()));
+        } catch (Exception e) {
+            throw new SqlGatewayException(
+                    "Could not load available CLI with Environment Deployment entry.", e);
+        }
+    }
+
+    public Configuration getFlinkConfig() {
+        return flinkConfig;
+    }
+
+    private Options collectCommandLineOptions(List<CustomCommandLine> commandLines) {
+        final Options customOptions = new Options();
+        for (CustomCommandLine customCommandLine : commandLines) {
+            customCommandLine.addGeneralOptions(customOptions);
+            customCommandLine.addRunOptions(customOptions);
+        }
+        return CliFrontendParser.mergeOptions(
+                CliFrontendParser.getRunCommandOptions(), customOptions);
+    }
+
+    private static Configuration createExecutionConfig(
+            CommandLine commandLine,
+            Options commandLineOptions,
+            List<CustomCommandLine> availableCommandLines,
+            List<URL> dependencies)
+            throws FlinkException {
+        LOG.debug("Available commandline options: {}", commandLineOptions);
+        List<String> options =
+                Stream.of(commandLine.getOptions())
+                        .map(o -> o.getOpt() + "=" + o.getValue())
+                        .collect(Collectors.toList());
+        LOG.debug(
+                "Instantiated commandline args: {}, options: {}",
+                commandLine.getArgList(),
+                options);
+
+        final CustomCommandLine activeCommandLine =
+                findActiveCommandLine(availableCommandLines, commandLine);
+        LOG.debug(
+                "Available commandlines: {}, active commandline: {}",
+                availableCommandLines,
+                activeCommandLine);
+
+        Configuration executionConfig = activeCommandLine.toConfiguration(commandLine);
+
+        try {
+            final ProgramOptions programOptions = ProgramOptions.create(commandLine);
+            final ExecutionConfigAccessor executionConfigAccessor =
+                    ExecutionConfigAccessor.fromProgramOptions(programOptions, dependencies);
+            executionConfigAccessor.applyToConfiguration(executionConfig);
+        } catch (CliArgsException e) {
+            throw new SqlGatewayException("Invalid deployment run options.", e);
+        }
+
+        LOG.info("Executor config: {}", executionConfig);
+        return executionConfig;
+    }
+
+    private static CustomCommandLine findActiveCommandLine(
+            List<CustomCommandLine> availableCommandLines, CommandLine commandLine) {
+        for (CustomCommandLine cli : availableCommandLines) {
+            if (cli.isActive(commandLine)) {
+                return cli;
+            }
+        }
+        throw new SqlGatewayException("Could not find a matching deployment.");
+    }
+
+    // -------------------------------------------------------------------------------------------
+
+    public static DefaultContext load(Map<String, String> config) {
+        // 1. find the configuration directory
+        String flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();
+
+        // 2. load the global configuration
+        Configuration configuration = GlobalConfiguration.loadConfiguration(flinkConfigDir);

Review Comment:
   How is the configuration loaded in the job?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.result;
+
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.common.operation.OperationHandle;
+import org.apache.flink.table.gateway.common.results.ResultSet;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.util.CloseableIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A fetcher to fetch results from a submitted statement.
+ *
+ * <p>The fetcher uses the {@link Iterator} model: every time a result is fetched with the
+ * current token, the fetcher moves forward and retires the old data.
+ */
+public class ResultFetcher {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ResultFetcher.class);
+
+    private final OperationHandle operationHandle;
+
+    private final ResolvedSchema resultSchema;
+    private final ResultStore resultStore;
+    private final LinkedList<RowData> bufferedResults = new LinkedList<>();
+    private final LinkedList<RowData> bufferedPrevResults = new LinkedList<>();
+
+    private long currentToken = 0;
+    private int previousMaxFetchSize = 0;
+    private int previousResultSetSize = 0;
+    private boolean noMoreResults = false;
+
+    public ResultFetcher(
+            OperationHandle operationHandle,
+            ResolvedSchema resultSchema,
+            CloseableIterator<RowData> resultRows,
+            int maxBufferSize) {
+        this.operationHandle = operationHandle;
+        this.resultSchema = resultSchema;
+        this.resultStore = new ResultStore(resultRows, maxBufferSize);
+    }
+
+    public void close() {
+        resultStore.close();
+    }
+
+    public ResultSet fetchResult(long token, int maxFetchSize) {
+        if (token == currentToken) {
+            // equal to the Iterator.next()
+            if (noMoreResults) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("There is no more result for operation: {}.", operationHandle);
+                }
+                return new ResultSet(
+                        ResultSet.ResultType.EOS, null, resultSchema, Collections.emptyList());
+            }
+
+            // a new token arrives, move the current buffer data into the prev buffered results.
+            bufferedPrevResults.clear();
+            if (bufferedResults.isEmpty()) {
+                // buffered results have been totally consumed,
+                // so try to fetch new results
+                Optional<List<RowData>> newResults = resultStore.retrieveRecords();
+                if (newResults.isPresent()) {
+                    bufferedResults.addAll(newResults.get());
+                } else {
+                    noMoreResults = true;
+                    return new ResultSet(
+                            ResultSet.ResultType.EOS, null, resultSchema, Collections.emptyList());
+                }
+            }
+
+            previousMaxFetchSize = maxFetchSize;
+            if (maxFetchSize > 0) {
+                previousResultSetSize = Math.min(bufferedResults.size(), maxFetchSize);
+            } else {
+                previousResultSetSize = bufferedResults.size();
+            }
+            if (LOG.isDebugEnabled()) {

Review Comment:
   Remove the `isDebugEnabled` check.
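
A likely rationale for this request: logging calls that use `{}` placeholders check the level before formatting, so arguments are not rendered when debug is off and an explicit guard adds nothing for cheap arguments. The sketch below illustrates that behaviour with a hypothetical `MiniLogger` stand-in; it is not the SLF4J API, and `CountingHandle` is an invented type used only to make the deferred `toString()` call observable.

```java
import java.util.concurrent.atomic.AtomicInteger;

class DeferredFormattingSketch {

    /** Hypothetical stand-in for a logger with "{}" placeholders; not the SLF4J API. */
    static final class MiniLogger {
        private final boolean debugEnabled;
        private final StringBuilder out = new StringBuilder();

        MiniLogger(boolean debugEnabled) {
            this.debugEnabled = debugEnabled;
        }

        void debug(String format, Object arg) {
            if (!debugEnabled) {
                // The level check happens here, before arg.toString() is ever called.
                return;
            }
            out.append(format.replace("{}", String.valueOf(arg))).append('\n');
        }

        String output() {
            return out.toString();
        }
    }

    /** Counts how often it is rendered, so deferred formatting can be observed. */
    static final class CountingHandle {
        final AtomicInteger toStringCalls = new AtomicInteger();

        @Override
        public String toString() {
            toStringCalls.incrementAndGet();
            return "operation-1";
        }
    }

    public static void main(String[] args) {
        CountingHandle handle = new CountingHandle();

        // Debug disabled: no guard at the call site, yet the argument is never rendered.
        MiniLogger quiet = new MiniLogger(false);
        quiet.debug("There is no more result for operation: {}.", handle);
        System.out.println("toString calls with debug off: " + handle.toStringCalls.get());

        // Debug enabled: the argument is rendered exactly once.
        MiniLogger verbose = new MiniLogger(true);
        verbose.debug("There is no more result for operation: {}.", handle);
        System.out.println("toString calls with debug on: " + handle.toStringCalls.get());
        System.out.print(verbose.output());
    }
}
```

A guard can still be worthwhile when computing the argument itself is expensive, since the argument expression is evaluated before the call regardless of the level.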


