You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/05 14:32:21 UTC

[flink] branch master updated: [FLINK-28630][sql-gateway][hive] Allow to GetSchemas in the HiveServer2 Endpoint

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 82417ab9a5a [FLINK-28630][sql-gateway][hive] Allow to GetSchemas in the HiveServer2 Endpoint
82417ab9a5a is described below

commit 82417ab9a5ad6935a73f9b7d1bc6bd4f47ce4f61
Author: yuzelin <74...@qq.com>
AuthorDate: Thu Aug 4 21:08:14 2022 +0800

    [FLINK-28630][sql-gateway][hive] Allow to GetSchemas in the HiveServer2 Endpoint
    
    This closes #20401
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   |  38 ++++--
 .../table/endpoint/hive/HiveServer2Schemas.java    |  12 ++
 .../hive/util/OperationExecutorFactory.java        | 143 +++++++++++++++++++++
 .../endpoint/hive/util/StringRowDataUtils.java     |  37 ------
 .../hive/util/ThriftObjectConversions.java         |   2 +
 .../endpoint/hive/HiveServer2EndpointITCase.java   | 107 +++++++++++++--
 .../flink/table/gateway/api/SqlGatewayService.java |  21 ++-
 .../table/gateway/api/operation/OperationType.java |   3 +
 .../gateway/api/utils/MockedSqlGatewayService.java |  11 ++
 .../gateway/service/SqlGatewayServiceImpl.java     |  15 +++
 .../service/operation/OperationExecutor.java       |  20 +++
 .../gateway/service/SqlGatewayServiceITCase.java   |  44 +++++++
 12 files changed, 391 insertions(+), 62 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 0772a4e0b24..f27168a4eb1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.table.endpoint.hive.util.StringRowDataUtils;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
 import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
@@ -113,7 +112,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -121,9 +119,10 @@ import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
 import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
 import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V10;
-import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_CATALOGS_SCHEMA;
 import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase;
 import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize;
+import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetCatalogsExecutor;
+import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetSchemasExecutor;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toFetchOrientation;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toOperationHandle;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toSessionHandle;
@@ -133,7 +132,6 @@ import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTSessionHandle;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTStatus;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTTableSchema;
-import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -401,14 +399,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
                     service.submitOperation(
                             sessionHandle,
                             OperationType.LIST_CATALOGS,
-                            () -> {
-                                Set<String> catalogNames = service.listCatalogs(sessionHandle);
-                                return new ResultSet(
-                                        EOS,
-                                        null,
-                                        GET_CATALOGS_SCHEMA,
-                                        StringRowDataUtils.toRowData(catalogNames));
-                            });
+                            createGetCatalogsExecutor(service, sessionHandle));
             resp.setStatus(OK_STATUS);
             resp.setOperationHandle(
                     toTOperationHandle(
@@ -422,7 +413,28 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
 
     @Override
     public TGetSchemasResp GetSchemas(TGetSchemasReq tGetSchemasReq) throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        TGetSchemasResp resp = new TGetSchemasResp();
+        try {
+            SessionHandle sessionHandle = toSessionHandle(tGetSchemasReq.getSessionHandle());
+            String catalogName = tGetSchemasReq.getCatalogName();
+            OperationHandle operationHandle =
+                    service.submitOperation(
+                            sessionHandle,
+                            OperationType.LIST_SCHEMAS,
+                            createGetSchemasExecutor(
+                                    service,
+                                    sessionHandle,
+                                    catalogName,
+                                    tGetSchemasReq.getSchemaName()));
+
+            resp.setStatus(OK_STATUS);
+            resp.setOperationHandle(
+                    toTOperationHandle(sessionHandle, operationHandle, OperationType.LIST_SCHEMAS));
+        } catch (Throwable t) {
+            LOG.error("Failed to GetSchemas.", t);
+            resp.setStatus(toTStatus(t));
+        }
+        return resp;
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
index 0a6818330fd..e3df486f3e0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 
+import java.util.Arrays;
 import java.util.Collections;
 
 /** Schemas for the HiveServer2 Endpoint result. */
@@ -35,4 +36,15 @@ public class HiveServer2Schemas {
                                     .withComment("Catalog name. NULL if not applicable.")),
                     Collections.emptyList(),
                     null);
+
+    /** Schema for {@link HiveServer2Endpoint#GetSchemas}. */
+    public static final ResolvedSchema GET_SCHEMAS_SCHEMA =
+            new ResolvedSchema(
+                    Arrays.asList(
+                            Column.physical("TABLE_SCHEMA", DataTypes.STRING())
+                                    .withComment("Schema name. NULL if not applicable."),
+                            Column.physical("TABLE_CAT", DataTypes.STRING())
+                                    .withComment("Catalog name. NULL if not applicable")),
+                    Collections.emptyList(),
+                    null);
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
new file mode 100644
index 00000000000..8e90c0d6972
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.endpoint.hive.util;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+
+import javax.annotation.Nullable;
+
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_CATALOGS_SCHEMA;
+import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_SCHEMAS_SCHEMA;
+import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS;
+
+/** Factory to create the operation executor. */
+public class OperationExecutorFactory {
+
+    public static Callable<ResultSet> createGetCatalogsExecutor(
+            SqlGatewayService service, SessionHandle sessionHandle) {
+        return () -> executeGetCatalogs(service, sessionHandle);
+    }
+
+    public static Callable<ResultSet> createGetSchemasExecutor(
+            SqlGatewayService service,
+            SessionHandle sessionHandle,
+            @Nullable String catalogName,
+            @Nullable String schemaName) {
+        return () -> executeGetSchemas(service, sessionHandle, catalogName, schemaName);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Executors
+    // --------------------------------------------------------------------------------------------
+
+    private static ResultSet executeGetCatalogs(
+            SqlGatewayService service, SessionHandle sessionHandle) {
+        Set<String> catalogNames = service.listCatalogs(sessionHandle);
+        return new ResultSet(
+                EOS,
+                null,
+                GET_CATALOGS_SCHEMA,
+                catalogNames.stream()
+                        .map(OperationExecutorFactory::wrap)
+                        .collect(Collectors.toList()));
+    }
+
+    private static ResultSet executeGetSchemas(
+            SqlGatewayService service,
+            SessionHandle sessionHandle,
+            @Nullable String catalogName,
+            @Nullable String schemaName) {
+        String specifiedCatalogName =
+                catalogName == null || catalogName.equals("")
+                        ? service.getCurrentCatalog(sessionHandle)
+                        : catalogName;
+        Set<String> databaseNames =
+                filter(service.listDatabases(sessionHandle, specifiedCatalogName), schemaName);
+        return new ResultSet(
+                EOS,
+                null,
+                GET_SCHEMAS_SCHEMA,
+                databaseNames.stream()
+                        .map(name -> wrap(name, specifiedCatalogName))
+                        .collect(Collectors.toList()));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Utilities
+    // --------------------------------------------------------------------------------------------
+
+    private static Set<String> filter(Set<String> candidates, String pattern) {
+        Pattern compiledPattern = convertNamePattern(pattern);
+        return candidates.stream()
+                .filter(name -> compiledPattern.matcher(name).matches())
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Convert SQL 'like' pattern to a Java regular expression. Underscores (_) are converted to '.'
+     * and percent signs (%) are converted to '.*'. Note: escape characters are removed.
+     *
+     * @param pattern the SQL pattern to convert.
+     * @return the equivalent Java regular expression of the pattern.
+     */
+    private static Pattern convertNamePattern(@Nullable String pattern) {
+        if ((pattern == null) || pattern.isEmpty()) {
+            pattern = "%";
+        }
+        String wStr = ".*";
+        return Pattern.compile(
+                pattern.replaceAll("([^\\\\])%", "$1" + wStr)
+                        .replaceAll("\\\\%", "%")
+                        .replaceAll("^%", wStr)
+                        .replaceAll("([^\\\\])_", "$1.")
+                        .replaceAll("\\\\_", "_")
+                        .replaceAll("^_", "."));
+    }
+
+    private static GenericRowData wrap(Object... elements) {
+        Object[] pack = new Object[elements.length];
+        for (int i = 0; i < elements.length; i++) {
+            Object element = elements[i];
+            if (element != null) {
+                if (element instanceof String) {
+                    pack[i] = StringData.fromString((String) element);
+                } else if (element instanceof Integer) {
+                    pack[i] = element;
+                } else if (element instanceof Short) {
+                    pack[i] = element;
+                } else {
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "Can not wrap the element %s at index %s into RowData.",
+                                    element, i));
+                }
+            }
+        }
+        return GenericRowData.of(pack);
+    }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/StringRowDataUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/StringRowDataUtils.java
deleted file mode 100644
index 103f7311c89..00000000000
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/StringRowDataUtils.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.endpoint.hive.util;
-
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/** Convert the {@link String} to {@link RowData}. */
-public class StringRowDataUtils {
-
-    public static List<RowData> toRowData(Collection<String> external) {
-        return external.stream()
-                .map(val -> GenericRowData.of(StringData.fromString(val)))
-                .collect(Collectors.toList());
-    }
-}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
index 235796e4eb3..9e2b1cc73ed 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
@@ -162,6 +162,8 @@ public class ThriftObjectConversions {
                 return TOperationType.EXECUTE_STATEMENT;
             case LIST_CATALOGS:
                 return TOperationType.GET_CATALOGS;
+            case LIST_SCHEMAS:
+                return TOperationType.GET_SCHEMAS;
             case UNKNOWN:
                 return TOperationType.UNKNOWN;
             default:
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 7b567e184c0..94d392b63c5 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.BiConsumerWithException;
+import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
@@ -63,9 +64,13 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import java.net.InetAddress;
 import java.sql.Connection;
 import java.sql.ResultSetMetaData;
+import java.sql.Statement;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -196,18 +201,85 @@ public class HiveServer2EndpointITCase extends TestLogger {
 
     @Test
     public void testGetCatalogs() throws Exception {
-        try (Connection connection = ENDPOINT_EXTENSION.getConnection()) {
-            java.sql.ResultSet result = connection.getMetaData().getCatalogs();
-            assertSchemaEquals(
-                    ResolvedSchema.of(Column.physical("TABLE_CAT", DataTypes.STRING())),
-                    result.getMetaData());
-
-            List<String> actual = new ArrayList<>();
-            while (result.next()) {
-                actual.add(result.getString(1));
-            }
+        runGetObjectTest(
+                connection -> connection.getMetaData().getCatalogs(),
+                ResolvedSchema.of(Column.physical("TABLE_CAT", DataTypes.STRING())),
+                Arrays.asList(
+                        Collections.singletonList("hive"),
+                        Collections.singletonList("default_catalog")));
+    }
+
+    @Test
+    public void testGetSchemas() throws Exception {
+        runGetObjectTest(
+                connection -> connection.getMetaData().getSchemas("default_catalog", null),
+                ResolvedSchema.of(
+                        Column.physical("TABLE_SCHEMA", DataTypes.STRING()),
+                        Column.physical("TABLE_CAT", DataTypes.STRING())),
+                Arrays.asList(
+                        Arrays.asList("default_database", "default_catalog"),
+                        Arrays.asList("db_test1", "default_catalog"),
+                        Arrays.asList("db_test2", "default_catalog"),
+                        Arrays.asList("db_diff", "default_catalog")));
+    }
+
+    @Test
+    public void testGetSchemasWithPattern() throws Exception {
+        runGetObjectTest(
+                connection -> connection.getMetaData().getSchemas(null, "db\\_test%"),
+                ResolvedSchema.of(
+                        Column.physical("TABLE_SCHEMA", DataTypes.STRING()),
+                        Column.physical("TABLE_CAT", DataTypes.STRING())),
+                Arrays.asList(
+                        Arrays.asList("db_test1", "default_catalog"),
+                        Arrays.asList("db_test2", "default_catalog")));
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private Connection getInitializedConnection() throws Exception {
+        Connection connection = ENDPOINT_EXTENSION.getConnection();
+        Statement statement = connection.createStatement();
+        statement.execute("SET table.sql-dialect=default");
+        statement.execute("USE CATALOG `default_catalog`");
+
+        // default_catalog: db_test1 | db_test2 | db_diff | default_database
+        //     db_test1: temporary table tb_1, table tb_2, temporary view tb_3, view tb_4
+        //     db_test2: table tb_1, table diff_1, view tb_2, view diff_2
+        //     db_diff:  table tb_1, view tb_2
+
+        statement.execute("CREATE DATABASE db_test1");
+        statement.execute("CREATE DATABASE db_test2");
+        statement.execute("CREATE DATABASE db_diff");
 
-            assertThat(actual).contains("hive", "default_catalog");
+        statement.execute("CREATE TEMPORARY TABLE db_test1.tb_1 COMMENT 'temporary table tb_1'");
+        statement.execute("CREATE TABLE db_test1.tb_2 COMMENT 'table tb_2'");
+        statement.execute(
+                "CREATE TEMPORARY VIEW db_test1.tb_3 COMMENT 'temporary view tb_3' AS SELECT 1");
+        statement.execute("CREATE VIEW db_test1.tb_4 COMMENT 'view tb_4' AS SELECT 1");
+
+        statement.execute("CREATE TABLE db_test2.tb_1 COMMENT 'table tb_1'");
+        statement.execute("CREATE TABLE db_test2.diff_1 COMMENT 'table diff_1'");
+        statement.execute("CREATE VIEW db_test2.tb_2 COMMENT 'view tb_2' AS SELECT 1");
+        statement.execute("CREATE VIEW db_test2.diff_2 COMMENT 'view diff_2' AS SELECT 1");
+
+        statement.execute("CREATE TABLE db_diff.tb_1 COMMENT 'table tb_1'");
+        statement.execute("CREATE VIEW db_diff.tb_2 COMMENT 'view tb_2' AS SELECT 1");
+
+        statement.close();
+        return connection;
+    }
+
+    private void runGetObjectTest(
+            FunctionWithException<Connection, java.sql.ResultSet, Exception> resultSetSupplier,
+            ResolvedSchema expectedSchema,
+            List<List<Object>> expectedResults)
+            throws Exception {
+        try (Connection connection = getInitializedConnection();
+                java.sql.ResultSet result = resultSetSupplier.apply(connection)) {
+            assertSchemaEquals(expectedSchema, result.getMetaData());
+            assertThat(new HashSet<>(collect(result, expectedSchema.getColumnCount())))
+                    .isEqualTo(new HashSet<>(expectedResults));
         }
     }
 
@@ -265,4 +337,17 @@ public class HiveServer2EndpointITCase extends TestLogger {
             assertThat(metaData.getColumnType(i)).isEqualTo(jdbcType);
         }
     }
+
+    private List<List<Object>> collect(java.sql.ResultSet result, int columnCount)
+            throws Exception {
+        List<List<Object>> actual = new ArrayList<>();
+        while (result.next()) {
+            List<Object> row = new ArrayList<>();
+            for (int i = 1; i <= columnCount; i++) {
+                row.add(result.getObject(i));
+            }
+            actual.add(row);
+        }
+        return actual;
+    }
 }
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index 97b377c59c4..5e77577fc02 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -185,12 +185,21 @@ public interface SqlGatewayService {
             SessionHandle sessionHandle,
             OperationHandle operationHandle,
             FetchOrientation orientation,
-            int maxRows);
+            int maxRows)
+            throws SqlGatewayException;
 
     // -------------------------------------------------------------------------------------------
     // Catalog API
     // -------------------------------------------------------------------------------------------
 
+    /**
+     * Return current catalog name.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @return name of the current catalog.
+     */
+    String getCurrentCatalog(SessionHandle sessionHandle) throws SqlGatewayException;
+
     /**
      * Return all available catalogs in the current session.
      *
@@ -198,4 +207,14 @@ public interface SqlGatewayService {
      * @return names of the registered catalogs.
      */
     Set<String> listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException;
+
+    /**
+     * Return all available schemas in the given catalog.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @param catalogName name of the catalog whose schemas are listed.
+     * @return names of the registered schemas.
+     */
+    Set<String> listDatabases(SessionHandle sessionHandle, String catalogName)
+            throws SqlGatewayException;
 }
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
index 6177008b264..9366b6e58fb 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
@@ -29,6 +29,9 @@ public enum OperationType {
     /** The type indicates the operation list catalogs. */
     LIST_CATALOGS,
 
+    /** The type indicates the operation list schemas. */
+    LIST_SCHEMAS,
+
     /** The type indicates the operation is unknown. */
     UNKNOWN;
 }
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
index af0ff36ec6e..e07debec55b 100644
--- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
@@ -120,4 +120,15 @@ public class MockedSqlGatewayService implements SqlGatewayService {
     public Set<String> listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException {
         throw new UnsupportedOperationException();
     }
+
+    @Override
+    public Set<String> listDatabases(SessionHandle sessionHandle, String catalogName)
+            throws SqlGatewayException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getCurrentCatalog(SessionHandle sessionHandle) throws SqlGatewayException {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index 9419b6071c3..39f437914b8 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -221,6 +221,21 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
         }
     }
 
+    @Override
+    public Set<String> listDatabases(SessionHandle sessionHandle, String catalogName) {
+        try {
+            return getSession(sessionHandle).createExecutor().listDatabases(catalogName);
+        } catch (Throwable t) {
+            LOG.error("Failed to listDatabases.", t);
+            throw new SqlGatewayException("Failed to listDatabases.", t);
+        }
+    }
+
+    @Override
+    public String getCurrentCatalog(SessionHandle sessionHandle) {
+        return getSession(sessionHandle).createExecutor().getCurrentCatalog();
+    }
+
     @VisibleForTesting
     Session getSession(SessionHandle sessionHandle) {
         return sessionManager.getSession(sessionHandle);
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index e49a91f2ae8..a42ed341f24 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.gateway.service.operation;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.CatalogNotExistException;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.api.internal.TableResultInternal;
@@ -43,6 +44,7 @@ import org.apache.flink.table.operations.command.SetOperation;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -100,10 +102,28 @@ public class OperationExecutor {
         }
     }
 
+    public String getCurrentCatalog() {
+        return getTableEnvironment().getCatalogManager().getCurrentCatalog();
+    }
+
     public Set<String> listCatalogs() {
         return getTableEnvironment().getCatalogManager().listCatalogs();
     }
 
+    public Set<String> listDatabases(String catalogName) {
+        return new HashSet<>(
+                getTableEnvironment()
+                        .getCatalogManager()
+                        .getCatalog(catalogName)
+                        .orElseThrow(
+                                () ->
+                                        new CatalogNotExistException(
+                                                String.format(
+                                                        "Catalog '%s' does not exist.",
+                                                        catalogName)))
+                        .listDatabases());
+    }
+
     // --------------------------------------------------------------------------------------------
 
     @VisibleForTesting
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 69880a74dec..3481ac930c2 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -304,6 +304,10 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                 task -> assertThat(task.get()).isEqualTo(getDefaultResultSet().getResultSchema()));
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Catalog API tests
+    // --------------------------------------------------------------------------------------------
+
     @Test
     public void testListCatalogs() {
         SessionEnvironment environment =
@@ -316,6 +320,46 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
         assertThat(service.listCatalogs(sessionHandle)).contains("cat1", "cat2");
     }
 
+    @Test
+    public void testListDatabases() throws Exception {
+        SessionEnvironment environment =
+                SessionEnvironment.newBuilder()
+                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                        .registerCatalog("cat", new GenericInMemoryCatalog("cat"))
+                        .setDefaultCatalog("cat")
+                        .build();
+        SessionHandle sessionHandle = service.openSession(environment);
+        Configuration configuration =
+                Configuration.fromMap(service.getSessionConfig(sessionHandle));
+
+        service.executeStatement(sessionHandle, "CREATE DATABASE db1", -1, configuration);
+        OperationHandle operationHandle =
+                service.executeStatement(sessionHandle, "CREATE DATABASE db2", -1, configuration);
+
+        CommonTestUtils.waitUtil(
+                () ->
+                        service.getOperationInfo(sessionHandle, operationHandle)
+                                .getStatus()
+                                .isTerminalStatus(),
+                Duration.ofSeconds(100),
+                "Failed to wait operation finish.");
+        assertThat(service.listDatabases(sessionHandle, "cat")).contains("db1", "db2");
+    }
+
+    @Test
+    public void testGetCurrentCatalog() {
+        SessionEnvironment environment =
+                SessionEnvironment.newBuilder()
+                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                        .registerCatalog("cat1", new GenericInMemoryCatalog("cat1"))
+                        .registerCatalog("cat2", new GenericInMemoryCatalog("cat2"))
+                        .setDefaultCatalog("cat2")
+                        .build();
+        SessionHandle sessionHandle = service.openSession(environment);
+
+        assertThat(service.getCurrentCatalog(sessionHandle)).isEqualTo("cat2");
+    }
+
     // --------------------------------------------------------------------------------------------
     // Concurrent tests
     // --------------------------------------------------------------------------------------------