You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/01 12:23:21 UTC

[GitHub] [flink] lsyldliu commented on a diff in pull request #19846: [FLINK-27768][sql-gateway] Allow executing sql for the SqlGatewayService

lsyldliu commented on code in PR #19846:
URL: https://github.com/apache/flink/pull/19846#discussion_r911903364


##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java:
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
+import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import org.apache.flink.table.gateway.service.utils.SqlScriptReader;
+import org.apache.flink.table.gateway.service.utils.TestSqlStatement;
+import org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
+import org.apache.flink.table.utils.print.PrintStyle;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.PatternFilenameFilter;
+
+import org.apache.calcite.util.Util;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.table.gateway.service.utils.SqlScriptReader.HINT_START_OF_OUTPUT;
+import static org.apache.flink.table.planner.utils.TableTestUtil.replaceNodeIdInOperator;
+import static org.apache.flink.table.planner.utils.TableTestUtil.replaceStreamNodeId;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
+
+/** Test {@link SqlGatewayService}#executeStatement. */
+@Execution(CONCURRENT)
+public class SqlGatewayServiceStatementITCase {
+
+    @RegisterExtension
+    @Order(1)
+    public static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension();
+
+    @RegisterExtension
+    @Order(2)
+    public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
+            new SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration);
+
+    private static SqlGatewayService service;
+
+    private final SessionEnvironment defaultSessionEnvironment =
+            SessionEnvironment.newBuilder()
+                    .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                    .build();
+    private final Map<String, String> replaceVars = new HashMap<>();
+
+    @BeforeAll
+    public static void setUp() {
+        service = SQL_GATEWAY_SERVICE_EXTENSION.getService();
+    }
+
+    @BeforeEach
+    public void before(@TempDir Path temporaryFolder) throws IOException {
+        // initialize new folders for every test, so the vars can be reused by every SQL scripts
+        replaceVars.put(
+                "$VAR_STREAMING_PATH",
+                Files.createDirectory(temporaryFolder.resolve("streaming")).toFile().getPath());
+        replaceVars.put(
+                "$VAR_BATCH_PATH",
+                Files.createDirectory(temporaryFolder.resolve("batch")).toFile().getPath());
+    }
+
+    public static Stream<String> parameters() throws Exception {
+        String first = "sql/table.q";
+        URL url = SqlGatewayServiceStatementITCase.class.getResource("/" + first);
+        File firstFile = Paths.get(checkNotNull(url.toURI())).toFile();
+        final int commonPrefixLength = firstFile.getAbsolutePath().length() - first.length();
+        File dir = firstFile.getParentFile();
+        final List<String> paths = new ArrayList<>();
+        final FilenameFilter filter = new PatternFilenameFilter(".*\\.q$");
+        for (File f : Util.first(dir.listFiles(filter), new File[0])) {
+            paths.add(f.getAbsolutePath().substring(commonPrefixLength));
+        }
+        return paths.stream();
+    }
+
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void testSqlStatements(String sqlPath) throws Exception {
+        String in = getInputFromPath(sqlPath);
+        List<TestSqlStatement> testSqlStatements = SqlScriptReader.parseSqlScript(in);
+
+        assertThat(String.join("", runStatements(testSqlStatements))).isEqualTo(in);
+    }
+
+    public List<String> runStatements(List<TestSqlStatement> statements) {

Review Comment:
   Could you add a method annotation here, as in `CliClientITCase`?



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java:
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
+import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import org.apache.flink.table.gateway.service.utils.SqlScriptReader;
+import org.apache.flink.table.gateway.service.utils.TestSqlStatement;
+import org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
+import org.apache.flink.table.utils.print.PrintStyle;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.PatternFilenameFilter;
+
+import org.apache.calcite.util.Util;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.table.gateway.service.utils.SqlScriptReader.HINT_START_OF_OUTPUT;
+import static org.apache.flink.table.planner.utils.TableTestUtil.replaceNodeIdInOperator;
+import static org.apache.flink.table.planner.utils.TableTestUtil.replaceStreamNodeId;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
+
+/** Test {@link SqlGatewayService}#executeStatement. */
+@Execution(CONCURRENT)
+public class SqlGatewayServiceStatementITCase {
+
+    @RegisterExtension
+    @Order(1)
+    public static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension();
+
+    @RegisterExtension
+    @Order(2)
+    public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
+            new SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration);
+
+    private static SqlGatewayService service;
+
+    private final SessionEnvironment defaultSessionEnvironment =
+            SessionEnvironment.newBuilder()
+                    .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                    .build();
+    private final Map<String, String> replaceVars = new HashMap<>();
+
+    @BeforeAll
+    public static void setUp() {
+        service = SQL_GATEWAY_SERVICE_EXTENSION.getService();
+    }
+
+    @BeforeEach
+    public void before(@TempDir Path temporaryFolder) throws IOException {
+        // initialize new folders for every test, so the vars can be reused by every SQL scripts
+        replaceVars.put(
+                "$VAR_STREAMING_PATH",
+                Files.createDirectory(temporaryFolder.resolve("streaming")).toFile().getPath());
+        replaceVars.put(
+                "$VAR_BATCH_PATH",
+                Files.createDirectory(temporaryFolder.resolve("batch")).toFile().getPath());
+    }
+
+    public static Stream<String> parameters() throws Exception {
+        String first = "sql/table.q";
+        URL url = SqlGatewayServiceStatementITCase.class.getResource("/" + first);
+        File firstFile = Paths.get(checkNotNull(url.toURI())).toFile();
+        final int commonPrefixLength = firstFile.getAbsolutePath().length() - first.length();
+        File dir = firstFile.getParentFile();
+        final List<String> paths = new ArrayList<>();
+        final FilenameFilter filter = new PatternFilenameFilter(".*\\.q$");
+        for (File f : Util.first(dir.listFiles(filter), new File[0])) {
+            paths.add(f.getAbsolutePath().substring(commonPrefixLength));
+        }
+        return paths.stream();
+    }
+
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void testSqlStatements(String sqlPath) throws Exception {
+        String in = getInputFromPath(sqlPath);
+        List<TestSqlStatement> testSqlStatements = SqlScriptReader.parseSqlScript(in);
+
+        assertThat(String.join("", runStatements(testSqlStatements))).isEqualTo(in);
+    }
+
+    public List<String> runStatements(List<TestSqlStatement> statements) {
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+
+        List<String> output = new ArrayList<>();
+        for (TestSqlStatement statement : statements) {
+            StringBuilder builder = new StringBuilder();
+            builder.append(statement.getComment());
+            builder.append(statement.getSql());
+
+            try {
+                builder.append(runSingleStatement(sessionHandle, statement.getSql()));
+            } catch (Throwable t) {
+                Throwable root = getRootCause(t);
+                builder.append(
+                        Tag.ERROR.addTag(
+                                root.getClass().getName()
+                                        + ": "
+                                        + root.getMessage().trim()
+                                        + "\n"));
+            }
+            output.add(builder.toString());
+        }
+
+        return output;
+    }
+
+    // -------------------------------------------------------------------------------------------
+    // Utility
+    // -------------------------------------------------------------------------------------------
+
+    /** Mark the output type. */
+    enum Tag {
+        INFO("!info"),
+
+        OK("!ok"),
+
+        ERROR("!error");
+
+        private final String tag;
+
+        Tag(String tag) {
+            this.tag = tag;
+        }
+
+        public String addTag(String content) {
+            return HINT_START_OF_OUTPUT + "\n" + content + tag + "\n";
+        }
+    }
+
+    private String getInputFromPath(String sqlPath) throws IOException {
+        URL url = SqlGatewayServiceStatementITCase.class.getResource("/" + sqlPath);
+
+        // replace the placeholder with specified value if exists
+        String[] keys = replaceVars.keySet().toArray(new String[0]);
+        String[] values = Arrays.stream(keys).map(replaceVars::get).toArray(String[]::new);
+
+        return StringUtils.replaceEach(
+                IOUtils.toString(checkNotNull(url), StandardCharsets.UTF_8), keys, values);
+    }
+
+    protected static Throwable getRootCause(Throwable e) {
+        Throwable root = e;
+        while (root.getCause() != null) {
+            root = root.getCause();
+        }
+        return root;
+    }
+
+    private String runSingleStatement(SessionHandle sessionHandle, String statement)
+            throws Exception {
+        OperationHandle operationHandle =
+                service.executeStatement(sessionHandle, statement, -1, new Configuration());
+        CommonTestUtils.waitUtil(
+                () ->
+                        service.getOperationInfo(sessionHandle, operationHandle)
+                                .getStatus()
+                                .isTerminalStatus(),
+                Duration.ofSeconds(100),
+                "Failed to wait operation finish.");
+
+        if (!service.getOperationInfo(sessionHandle, operationHandle).isHasResults()) {
+            return Tag.INFO.addTag("");
+        }
+
+        if (statement.toLowerCase().startsWith("explain")

Review Comment:
   Why do these statements need to be processed specially? The logic of this check is a bit hacky. Do any other special cases exist?
   Adding some comments would be better.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.table.operations.BeginStatementSetOperation;
+import org.apache.flink.table.operations.EndStatementSetOperation;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.StatementSetOperation;
+import org.apache.flink.table.operations.command.ResetOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/** An executor to execute the {@link Operation}. */
+public class OperationExecutor {
+
+    private static final String JOB_ID = "job id";

Review Comment:
   I think `job_id` would be better as a column name.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.table.operations.BeginStatementSetOperation;
+import org.apache.flink.table.operations.EndStatementSetOperation;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.StatementSetOperation;
+import org.apache.flink.table.operations.command.ResetOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/** An executor to execute the {@link Operation}. */
+public class OperationExecutor {
+
+    private static final String JOB_ID = "job id";
+    private static final String SET_KEY = "key";
+    private static final String SET_VALUE = "value";
+
+    private final SessionContext sessionContext;
+    private final Configuration executionConfig;
+
+    @VisibleForTesting
+    public OperationExecutor(SessionContext context, Configuration executionConfig) {
+        this.sessionContext = context;
+        this.executionConfig = executionConfig;
+    }
+
+    public ResultFetcher executeStatement(OperationHandle handle, String statement) {
+        // Instantiate the TableEnvironment lazily
+        TableEnvironmentInternal tableEnv = sessionContext.createTableEnvironment();
+        tableEnv.getConfig().getConfiguration().addAll(executionConfig);
+
+        List<Operation> parsedOperations = tableEnv.getParser().parse(statement);
+        if (parsedOperations.size() > 1) {
+            throw new UnsupportedOperationException(
+                    "Unsupported SQL statement! Execute statement only accepts a single SQL statement or "
+                            + "multiple 'INSERT INTO' statements wrapped in a 'STATEMENT SET' block.");
+        }
+        Operation op = parsedOperations.get(0);
+        if (op instanceof SetOperation) {
+            return callSetOperation(tableEnv, handle, (SetOperation) op);
+        } else if (op instanceof ResetOperation) {
+            return callResetOperation(handle, (ResetOperation) op);
+        } else if (op instanceof BeginStatementSetOperation) {
+            // TODO: support statement set in the FLINK-27837
+            throw new UnsupportedOperationException();
+        } else if (op instanceof EndStatementSetOperation) {
+            // TODO: support statement set in the FLINK-27837
+            throw new UnsupportedOperationException();
+        } else if (op instanceof ModifyOperation) {
+            return callModifyOperations(
+                    tableEnv, handle, Collections.singletonList((ModifyOperation) op));
+        } else if (op instanceof StatementSetOperation) {
+            return callModifyOperations(
+                    tableEnv, handle, ((StatementSetOperation) op).getOperations());
+        } else if (op instanceof QueryOperation) {
+            TableResultInternal result = tableEnv.executeInternal(op);
+            return new ResultFetcher(handle, result.getResolvedSchema(), result.collectInternal());
+        } else {
+            TableResultInternal result = tableEnv.executeInternal(op);
+            return new ResultFetcher(
+                    handle, result.getResolvedSchema(), drill(result.collectInternal()));
+        }
+    }
+
+    private ResultFetcher callSetOperation(
+            TableEnvironmentInternal tableEnv, OperationHandle handle, SetOperation setOp) {
+        if (setOp.getKey().isPresent() && setOp.getValue().isPresent()) {
+            sessionContext.setConfig(setOp.getKey().get(), setOp.getValue().get());
+            return new ResultFetcher(
+                    handle,
+                    TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
+                    drill(TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+        } else if (!setOp.getKey().isPresent() && !setOp.getValue().isPresent()) {
+            Map<String, String> configMap = tableEnv.getConfig().getConfiguration().toMap();
+            return new ResultFetcher(
+                    handle,
+                    ResolvedSchema.of(
+                            Column.physical(SET_KEY, DataTypes.STRING()),
+                            Column.physical(SET_VALUE, DataTypes.STRING())),
+                    drill(
+                            configMap.entrySet().stream()
+                                    .map(
+                                            entry ->
+                                                    GenericRowData.of(
+                                                            StringData.fromString(entry.getKey()),
+                                                            StringData.fromString(
+                                                                    entry.getValue())))
+                                    .map(row -> (RowData) row)
+                                    .iterator()));
+        } else {
+            // Impossible
+            throw new SqlExecutionException("Illegal SetOperation: " + setOp.asSummaryString());
+        }
+    }
+
+    private ResultFetcher callResetOperation(OperationHandle handle, ResetOperation resetOp) {
+        if (resetOp.getKey().isPresent()) {
+            sessionContext.resetConfig(resetOp.getKey().get());
+        } else {
+            sessionContext.resetAllConfig();
+        }
+        return new ResultFetcher(
+                handle,
+                TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
+                drill(TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+    }
+
+    private ResultFetcher callModifyOperations(
+            TableEnvironmentInternal tableEnv,
+            OperationHandle handle,
+            List<ModifyOperation> modifyOperations) {
+        TableResultInternal result = tableEnv.executeInternal(modifyOperations);
+        return new ResultFetcher(
+                handle,
+                ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())),
+                Collections.singletonList(
+                        GenericRowData.of(
+                                StringData.fromString(
+                                        result.getJobClient()
+                                                .orElseThrow(
+                                                        () ->
+                                                                new SqlExecutionException(
+                                                                        "Can't get job client for the operation."))

Review Comment:
   Would it make sense to you if we also printed the `HandleIdentifier` in the exception message?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.table.operations.BeginStatementSetOperation;
+import org.apache.flink.table.operations.EndStatementSetOperation;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.StatementSetOperation;
+import org.apache.flink.table.operations.command.ResetOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/** An executor to execute the {@link Operation}. */
+public class OperationExecutor {
+
+    private static final String JOB_ID = "job id";
+    private static final String SET_KEY = "key";
+    private static final String SET_VALUE = "value";
+
+    private final SessionContext sessionContext;
+    private final Configuration executionConfig;
+
+    @VisibleForTesting
+    public OperationExecutor(SessionContext context, Configuration executionConfig) {
+        this.sessionContext = context;
+        this.executionConfig = executionConfig;
+    }
+
+    public ResultFetcher executeStatement(OperationHandle handle, String statement) {
+        // Instantiate the TableEnvironment lazily
+        TableEnvironmentInternal tableEnv = sessionContext.createTableEnvironment();
+        tableEnv.getConfig().getConfiguration().addAll(executionConfig);
+
+        List<Operation> parsedOperations = tableEnv.getParser().parse(statement);
+        if (parsedOperations.size() > 1) {
+            throw new UnsupportedOperationException(
+                    "Unsupported SQL statement! Execute statement only accepts a single SQL statement or "
+                            + "multiple 'INSERT INTO' statements wrapped in a 'STATEMENT SET' block.");
+        }
+        Operation op = parsedOperations.get(0);
+        if (op instanceof SetOperation) {
+            return callSetOperation(tableEnv, handle, (SetOperation) op);
+        } else if (op instanceof ResetOperation) {
+            return callResetOperation(handle, (ResetOperation) op);
+        } else if (op instanceof BeginStatementSetOperation) {
+            // TODO: support statement set in the FLINK-27837
+            throw new UnsupportedOperationException();
+        } else if (op instanceof EndStatementSetOperation) {
+            // TODO: support statement set in the FLINK-27837
+            throw new UnsupportedOperationException();
+        } else if (op instanceof ModifyOperation) {
+            return callModifyOperations(
+                    tableEnv, handle, Collections.singletonList((ModifyOperation) op));
+        } else if (op instanceof StatementSetOperation) {
+            return callModifyOperations(
+                    tableEnv, handle, ((StatementSetOperation) op).getOperations());
+        } else if (op instanceof QueryOperation) {
+            TableResultInternal result = tableEnv.executeInternal(op);
+            return new ResultFetcher(handle, result.getResolvedSchema(), result.collectInternal());
+        } else {
+            TableResultInternal result = tableEnv.executeInternal(op);
+            return new ResultFetcher(
+                    handle, result.getResolvedSchema(), drill(result.collectInternal()));
+        }
+    }
+
+    private ResultFetcher callSetOperation(
+            TableEnvironmentInternal tableEnv, OperationHandle handle, SetOperation setOp) {
+        if (setOp.getKey().isPresent() && setOp.getValue().isPresent()) {
+            sessionContext.setConfig(setOp.getKey().get(), setOp.getValue().get());
+            return new ResultFetcher(
+                    handle,
+                    TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
+                    drill(TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+        } else if (!setOp.getKey().isPresent() && !setOp.getValue().isPresent()) {
+            Map<String, String> configMap = tableEnv.getConfig().getConfiguration().toMap();

Review Comment:
   // show all properties
   



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.table.operations.BeginStatementSetOperation;
+import org.apache.flink.table.operations.EndStatementSetOperation;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.StatementSetOperation;
+import org.apache.flink.table.operations.command.ResetOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/** An executor to execute the {@link Operation}. */
+public class OperationExecutor {
+
+    private static final String JOB_ID = "job id";
+    private static final String SET_KEY = "key";
+    private static final String SET_VALUE = "value";
+
+    private final SessionContext sessionContext;
+    private final Configuration executionConfig;
+
+    @VisibleForTesting
+    public OperationExecutor(SessionContext context, Configuration executionConfig) {
+        this.sessionContext = context;
+        this.executionConfig = executionConfig;
+    }
+
+    public ResultFetcher executeStatement(OperationHandle handle, String statement) {
+        // Instantiate the TableEnvironment lazily
+        TableEnvironmentInternal tableEnv = sessionContext.createTableEnvironment();
+        tableEnv.getConfig().getConfiguration().addAll(executionConfig);
+
+        List<Operation> parsedOperations = tableEnv.getParser().parse(statement);
+        if (parsedOperations.size() > 1) {
+            throw new UnsupportedOperationException(
+                    "Unsupported SQL statement! Execute statement only accepts a single SQL statement or "
+                            + "multiple 'INSERT INTO' statements wrapped in a 'STATEMENT SET' block.");
+        }
+        Operation op = parsedOperations.get(0);
+        if (op instanceof SetOperation) {
+            return callSetOperation(tableEnv, handle, (SetOperation) op);
+        } else if (op instanceof ResetOperation) {
+            return callResetOperation(handle, (ResetOperation) op);
+        } else if (op instanceof BeginStatementSetOperation) {
+            // TODO: support statement set in the FLINK-27837
+            throw new UnsupportedOperationException();
+        } else if (op instanceof EndStatementSetOperation) {
+            // TODO: support statement set in the FLINK-27837
+            throw new UnsupportedOperationException();
+        } else if (op instanceof ModifyOperation) {
+            return callModifyOperations(
+                    tableEnv, handle, Collections.singletonList((ModifyOperation) op));
+        } else if (op instanceof StatementSetOperation) {
+            return callModifyOperations(
+                    tableEnv, handle, ((StatementSetOperation) op).getOperations());
+        } else if (op instanceof QueryOperation) {
+            TableResultInternal result = tableEnv.executeInternal(op);
+            return new ResultFetcher(handle, result.getResolvedSchema(), result.collectInternal());
+        } else {
+            TableResultInternal result = tableEnv.executeInternal(op);
+            return new ResultFetcher(
+                    handle, result.getResolvedSchema(), drill(result.collectInternal()));
+        }
+    }
+
+    private ResultFetcher callSetOperation(
+            TableEnvironmentInternal tableEnv, OperationHandle handle, SetOperation setOp) {
+        if (setOp.getKey().isPresent() && setOp.getValue().isPresent()) {
+            sessionContext.setConfig(setOp.getKey().get(), setOp.getValue().get());
+            return new ResultFetcher(
+                    handle,
+                    TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
+                    drill(TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+        } else if (!setOp.getKey().isPresent() && !setOp.getValue().isPresent()) {
+            Map<String, String> configMap = tableEnv.getConfig().getConfiguration().toMap();
+            return new ResultFetcher(
+                    handle,
+                    ResolvedSchema.of(
+                            Column.physical(SET_KEY, DataTypes.STRING()),
+                            Column.physical(SET_VALUE, DataTypes.STRING())),
+                    drill(
+                            configMap.entrySet().stream()
+                                    .map(
+                                            entry ->
+                                                    GenericRowData.of(
+                                                            StringData.fromString(entry.getKey()),
+                                                            StringData.fromString(
+                                                                    entry.getValue())))
+                                    .map(row -> (RowData) row)

Review Comment:
   ```suggestion
                                      .map(RowData.class::cast)
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultStore.java:
##########
@@ -37,6 +37,13 @@ public class ResultStore {
 
     private static final Logger LOG = LoggerFactory.getLogger(ResultStore.class);
 
+    public static final ResultStore DUMMY_RESULT_STORE =
+            new ResultStore(CloseableIterator.adapterForIterator(Collections.emptyIterator()), 0);
+
+    static {
+        DUMMY_RESULT_STORE.close();

Review Comment:
   Why is it closed here?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.table.operations.BeginStatementSetOperation;
+import org.apache.flink.table.operations.EndStatementSetOperation;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.StatementSetOperation;
+import org.apache.flink.table.operations.command.ResetOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/** An executor to execute the {@link Operation}. */
+public class OperationExecutor {

Review Comment:
   Do we need to make it an interface and provide different implementations for different scenarios? Will more methods be needed here in the future?
   
   
   



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -100,22 +121,44 @@ public Map<String, String> getConfigMap() {
         return sessionConf.toMap();
     }
 
-    // --------------------------------------------------------------------------------------------
-    // Method to execute commands
-    // --------------------------------------------------------------------------------------------
+    public void setConfig(String key, String value) {
+        try {
+            // Test whether the key value will influence the creation of the Executor.
+            createOperationExecutor(Configuration.fromMap(Collections.singletonMap(key, value)));
+        } catch (Exception e) {
+            // get error and reset the key with old value
+            throw new SqlExecutionException(
+                    String.format("Failed to set key %s with value %s.", key, value), e);
+        }
+        sessionConf.setString(key, value);
+    }
 
-    /** Close resources, e.g. catalogs. */
-    public void close() {
-        for (String name : sessionState.catalogManager.listCatalogs()) {
-            sessionState.catalogManager.unregisterCatalog(name, true);
+    public synchronized void resetConfig(String key) {
+        Configuration configuration = defaultContext.getFlinkConfig();
+        // If the key exist in default yaml , reset to default
+        if (configuration.containsKey(key)) {

Review Comment:
   I think we can write this as:
   ```java
   ConfigOption<String> option = ConfigOptions.key(key).stringType().noDefaultValue();
   if (configuration.contains(option)) {
       String defaultValue = configuration.get(option);
       setConfig(key, defaultValue);
   } else {
       sessionConf.removeConfig(option);
   }
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.table.operations.BeginStatementSetOperation;
+import org.apache.flink.table.operations.EndStatementSetOperation;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.StatementSetOperation;
+import org.apache.flink.table.operations.command.ResetOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/** An executor to execute the {@link Operation}. */
+public class OperationExecutor {
+
+    private static final String JOB_ID = "job id";
+    private static final String SET_KEY = "key";
+    private static final String SET_VALUE = "value";
+
+    private final SessionContext sessionContext;
+    private final Configuration executionConfig;
+
+    @VisibleForTesting
+    public OperationExecutor(SessionContext context, Configuration executionConfig) {
+        this.sessionContext = context;
+        this.executionConfig = executionConfig;
+    }
+
+    public ResultFetcher executeStatement(OperationHandle handle, String statement) {
+        // Instantiate the TableEnvironment lazily
+        TableEnvironmentInternal tableEnv = sessionContext.createTableEnvironment();
+        tableEnv.getConfig().getConfiguration().addAll(executionConfig);
+
+        List<Operation> parsedOperations = tableEnv.getParser().parse(statement);
+        if (parsedOperations.size() > 1) {
+            throw new UnsupportedOperationException(
+                    "Unsupported SQL statement! Execute statement only accepts a single SQL statement or "
+                            + "multiple 'INSERT INTO' statements wrapped in a 'STATEMENT SET' block.");
+        }
+        Operation op = parsedOperations.get(0);
+        if (op instanceof SetOperation) {
+            return callSetOperation(tableEnv, handle, (SetOperation) op);
+        } else if (op instanceof ResetOperation) {
+            return callResetOperation(handle, (ResetOperation) op);
+        } else if (op instanceof BeginStatementSetOperation) {
+            // TODO: support statement set in the FLINK-27837
+            throw new UnsupportedOperationException();
+        } else if (op instanceof EndStatementSetOperation) {
+            // TODO: support statement set in the FLINK-27837
+            throw new UnsupportedOperationException();
+        } else if (op instanceof ModifyOperation) {
+            return callModifyOperations(
+                    tableEnv, handle, Collections.singletonList((ModifyOperation) op));
+        } else if (op instanceof StatementSetOperation) {
+            return callModifyOperations(
+                    tableEnv, handle, ((StatementSetOperation) op).getOperations());
+        } else if (op instanceof QueryOperation) {
+            TableResultInternal result = tableEnv.executeInternal(op);
+            return new ResultFetcher(handle, result.getResolvedSchema(), result.collectInternal());
+        } else {
+            TableResultInternal result = tableEnv.executeInternal(op);
+            return new ResultFetcher(
+                    handle, result.getResolvedSchema(), drill(result.collectInternal()));
+        }
+    }
+
+    private ResultFetcher callSetOperation(
+            TableEnvironmentInternal tableEnv, OperationHandle handle, SetOperation setOp) {
+        if (setOp.getKey().isPresent() && setOp.getValue().isPresent()) {
+            sessionContext.setConfig(setOp.getKey().get(), setOp.getValue().get());
+            return new ResultFetcher(
+                    handle,
+                    TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
+                    drill(TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+        } else if (!setOp.getKey().isPresent() && !setOp.getValue().isPresent()) {
+            Map<String, String> configMap = tableEnv.getConfig().getConfiguration().toMap();
+            return new ResultFetcher(
+                    handle,
+                    ResolvedSchema.of(
+                            Column.physical(SET_KEY, DataTypes.STRING()),
+                            Column.physical(SET_VALUE, DataTypes.STRING())),
+                    drill(
+                            configMap.entrySet().stream()
+                                    .map(
+                                            entry ->
+                                                    GenericRowData.of(
+                                                            StringData.fromString(entry.getKey()),
+                                                            StringData.fromString(
+                                                                    entry.getValue())))
+                                    .map(row -> (RowData) row)
+                                    .iterator()));
+        } else {
+            // Impossible
+            throw new SqlExecutionException("Illegal SetOperation: " + setOp.asSummaryString());
+        }
+    }
+
+    private ResultFetcher callResetOperation(OperationHandle handle, ResetOperation resetOp) {
+        if (resetOp.getKey().isPresent()) {
+            sessionContext.resetConfig(resetOp.getKey().get());

Review Comment:
   Ditto



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -100,22 +121,44 @@ public Map<String, String> getConfigMap() {
         return sessionConf.toMap();
     }
 
-    // --------------------------------------------------------------------------------------------
-    // Method to execute commands
-    // --------------------------------------------------------------------------------------------
+    public void setConfig(String key, String value) {
+        try {
+            // Test whether the key value will influence the creation of the Executor.
+            createOperationExecutor(Configuration.fromMap(Collections.singletonMap(key, value)));
+        } catch (Exception e) {
+            // get error and reset the key with old value
+            throw new SqlExecutionException(
+                    String.format("Failed to set key %s with value %s.", key, value), e);
+        }
+        sessionConf.setString(key, value);
+    }
 
-    /** Close resources, e.g. catalogs. */
-    public void close() {
-        for (String name : sessionState.catalogManager.listCatalogs()) {
-            sessionState.catalogManager.unregisterCatalog(name, true);
+    public synchronized void resetConfig(String key) {
+        Configuration configuration = defaultContext.getFlinkConfig();
+        // If the key exist in default yaml , reset to default

Review Comment:
   ```suggestion
           // If the key exist in default yaml, reset to default
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.table.operations.BeginStatementSetOperation;
+import org.apache.flink.table.operations.EndStatementSetOperation;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.StatementSetOperation;
+import org.apache.flink.table.operations.command.ResetOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/** An executor to execute the {@link Operation}. */
+public class OperationExecutor {
+
+    private static final String JOB_ID = "job id";
+    private static final String SET_KEY = "key";
+    private static final String SET_VALUE = "value";
+
+    private final SessionContext sessionContext;
+    private final Configuration executionConfig;
+
+    @VisibleForTesting
+    public OperationExecutor(SessionContext context, Configuration executionConfig) {
+        this.sessionContext = context;
+        this.executionConfig = executionConfig;
+    }
+
+    public ResultFetcher executeStatement(OperationHandle handle, String statement) {
+        // Instantiate the TableEnvironment lazily
+        TableEnvironmentInternal tableEnv = sessionContext.createTableEnvironment();
+        tableEnv.getConfig().getConfiguration().addAll(executionConfig);
+
+        List<Operation> parsedOperations = tableEnv.getParser().parse(statement);
+        if (parsedOperations.size() > 1) {
+            throw new UnsupportedOperationException(
+                    "Unsupported SQL statement! Execute statement only accepts a single SQL statement or "
+                            + "multiple 'INSERT INTO' statements wrapped in a 'STATEMENT SET' block.");
+        }
+        Operation op = parsedOperations.get(0);
+        if (op instanceof SetOperation) {
+            return callSetOperation(tableEnv, handle, (SetOperation) op);
+        } else if (op instanceof ResetOperation) {
+            return callResetOperation(handle, (ResetOperation) op);
+        } else if (op instanceof BeginStatementSetOperation) {
+            // TODO: support statement set in the FLINK-27837
+            throw new UnsupportedOperationException();
+        } else if (op instanceof EndStatementSetOperation) {
+            // TODO: support statement set in the FLINK-27837
+            throw new UnsupportedOperationException();
+        } else if (op instanceof ModifyOperation) {
+            return callModifyOperations(
+                    tableEnv, handle, Collections.singletonList((ModifyOperation) op));
+        } else if (op instanceof StatementSetOperation) {
+            return callModifyOperations(
+                    tableEnv, handle, ((StatementSetOperation) op).getOperations());
+        } else if (op instanceof QueryOperation) {
+            TableResultInternal result = tableEnv.executeInternal(op);
+            return new ResultFetcher(handle, result.getResolvedSchema(), result.collectInternal());
+        } else {
+            TableResultInternal result = tableEnv.executeInternal(op);
+            return new ResultFetcher(
+                    handle, result.getResolvedSchema(), drill(result.collectInternal()));
+        }
+    }
+
+    private ResultFetcher callSetOperation(
+            TableEnvironmentInternal tableEnv, OperationHandle handle, SetOperation setOp) {
+        if (setOp.getKey().isPresent() && setOp.getValue().isPresent()) {

Review Comment:
   It would be better to add the following comment:
   // set a property
   



##########
flink-table/flink-sql-gateway/src/test/resources/sql/insert.q:
##########
@@ -0,0 +1,182 @@
+# insert.q - insert

Review Comment:
   Are not all of the cases from SqlClient currently supported?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.table.operations.BeginStatementSetOperation;
+import org.apache.flink.table.operations.EndStatementSetOperation;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.StatementSetOperation;
+import org.apache.flink.table.operations.command.ResetOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/** An executor to execute the {@link Operation}. */
+public class OperationExecutor {
+
+    private static final String JOB_ID = "job id";
+    private static final String SET_KEY = "key";
+    private static final String SET_VALUE = "value";
+
+    private final SessionContext sessionContext;
+    private final Configuration executionConfig;
+
+    @VisibleForTesting
+    public OperationExecutor(SessionContext context, Configuration executionConfig) {
+        this.sessionContext = context;
+        this.executionConfig = executionConfig;
+    }
+
+    public ResultFetcher executeStatement(OperationHandle handle, String statement) {
+        // Instantiate the TableEnvironment lazily
+        TableEnvironmentInternal tableEnv = sessionContext.createTableEnvironment();
+        tableEnv.getConfig().getConfiguration().addAll(executionConfig);
+
+        List<Operation> parsedOperations = tableEnv.getParser().parse(statement);
+        if (parsedOperations.size() > 1) {
+            throw new UnsupportedOperationException(
+                    "Unsupported SQL statement! Execute statement only accepts a single SQL statement or "
+                            + "multiple 'INSERT INTO' statements wrapped in a 'STATEMENT SET' block.");
+        }
+        Operation op = parsedOperations.get(0);
+        if (op instanceof SetOperation) {
+            return callSetOperation(tableEnv, handle, (SetOperation) op);
+        } else if (op instanceof ResetOperation) {
+            return callResetOperation(handle, (ResetOperation) op);
+        } else if (op instanceof BeginStatementSetOperation) {
+            // TODO: support statement set in the FLINK-27837
+            throw new UnsupportedOperationException();
+        } else if (op instanceof EndStatementSetOperation) {
+            // TODO: support statement set in the FLINK-27837
+            throw new UnsupportedOperationException();
+        } else if (op instanceof ModifyOperation) {
+            return callModifyOperations(
+                    tableEnv, handle, Collections.singletonList((ModifyOperation) op));
+        } else if (op instanceof StatementSetOperation) {
+            return callModifyOperations(
+                    tableEnv, handle, ((StatementSetOperation) op).getOperations());
+        } else if (op instanceof QueryOperation) {
+            TableResultInternal result = tableEnv.executeInternal(op);
+            return new ResultFetcher(handle, result.getResolvedSchema(), result.collectInternal());
+        } else {
+            TableResultInternal result = tableEnv.executeInternal(op);
+            return new ResultFetcher(
+                    handle, result.getResolvedSchema(), drill(result.collectInternal()));
+        }
+    }
+
+    private ResultFetcher callSetOperation(
+            TableEnvironmentInternal tableEnv, OperationHandle handle, SetOperation setOp) {
+        if (setOp.getKey().isPresent() && setOp.getValue().isPresent()) {
+            sessionContext.setConfig(setOp.getKey().get(), setOp.getValue().get());

Review Comment:
   Do we need to log it when a property is set?
   In addition, the key and value may be empty strings or carry leading or trailing whitespace, so should we trim them like SqlClient does?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.table.operations.BeginStatementSetOperation;
+import org.apache.flink.table.operations.EndStatementSetOperation;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.StatementSetOperation;
+import org.apache.flink.table.operations.command.ResetOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/** An executor to execute the {@link Operation}. */
+public class OperationExecutor {
+
+    private static final String JOB_ID = "job id";
+    private static final String SET_KEY = "key";
+    private static final String SET_VALUE = "value";
+
+    private final SessionContext sessionContext;
+    private final Configuration executionConfig;
+
+    @VisibleForTesting
+    public OperationExecutor(SessionContext context, Configuration executionConfig) {
+        this.sessionContext = context;
+        this.executionConfig = executionConfig;
+    }
+
+    public ResultFetcher executeStatement(OperationHandle handle, String statement) {
+        // Instantiate the TableEnvironment lazily
+        TableEnvironmentInternal tableEnv = sessionContext.createTableEnvironment();
+        tableEnv.getConfig().getConfiguration().addAll(executionConfig);
+
+        List<Operation> parsedOperations = tableEnv.getParser().parse(statement);
+        if (parsedOperations.size() > 1) {
+            throw new UnsupportedOperationException(
+                    "Unsupported SQL statement! Execute statement only accepts a single SQL statement or "
+                            + "multiple 'INSERT INTO' statements wrapped in a 'STATEMENT SET' block.");
+        }
+        Operation op = parsedOperations.get(0);
+        if (op instanceof SetOperation) {
+            return callSetOperation(tableEnv, handle, (SetOperation) op);
+        } else if (op instanceof ResetOperation) {
+            return callResetOperation(handle, (ResetOperation) op);
+        } else if (op instanceof BeginStatementSetOperation) {
+            // TODO: support statement set in the FLINK-27837
+            throw new UnsupportedOperationException();
+        } else if (op instanceof EndStatementSetOperation) {
+            // TODO: support statement set in the FLINK-27837
+            throw new UnsupportedOperationException();
+        } else if (op instanceof ModifyOperation) {
+            return callModifyOperations(
+                    tableEnv, handle, Collections.singletonList((ModifyOperation) op));
+        } else if (op instanceof StatementSetOperation) {
+            return callModifyOperations(
+                    tableEnv, handle, ((StatementSetOperation) op).getOperations());
+        } else if (op instanceof QueryOperation) {
+            TableResultInternal result = tableEnv.executeInternal(op);
+            return new ResultFetcher(handle, result.getResolvedSchema(), result.collectInternal());
+        } else {
+            TableResultInternal result = tableEnv.executeInternal(op);
+            return new ResultFetcher(
+                    handle, result.getResolvedSchema(), drill(result.collectInternal()));
+        }
+    }
+
+    private ResultFetcher callSetOperation(
+            TableEnvironmentInternal tableEnv, OperationHandle handle, SetOperation setOp) {
+        if (setOp.getKey().isPresent() && setOp.getValue().isPresent()) {
+            sessionContext.setConfig(setOp.getKey().get(), setOp.getValue().get());
+            return new ResultFetcher(
+                    handle,
+                    TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
+                    drill(TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+        } else if (!setOp.getKey().isPresent() && !setOp.getValue().isPresent()) {
+            Map<String, String> configMap = tableEnv.getConfig().getConfiguration().toMap();
+            return new ResultFetcher(
+                    handle,
+                    ResolvedSchema.of(
+                            Column.physical(SET_KEY, DataTypes.STRING()),
+                            Column.physical(SET_VALUE, DataTypes.STRING())),
+                    drill(
+                            configMap.entrySet().stream()
+                                    .map(
+                                            entry ->
+                                                    GenericRowData.of(
+                                                            StringData.fromString(entry.getKey()),
+                                                            StringData.fromString(
+                                                                    entry.getValue())))
+                                    .map(row -> (RowData) row)
+                                    .iterator()));
+        } else {
+            // Impossible
+            throw new SqlExecutionException("Illegal SetOperation: " + setOp.asSummaryString());
+        }
+    }
+
+    private ResultFetcher callResetOperation(OperationHandle handle, ResetOperation resetOp) {
+        if (resetOp.getKey().isPresent()) {
+            sessionContext.resetConfig(resetOp.getKey().get());

Review Comment:
   Ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org