You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/05 14:32:21 UTC
[flink] branch master updated: [FLINK-28630][sql-gateway][hive] Allow to GetSchemas in the HiveServer2 Endpoint
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 82417ab9a5a [FLINK-28630][sql-gateway][hive] Allow to GetSchemas in the HiveServer2 Endpoint
82417ab9a5a is described below
commit 82417ab9a5ad6935a73f9b7d1bc6bd4f47ce4f61
Author: yuzelin <74...@qq.com>
AuthorDate: Thu Aug 4 21:08:14 2022 +0800
[FLINK-28630][sql-gateway][hive] Allow to GetSchemas in the HiveServer2 Endpoint
This closes #20401
---
.../table/endpoint/hive/HiveServer2Endpoint.java | 38 ++++--
.../table/endpoint/hive/HiveServer2Schemas.java | 12 ++
.../hive/util/OperationExecutorFactory.java | 143 +++++++++++++++++++++
.../endpoint/hive/util/StringRowDataUtils.java | 37 ------
.../hive/util/ThriftObjectConversions.java | 2 +
.../endpoint/hive/HiveServer2EndpointITCase.java | 107 +++++++++++++--
.../flink/table/gateway/api/SqlGatewayService.java | 21 ++-
.../table/gateway/api/operation/OperationType.java | 3 +
.../gateway/api/utils/MockedSqlGatewayService.java | 11 ++
.../gateway/service/SqlGatewayServiceImpl.java | 15 +++
.../service/operation/OperationExecutor.java | 20 +++
.../gateway/service/SqlGatewayServiceITCase.java | 44 +++++++
12 files changed, 391 insertions(+), 62 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 0772a4e0b24..f27168a4eb1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.table.endpoint.hive.util.StringRowDataUtils;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
@@ -113,7 +112,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -121,9 +119,10 @@ import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V10;
-import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_CATALOGS_SCHEMA;
import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase;
import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize;
+import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetCatalogsExecutor;
+import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetSchemasExecutor;
import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toFetchOrientation;
import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toOperationHandle;
import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toSessionHandle;
@@ -133,7 +132,6 @@ import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.
import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTSessionHandle;
import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTStatus;
import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTTableSchema;
-import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -401,14 +399,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
service.submitOperation(
sessionHandle,
OperationType.LIST_CATALOGS,
- () -> {
- Set<String> catalogNames = service.listCatalogs(sessionHandle);
- return new ResultSet(
- EOS,
- null,
- GET_CATALOGS_SCHEMA,
- StringRowDataUtils.toRowData(catalogNames));
- });
+ createGetCatalogsExecutor(service, sessionHandle));
resp.setStatus(OK_STATUS);
resp.setOperationHandle(
toTOperationHandle(
@@ -422,7 +413,28 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
@Override
public TGetSchemasResp GetSchemas(TGetSchemasReq tGetSchemasReq) throws TException {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
+ TGetSchemasResp resp = new TGetSchemasResp();
+ try {
+ SessionHandle sessionHandle = toSessionHandle(tGetSchemasReq.getSessionHandle());
+ String catalogName = tGetSchemasReq.getCatalogName();
+ OperationHandle operationHandle =
+ service.submitOperation(
+ sessionHandle,
+ OperationType.LIST_SCHEMAS,
+ createGetSchemasExecutor(
+ service,
+ sessionHandle,
+ catalogName,
+ tGetSchemasReq.getSchemaName()));
+
+ resp.setStatus(OK_STATUS);
+ resp.setOperationHandle(
+ toTOperationHandle(sessionHandle, operationHandle, OperationType.LIST_SCHEMAS));
+ } catch (Throwable t) {
+ LOG.error("Failed to GetSchemas.", t);
+ resp.setStatus(toTStatus(t));
+ }
+ return resp;
}
@Override
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
index 0a6818330fd..e3df486f3e0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
+import java.util.Arrays;
import java.util.Collections;
/** Schemas for the HiveServer2 Endpoint result. */
@@ -35,4 +36,15 @@ public class HiveServer2Schemas {
.withComment("Catalog name. NULL if not applicable.")),
Collections.emptyList(),
null);
+
+ /** Schema for {@link HiveServer2Endpoint#GetSchemas}. */
+ public static final ResolvedSchema GET_SCHEMAS_SCHEMA =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("TABLE_SCHEMA", DataTypes.STRING())
+ .withComment("Schema name. NULL if not applicable."),
+ Column.physical("TABLE_CAT", DataTypes.STRING())
+ .withComment("Catalog name. NULL if not applicable")),
+ Collections.emptyList(),
+ null);
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
new file mode 100644
index 00000000000..8e90c0d6972
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.endpoint.hive.util;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+
+import javax.annotation.Nullable;
+
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_CATALOGS_SCHEMA;
+import static org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_SCHEMAS_SCHEMA;
+import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS;
+
+/** Factory to create the operation executor. */
+public class OperationExecutorFactory {
+
+ public static Callable<ResultSet> createGetCatalogsExecutor(
+ SqlGatewayService service, SessionHandle sessionHandle) {
+ return () -> executeGetCatalogs(service, sessionHandle);
+ }
+
+ public static Callable<ResultSet> createGetSchemasExecutor(
+ SqlGatewayService service,
+ SessionHandle sessionHandle,
+ @Nullable String catalogName,
+ @Nullable String schemaName) {
+ return () -> executeGetSchemas(service, sessionHandle, catalogName, schemaName);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Executors
+ // --------------------------------------------------------------------------------------------
+
+ private static ResultSet executeGetCatalogs(
+ SqlGatewayService service, SessionHandle sessionHandle) {
+ Set<String> catalogNames = service.listCatalogs(sessionHandle);
+ return new ResultSet(
+ EOS,
+ null,
+ GET_CATALOGS_SCHEMA,
+ catalogNames.stream()
+ .map(OperationExecutorFactory::wrap)
+ .collect(Collectors.toList()));
+ }
+
+ private static ResultSet executeGetSchemas(
+ SqlGatewayService service,
+ SessionHandle sessionHandle,
+ @Nullable String catalogName,
+ @Nullable String schemaName) {
+ String specifiedCatalogName =
+ catalogName == null || catalogName.equals("")
+ ? service.getCurrentCatalog(sessionHandle)
+ : catalogName;
+ Set<String> databaseNames =
+ filter(service.listDatabases(sessionHandle, specifiedCatalogName), schemaName);
+ return new ResultSet(
+ EOS,
+ null,
+ GET_SCHEMAS_SCHEMA,
+ databaseNames.stream()
+ .map(name -> wrap(name, specifiedCatalogName))
+ .collect(Collectors.toList()));
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Utilities
+ // --------------------------------------------------------------------------------------------
+
+ private static Set<String> filter(Set<String> candidates, String pattern) {
+ Pattern compiledPattern = convertNamePattern(pattern);
+ return candidates.stream()
+ .filter(name -> compiledPattern.matcher(name).matches())
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Covert SQL 'like' pattern to a Java regular expression. Underscores (_) are converted to '.'
+ * and percent signs (%) are converted to '.*'. Note: escape characters are removed.
+ *
+ * @param pattern the SQL pattern to convert.
+ * @return the equivalent Java regular expression of the pattern.
+ */
+ private static Pattern convertNamePattern(@Nullable String pattern) {
+ if ((pattern == null) || pattern.isEmpty()) {
+ pattern = "%";
+ }
+ String wStr = ".*";
+ return Pattern.compile(
+ pattern.replaceAll("([^\\\\])%", "$1" + wStr)
+ .replaceAll("\\\\%", "%")
+ .replaceAll("^%", wStr)
+ .replaceAll("([^\\\\])_", "$1.")
+ .replaceAll("\\\\_", "_")
+ .replaceAll("^_", "."));
+ }
+
+ private static GenericRowData wrap(Object... elements) {
+ Object[] pack = new Object[elements.length];
+ for (int i = 0; i < elements.length; i++) {
+ Object element = elements[i];
+ if (element != null) {
+ if (element instanceof String) {
+ pack[i] = StringData.fromString((String) element);
+ } else if (element instanceof Integer) {
+ pack[i] = element;
+ } else if (element instanceof Short) {
+ pack[i] = element;
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Can not wrap the element %s at index %s into RowData.",
+ element, i));
+ }
+ }
+ }
+ return GenericRowData.of(pack);
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/StringRowDataUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/StringRowDataUtils.java
deleted file mode 100644
index 103f7311c89..00000000000
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/StringRowDataUtils.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.endpoint.hive.util;
-
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/** Convert the {@link String} to {@link RowData}. */
-public class StringRowDataUtils {
-
- public static List<RowData> toRowData(Collection<String> external) {
- return external.stream()
- .map(val -> GenericRowData.of(StringData.fromString(val)))
- .collect(Collectors.toList());
- }
-}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
index 235796e4eb3..9e2b1cc73ed 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
@@ -162,6 +162,8 @@ public class ThriftObjectConversions {
return TOperationType.EXECUTE_STATEMENT;
case LIST_CATALOGS:
return TOperationType.GET_CATALOGS;
+ case LIST_SCHEMAS:
+ return TOperationType.GET_SCHEMAS;
case UNKNOWN:
return TOperationType.UNKNOWN;
default:
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 7b567e184c0..94d392b63c5 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.BiConsumerWithException;
+import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
@@ -63,9 +64,13 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import java.net.InetAddress;
import java.sql.Connection;
import java.sql.ResultSetMetaData;
+import java.sql.Statement;
import java.util.AbstractMap;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -196,18 +201,85 @@ public class HiveServer2EndpointITCase extends TestLogger {
@Test
public void testGetCatalogs() throws Exception {
- try (Connection connection = ENDPOINT_EXTENSION.getConnection()) {
- java.sql.ResultSet result = connection.getMetaData().getCatalogs();
- assertSchemaEquals(
- ResolvedSchema.of(Column.physical("TABLE_CAT", DataTypes.STRING())),
- result.getMetaData());
-
- List<String> actual = new ArrayList<>();
- while (result.next()) {
- actual.add(result.getString(1));
- }
+ runGetObjectTest(
+ connection -> connection.getMetaData().getCatalogs(),
+ ResolvedSchema.of(Column.physical("TABLE_CAT", DataTypes.STRING())),
+ Arrays.asList(
+ Collections.singletonList("hive"),
+ Collections.singletonList("default_catalog")));
+ }
+
+ @Test
+ public void testGetSchemas() throws Exception {
+ runGetObjectTest(
+ connection -> connection.getMetaData().getSchemas("default_catalog", null),
+ ResolvedSchema.of(
+ Column.physical("TABLE_SCHEMA", DataTypes.STRING()),
+ Column.physical("TABLE_CAT", DataTypes.STRING())),
+ Arrays.asList(
+ Arrays.asList("default_database", "default_catalog"),
+ Arrays.asList("db_test1", "default_catalog"),
+ Arrays.asList("db_test2", "default_catalog"),
+ Arrays.asList("db_diff", "default_catalog")));
+ }
+
+ @Test
+ public void testGetSchemasWithPattern() throws Exception {
+ runGetObjectTest(
+ connection -> connection.getMetaData().getSchemas(null, "db\\_test%"),
+ ResolvedSchema.of(
+ Column.physical("TABLE_SCHEMA", DataTypes.STRING()),
+ Column.physical("TABLE_CAT", DataTypes.STRING())),
+ Arrays.asList(
+ Arrays.asList("db_test1", "default_catalog"),
+ Arrays.asList("db_test2", "default_catalog")));
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private Connection getInitializedConnection() throws Exception {
+ Connection connection = ENDPOINT_EXTENSION.getConnection();
+ Statement statement = connection.createStatement();
+ statement.execute("SET table.sql-dialect=default");
+ statement.execute("USE CATALOG `default_catalog`");
+
+ // default_catalog: db_test1 | db_test2 | db_diff | default
+ // db_test1: temporary table tb_1, table tb_2, temporary view tb_3, view tb_4
+ // db_test2: table tb_1, table diff_1, view tb_2, view diff_2
+ // db_diff: table tb_1, view tb_2
+
+ statement.execute("CREATE DATABASE db_test1");
+ statement.execute("CREATE DATABASE db_test2");
+ statement.execute("CREATE DATABASE db_diff");
- assertThat(actual).contains("hive", "default_catalog");
+ statement.execute("CREATE TEMPORARY TABLE db_test1.tb_1 COMMENT 'temporary table tb_1'");
+ statement.execute("CREATE TABLE db_test1.tb_2 COMMENT 'table tb_2'");
+ statement.execute(
+ "CREATE TEMPORARY VIEW db_test1.tb_3 COMMENT 'temporary view tb_3' AS SELECT 1");
+ statement.execute("CREATE VIEW db_test1.tb_4 COMMENT 'view tb_4' AS SELECT 1");
+
+ statement.execute("CREATE TABLE db_test2.tb_1 COMMENT 'table tb_1'");
+ statement.execute("CREATE TABLE db_test2.diff_1 COMMENT 'table diff_1'");
+ statement.execute("CREATE VIEW db_test2.tb_2 COMMENT 'view tb_2' AS SELECT 1");
+ statement.execute("CREATE VIEW db_test2.diff_2 COMMENT 'view diff_2' AS SELECT 1");
+
+ statement.execute("CREATE TABLE db_diff.tb_1 COMMENT 'table tb_1'");
+ statement.execute("CREATE VIEW db_diff.tb_2 COMMENT 'view tb_2' AS SELECT 1");
+
+ statement.close();
+ return connection;
+ }
+
+ private void runGetObjectTest(
+ FunctionWithException<Connection, java.sql.ResultSet, Exception> resultSetSupplier,
+ ResolvedSchema expectedSchema,
+ List<List<Object>> expectedResults)
+ throws Exception {
+ try (Connection connection = getInitializedConnection();
+ java.sql.ResultSet result = resultSetSupplier.apply(connection)) {
+ assertSchemaEquals(expectedSchema, result.getMetaData());
+ assertThat(new HashSet<>(collect(result, expectedSchema.getColumnCount())))
+ .isEqualTo(new HashSet<>(expectedResults));
}
}
@@ -265,4 +337,17 @@ public class HiveServer2EndpointITCase extends TestLogger {
assertThat(metaData.getColumnType(i)).isEqualTo(jdbcType);
}
}
+
+ private List<List<Object>> collect(java.sql.ResultSet result, int columnCount)
+ throws Exception {
+ List<List<Object>> actual = new ArrayList<>();
+ while (result.next()) {
+ List<Object> row = new ArrayList<>();
+ for (int i = 1; i <= columnCount; i++) {
+ row.add(result.getObject(i));
+ }
+ actual.add(row);
+ }
+ return actual;
+ }
}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index 97b377c59c4..5e77577fc02 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -185,12 +185,21 @@ public interface SqlGatewayService {
SessionHandle sessionHandle,
OperationHandle operationHandle,
FetchOrientation orientation,
- int maxRows);
+ int maxRows)
+ throws SqlGatewayException;
// -------------------------------------------------------------------------------------------
// Catalog API
// -------------------------------------------------------------------------------------------
+ /**
+ * Return current catalog name.
+ *
+ * @param sessionHandle handle to identify the session.
+ * @return name of the current catalog.
+ */
+ String getCurrentCatalog(SessionHandle sessionHandle) throws SqlGatewayException;
+
/**
* Return all available catalogs in the current session.
*
@@ -198,4 +207,14 @@ public interface SqlGatewayService {
* @return names of the registered catalogs.
*/
Set<String> listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException;
+
+ /**
+ * Return all available schemas in the given catalog.
+ *
+ * @param sessionHandle handle to identify the session.
+ * @param catalogName name string of the given catalog.
+ * @return names of the registered schemas.
+ */
+ Set<String> listDatabases(SessionHandle sessionHandle, String catalogName)
+ throws SqlGatewayException;
}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
index 6177008b264..9366b6e58fb 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
@@ -29,6 +29,9 @@ public enum OperationType {
/** The type indicates the operation list catalogs. */
LIST_CATALOGS,
+ /** The type indicates the operation list schemas. */
+ LIST_SCHEMAS,
+
/** The type indicates the operation is unknown. */
UNKNOWN;
}
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
index af0ff36ec6e..e07debec55b 100644
--- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
@@ -120,4 +120,15 @@ public class MockedSqlGatewayService implements SqlGatewayService {
public Set<String> listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public Set<String> listDatabases(SessionHandle sessionHandle, String catalogName)
+ throws SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getCurrentCatalog(SessionHandle sessionHandle) throws SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index 9419b6071c3..39f437914b8 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -221,6 +221,21 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
}
}
+ @Override
+ public Set<String> listDatabases(SessionHandle sessionHandle, String catalogName) {
+ try {
+ return getSession(sessionHandle).createExecutor().listDatabases(catalogName);
+ } catch (Throwable t) {
+ LOG.error("Failed to listDatabases.", t);
+ throw new SqlGatewayException("Failed to listDatabases.", t);
+ }
+ }
+
+ @Override
+ public String getCurrentCatalog(SessionHandle sessionHandle) {
+ return getSession(sessionHandle).createExecutor().getCurrentCatalog();
+ }
+
@VisibleForTesting
Session getSession(SessionHandle sessionHandle) {
return sessionManager.getSession(sessionHandle);
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index e49a91f2ae8..a42ed341f24 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.gateway.service.operation;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.CatalogNotExistException;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.internal.TableResultInternal;
@@ -43,6 +44,7 @@ import org.apache.flink.table.operations.command.SetOperation;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -100,10 +102,28 @@ public class OperationExecutor {
}
}
+ public String getCurrentCatalog() {
+ return getTableEnvironment().getCatalogManager().getCurrentCatalog();
+ }
+
public Set<String> listCatalogs() {
return getTableEnvironment().getCatalogManager().listCatalogs();
}
+ public Set<String> listDatabases(String catalogName) {
+ return new HashSet<>(
+ getTableEnvironment()
+ .getCatalogManager()
+ .getCatalog(catalogName)
+ .orElseThrow(
+ () ->
+ new CatalogNotExistException(
+ String.format(
+ "Catalog '%s' does not exist.",
+ catalogName)))
+ .listDatabases());
+ }
+
// --------------------------------------------------------------------------------------------
@VisibleForTesting
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 69880a74dec..3481ac930c2 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -304,6 +304,10 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
task -> assertThat(task.get()).isEqualTo(getDefaultResultSet().getResultSchema()));
}
+ // --------------------------------------------------------------------------------------------
+ // Catalog API tests
+ // --------------------------------------------------------------------------------------------
+
@Test
public void testListCatalogs() {
SessionEnvironment environment =
@@ -316,6 +320,46 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
assertThat(service.listCatalogs(sessionHandle)).contains("cat1", "cat2");
}
+ @Test
+ public void testListDatabases() throws Exception {
+ SessionEnvironment environment =
+ SessionEnvironment.newBuilder()
+ .setSessionEndpointVersion(MockedEndpointVersion.V1)
+ .registerCatalog("cat", new GenericInMemoryCatalog("cat"))
+ .setDefaultCatalog("cat")
+ .build();
+ SessionHandle sessionHandle = service.openSession(environment);
+ Configuration configuration =
+ Configuration.fromMap(service.getSessionConfig(sessionHandle));
+
+ service.executeStatement(sessionHandle, "CREATE DATABASE db1", -1, configuration);
+ OperationHandle operationHandle =
+ service.executeStatement(sessionHandle, "CREATE DATABASE db2", -1, configuration);
+
+ CommonTestUtils.waitUtil(
+ () ->
+ service.getOperationInfo(sessionHandle, operationHandle)
+ .getStatus()
+ .isTerminalStatus(),
+ Duration.ofSeconds(100),
+ "Failed to wait operation finish.");
+ assertThat(service.listDatabases(sessionHandle, "cat")).contains("db1", "db2");
+ }
+
+ @Test
+ public void testGetCurrentCatalog() {
+ SessionEnvironment environment =
+ SessionEnvironment.newBuilder()
+ .setSessionEndpointVersion(MockedEndpointVersion.V1)
+ .registerCatalog("cat1", new GenericInMemoryCatalog("cat1"))
+ .registerCatalog("cat2", new GenericInMemoryCatalog("cat2"))
+ .setDefaultCatalog("cat2")
+ .build();
+ SessionHandle sessionHandle = service.openSession(environment);
+
+ assertThat(service.getCurrentCatalog(sessionHandle)).isEqualTo("cat2");
+ }
+
// --------------------------------------------------------------------------------------------
// Concurrent tests
// --------------------------------------------------------------------------------------------