You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/09/08 06:27:47 UTC
[flink] 01/02: [FLINK-29118][sql-gateway][hive] Remove default GenericInMemoryCatalog in the HiveServer2 Endpoint when openSession
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 833e7ffbb5f075d2014dfaec547b6987d59bc89f
Author: yuzelin <74...@qq.com>
AuthorDate: Wed Aug 31 13:42:03 2022 +0800
[FLINK-29118][sql-gateway][hive] Remove default GenericInMemoryCatalog in the HiveServer2 Endpoint when openSession
This closes #20714
---
.../table/endpoint/hive/HiveServer2Endpoint.java | 4 +-
.../endpoint/hive/HiveServer2EndpointITCase.java | 119 ++++++++++-----------
.../hive/HiveServer2EndpointStatementITCase.java | 4 +
.../gateway/api/session/SessionEnvironment.java | 16 ---
.../api/session/SessionEnvironmentTest.java | 2 -
.../gateway/service/context/SessionContext.java | 118 +++++++++++++-------
.../gateway/service/session/SessionManager.java | 12 +--
.../gateway/service/SqlGatewayServiceITCase.java | 7 +-
.../service/context/SessionContextTest.java | 12 ++-
9 files changed, 148 insertions(+), 146 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 55bd8b21987..12ffa946b4f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -311,7 +311,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
Catalog hiveCatalog =
new HiveCatalog(
catalogName,
- defaultDatabase,
+ getUsedDefaultDatabase(originSessionConf).orElse(defaultDatabase),
conf,
HiveShimLoader.getHiveVersion(),
allowEmbedded);
@@ -323,8 +323,6 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
.registerCatalog(catalogName, hiveCatalog)
.registerModuleAtHead(moduleName, hiveModule)
.setDefaultCatalog(catalogName)
- .setDefaultDatabase(
- getUsedDefaultDatabase(originSessionConf).orElse(null))
.addSessionConfig(sessionConfig)
.build());
// response
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 2bcaa020fcd..19e3074ed65 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -61,6 +61,7 @@ import org.apache.hive.service.rpc.thrift.TOperationType;
import org.apache.hive.service.rpc.thrift.TStatusCode;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TTransport;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -108,6 +109,11 @@ public class HiveServer2EndpointITCase extends TestLogger {
public static final HiveServer2EndpointExtension ENDPOINT_EXTENSION =
new HiveServer2EndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
+ @BeforeAll
+ public static void setup() throws Exception {
+ initializeEnvironment();
+ }
+
@Test
public void testOpenCloseJdbcConnection() throws Exception {
SessionManager sessionManager = SQL_GATEWAY_SERVICE_EXTENSION.getSessionManager();
@@ -256,21 +262,19 @@ public class HiveServer2EndpointITCase extends TestLogger {
runGetObjectTest(
connection -> connection.getMetaData().getCatalogs(),
ResolvedSchema.of(Column.physical("TABLE_CAT", DataTypes.STRING())),
- Arrays.asList(
- Collections.singletonList("default_catalog"),
- Collections.singletonList("hive")));
+ Collections.singletonList(Collections.singletonList("hive")));
}
@Test
public void testGetSchemas() throws Exception {
runGetObjectTest(
- connection -> connection.getMetaData().getSchemas("default_catalog", null),
+ connection -> connection.getMetaData().getSchemas("hive", null),
getExpectedGetSchemasOperationSchema(),
Arrays.asList(
- Arrays.asList("db_diff", "default_catalog"),
- Arrays.asList("db_test1", "default_catalog"),
- Arrays.asList("db_test2", "default_catalog"),
- Arrays.asList("default_database", "default_catalog")));
+ Arrays.asList("db_diff", "hive"),
+ Arrays.asList("db_test1", "hive"),
+ Arrays.asList("db_test2", "hive"),
+ Arrays.asList("default", "hive")));
}
@Test
@@ -279,8 +283,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
connection -> connection.getMetaData().getSchemas(null, "db\\_test%"),
getExpectedGetSchemasOperationSchema(),
Arrays.asList(
- Arrays.asList("db_test1", "default_catalog"),
- Arrays.asList("db_test2", "default_catalog")));
+ Arrays.asList("db_test1", "hive"), Arrays.asList("db_test2", "hive")));
}
@Test
@@ -296,16 +299,16 @@ public class HiveServer2EndpointITCase extends TestLogger {
new String[] {"MANAGED_TABLE", "VIRTUAL_VIEW"}),
getExpectedGetTablesOperationSchema(),
Arrays.asList(
- Arrays.asList("default_catalog", "db_diff", "tbl_1", "TABLE"),
- Arrays.asList("default_catalog", "db_test1", "tbl_1", "TABLE"),
- Arrays.asList("default_catalog", "db_test1", "tbl_2", "TABLE"),
- Arrays.asList("default_catalog", "db_test2", "diff_1", "TABLE"),
- Arrays.asList("default_catalog", "db_test2", "tbl_1", "TABLE"),
- Arrays.asList("default_catalog", "db_diff", "tbl_2", "VIEW"),
- Arrays.asList("default_catalog", "db_test1", "tbl_3", "VIEW"),
- Arrays.asList("default_catalog", "db_test1", "tbl_4", "VIEW"),
- Arrays.asList("default_catalog", "db_test2", "diff_2", "VIEW"),
- Arrays.asList("default_catalog", "db_test2", "tbl_2", "VIEW")));
+ Arrays.asList("hive", "db_diff", "tbl_1", "TABLE"),
+ Arrays.asList("hive", "db_test1", "tbl_1", "TABLE"),
+ Arrays.asList("hive", "db_test1", "tbl_2", "TABLE"),
+ Arrays.asList("hive", "db_test2", "diff_1", "TABLE"),
+ Arrays.asList("hive", "db_test2", "tbl_1", "TABLE"),
+ Arrays.asList("hive", "db_diff", "tbl_2", "VIEW"),
+ Arrays.asList("hive", "db_test1", "tbl_3", "VIEW"),
+ Arrays.asList("hive", "db_test1", "tbl_4", "VIEW"),
+ Arrays.asList("hive", "db_test2", "diff_2", "VIEW"),
+ Arrays.asList("hive", "db_test2", "tbl_2", "VIEW")));
}
@Test
@@ -315,15 +318,15 @@ public class HiveServer2EndpointITCase extends TestLogger {
connection
.getMetaData()
.getTables(
- "default_catalog",
+ "hive",
"db\\_test_",
"tbl%",
new String[] {"VIRTUAL_VIEW"}),
getExpectedGetTablesOperationSchema(),
Arrays.asList(
- Arrays.asList("default_catalog", "db_test1", "tbl_3", "VIEW"),
- Arrays.asList("default_catalog", "db_test1", "tbl_4", "VIEW"),
- Arrays.asList("default_catalog", "db_test2", "tbl_2", "VIEW")));
+ Arrays.asList("hive", "db_test1", "tbl_3", "VIEW"),
+ Arrays.asList("hive", "db_test1", "tbl_4", "VIEW"),
+ Arrays.asList("hive", "db_test2", "tbl_2", "VIEW")));
}
@Test
@@ -356,67 +359,64 @@ public class HiveServer2EndpointITCase extends TestLogger {
.isEqualTo(
Arrays.asList(
Arrays.asList(
- "default_catalog",
- "db_diff",
- "tbl_2",
- "EXPR$0",
+ "hive", "db_diff", "tbl_2", "EXPR$0",
"INT"),
Arrays.asList(
- "default_catalog",
+ "hive",
"db_test1",
"tbl_1",
"user",
"BIGINT"),
Arrays.asList(
- "default_catalog",
+ "hive",
"db_test1",
"tbl_1",
"product",
"STRING"),
Arrays.asList(
- "default_catalog",
+ "hive",
"db_test1",
"tbl_1",
"amount",
"INT"),
Arrays.asList(
- "default_catalog",
+ "hive",
"db_test1",
"tbl_2",
"user",
"STRING"),
Arrays.asList(
- "default_catalog",
+ "hive",
"db_test1",
"tbl_2",
"id",
"BIGINT"),
Arrays.asList(
- "default_catalog",
+ "hive",
"db_test1",
"tbl_2",
"timestamp",
"TIMESTAMP"),
Arrays.asList(
- "default_catalog",
+ "hive",
"db_test1",
"tbl_3",
"EXPR$0",
"INT"),
Arrays.asList(
- "default_catalog",
+ "hive",
"db_test1",
"tbl_4",
"EXPR$0",
"INT"),
Arrays.asList(
- "default_catalog",
+ "hive",
"db_test2",
"diff_2",
"EXPR$0",
"INT"),
Arrays.asList(
- "default_catalog",
+ "hive",
"db_test2",
"tbl_2",
"EXPR$0",
@@ -429,11 +429,11 @@ public class HiveServer2EndpointITCase extends TestLogger {
connection ->
connection
.getMetaData()
- .getColumns("default_catalog", "db\\_test_", "tbl\\_1", "user"),
+ .getColumns("hive", "db\\_test_", "tbl\\_1", "user"),
getExpectedGetColumnsOperationSchema(),
Collections.singletonList(
Arrays.asList(
- "default_catalog",
+ "hive",
"db_test1",
"tbl_1",
"user",
@@ -454,9 +454,9 @@ public class HiveServer2EndpointITCase extends TestLogger {
connection -> connection.getMetaData().getPrimaryKeys(null, null, null),
getExpectedGetPrimaryKeysOperationSchema(),
Arrays.asList(
- Arrays.asList("default_catalog", "db_test1", "tbl_1", "user", 1, "pk"),
- Arrays.asList("default_catalog", "db_test1", "tbl_2", "user", 1, "pk"),
- Arrays.asList("default_catalog", "db_test1", "tbl_2", "id", 2, "pk")));
+ Arrays.asList("hive", "db_test1", "tbl_1", "user", 1, "pk"),
+ Arrays.asList("hive", "db_test1", "tbl_2", "user", 1, "pk"),
+ Arrays.asList("hive", "db_test1", "tbl_2", "id", 2, "pk")));
}
@Test
@@ -465,8 +465,8 @@ public class HiveServer2EndpointITCase extends TestLogger {
connection -> connection.getMetaData().getPrimaryKeys(null, null, "tbl_2"),
getExpectedGetPrimaryKeysOperationSchema(),
Arrays.asList(
- Arrays.asList("default_catalog", "db_test1", "tbl_2", "user", 1, "pk"),
- Arrays.asList("default_catalog", "db_test1", "tbl_2", "id", 2, "pk")));
+ Arrays.asList("hive", "db_test1", "tbl_2", "user", 1, "pk"),
+ Arrays.asList("hive", "db_test1", "tbl_2", "id", 2, "pk")));
}
@Test
@@ -541,14 +541,14 @@ public class HiveServer2EndpointITCase extends TestLogger {
try (Statement statement = connection.createStatement()) {
statement.execute(
String.format(
- "CREATE FUNCTION `default_catalog`.`db_test2`.`my_abs` as '%s'",
+ "CREATE FUNCTION `hive`.`db_test2`.`my_abs` as '%s'",
JavaFunc0.class.getName()));
statement.execute(
String.format(
- "CREATE FUNCTION `default_catalog`.`db_diff`.`your_abs` as '%s'",
+ "CREATE FUNCTION `hive`.`db_diff`.`your_abs` as '%s'",
JavaFunc0.class.getName()));
}
- return connection.getMetaData().getFunctions("default_catalog", "db.*", "my.*");
+ return connection.getMetaData().getFunctions("hive", "db.*", "my.*");
},
ResolvedSchema.of(
Column.physical("FUNCTION_CAT", DataTypes.STRING()),
@@ -559,12 +559,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
Column.physical("SPECIFIC_NAME", DataTypes.STRING())),
Collections.singletonList(
Arrays.asList(
- "default_catalog",
- "db_test2",
- "my_abs",
- "",
- 0,
- JavaFunc0.class.getName())));
+ "hive", "db_test2", "my_abs", "", 0, JavaFunc0.class.getName())));
}
@Test
@@ -579,13 +574,12 @@ public class HiveServer2EndpointITCase extends TestLogger {
// --------------------------------------------------------------------------------------------
- private Connection getInitializedConnection() throws Exception {
- Connection connection = ENDPOINT_EXTENSION.getConnection();
- try (Statement statement = connection.createStatement()) {
+ private static void initializeEnvironment() throws Exception {
+ try (Connection connection = ENDPOINT_EXTENSION.getConnection();
+ Statement statement = connection.createStatement()) {
statement.execute("SET table.sql-dialect=default");
- statement.execute("USE CATALOG `default_catalog`");
- // default_catalog: db_test1 | db_test2 | db_diff | default
+ // hive: db_test1 | db_test2 | db_diff | default
// db_test1: temporary table tbl_1, table tbl_2, temporary view tbl_3, view tbl_4
// db_test2: table tbl_1, table diff_1, view tbl_2, view diff_2
// db_diff: table tbl_1, view tbl_2
@@ -595,7 +589,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
statement.execute("CREATE DATABASE db_diff");
statement.execute(
- "CREATE TEMPORARY TABLE db_test1.tbl_1(\n"
+ "CREATE TABLE db_test1.tbl_1(\n"
+ "`user` BIGINT CONSTRAINT `pk` PRIMARY KEY COMMENT 'user id.',\n"
+ "`product` STRING NOT NULL,\n"
+ "`amount` INT) COMMENT 'temporary table tbl_1'");
@@ -606,7 +600,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
+ "`timestamp` TIMESTAMP,"
+ "CONSTRAINT `pk` PRIMARY KEY(`user`, `id`) NOT ENFORCED) COMMENT 'table tbl_2'");
statement.execute(
- "CREATE TEMPORARY VIEW db_test1.tbl_3 COMMENT 'temporary view tbl_3' AS SELECT 1");
+ "CREATE VIEW db_test1.tbl_3 COMMENT 'temporary view tbl_3' AS SELECT 1");
statement.execute("CREATE VIEW db_test1.tbl_4 COMMENT 'view tbl_4' AS SELECT 1");
statement.execute("CREATE TABLE db_test2.tbl_1 COMMENT 'table tbl_1'");
@@ -617,7 +611,6 @@ public class HiveServer2EndpointITCase extends TestLogger {
statement.execute("CREATE TABLE db_diff.tbl_1 COMMENT 'table tbl_1'");
statement.execute("CREATE VIEW db_diff.tbl_2 COMMENT 'view tbl_2' AS SELECT 1");
}
- return connection;
}
private void runGetObjectTest(
@@ -636,7 +629,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
ResolvedSchema expectedSchema,
Consumer<List<List<Object>>> validator)
throws Exception {
- try (Connection connection = getInitializedConnection();
+ try (Connection connection = ENDPOINT_EXTENSION.getConnection();
java.sql.ResultSet result = resultSetSupplier.apply(connection)) {
assertSchemaEquals(expectedSchema, result.getMetaData());
validator.accept(collectAndCompact(result, expectedSchema.getColumnCount()));
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java
index 34abff6fc07..09127b995ef 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java
@@ -155,6 +155,10 @@ public class HiveServer2EndpointStatementITCase extends AbstractSqlGatewayStatem
for (String sql :
Arrays.asList(
"RESET",
+ "CREATE CATALOG `default_catalog` \n"
+ + "WITH (\n"
+ + "'type' = 'generic_in_memory',\n"
+ + "'default-database' = 'default_database')",
"USE CATALOG `default_catalog`",
"DROP CATALOG hive",
"UNLOAD MODULE hive")) {
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionEnvironment.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionEnvironment.java
index b6372583e39..8be8d265442 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionEnvironment.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionEnvironment.java
@@ -43,7 +43,6 @@ public class SessionEnvironment {
private final Map<String, Catalog> registeredCatalogs;
private final Map<String, Module> registeredModules;
private final @Nullable String defaultCatalog;
- private final @Nullable String defaultDatabase;
private final Map<String, String> sessionConfig;
@VisibleForTesting
@@ -53,14 +52,12 @@ public class SessionEnvironment {
Map<String, Catalog> registeredCatalogs,
Map<String, Module> registeredModules,
@Nullable String defaultCatalog,
- @Nullable String defaultDatabase,
Map<String, String> sessionConfig) {
this.sessionName = sessionName;
this.version = version;
this.registeredCatalogs = registeredCatalogs;
this.registeredModules = registeredModules;
this.defaultCatalog = defaultCatalog;
- this.defaultDatabase = defaultDatabase;
this.sessionConfig = sessionConfig;
}
@@ -92,10 +89,6 @@ public class SessionEnvironment {
return Optional.ofNullable(defaultCatalog);
}
- public Optional<String> getDefaultDatabase() {
- return Optional.ofNullable(defaultDatabase);
- }
-
// -------------------------------------------------------------------------------------------
@Override
@@ -112,7 +105,6 @@ public class SessionEnvironment {
&& Objects.equals(registeredCatalogs, that.registeredCatalogs)
&& Objects.equals(registeredModules, that.registeredModules)
&& Objects.equals(defaultCatalog, that.defaultCatalog)
- && Objects.equals(defaultDatabase, that.defaultDatabase)
&& Objects.equals(sessionConfig, that.sessionConfig);
}
@@ -124,7 +116,6 @@ public class SessionEnvironment {
registeredCatalogs,
registeredModules,
defaultCatalog,
- defaultDatabase,
sessionConfig);
}
@@ -145,7 +136,6 @@ public class SessionEnvironment {
private final Map<String, Catalog> registeredCatalogs = new HashMap<>();
private final Map<String, Module> registeredModules = new HashMap<>();
private @Nullable String defaultCatalog;
- private @Nullable String defaultDatabase;
public Builder setSessionName(String sessionName) {
this.sessionName = sessionName;
@@ -167,11 +157,6 @@ public class SessionEnvironment {
return this;
}
- public Builder setDefaultDatabase(@Nullable String defaultDatabase) {
- this.defaultDatabase = defaultDatabase;
- return this;
- }
-
public Builder registerCatalog(String catalogName, Catalog catalog) {
if (registeredCatalogs.containsKey(catalogName)) {
throw new ValidationException(
@@ -198,7 +183,6 @@ public class SessionEnvironment {
registeredCatalogs,
registeredModules,
defaultCatalog,
- defaultDatabase,
sessionConfig);
}
}
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/session/SessionEnvironmentTest.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/session/SessionEnvironmentTest.java
index ad08a05b86f..5d61ffaa1d7 100644
--- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/session/SessionEnvironmentTest.java
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/session/SessionEnvironmentTest.java
@@ -45,7 +45,6 @@ public class SessionEnvironmentTest {
new HashMap<>(),
new HashMap<>(),
"default",
- "default_db",
configMap);
SessionEnvironment actualEnvironment =
@@ -54,7 +53,6 @@ public class SessionEnvironmentTest {
.setSessionEndpointVersion(MockedEndpointVersion.V1)
.addSessionConfig(configMap)
.setDefaultCatalog("default")
- .setDefaultDatabase("default_db")
.build();
assertEquals(expectedEnvironment, actualEnvironment);
}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
index bee1248914e..35622c9b917 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
@@ -38,11 +38,12 @@ import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.service.operation.OperationExecutor;
import org.apache.flink.table.gateway.service.operation.OperationManager;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
-import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
@@ -159,26 +160,6 @@ public class SessionContext {
// Method to execute commands
// --------------------------------------------------------------------------------------------
- public void registerCatalog(String catalogName, Catalog catalog) {
- sessionState.catalogManager.registerCatalog(catalogName, catalog);
- }
-
- public void registerModuleAtHead(String moduleName, Module module) {
- Deque<String> moduleNames = new ArrayDeque<>(sessionState.moduleManager.listModules());
- moduleNames.addFirst(moduleName);
-
- sessionState.moduleManager.loadModule(moduleName, module);
- sessionState.moduleManager.useModules(moduleNames.toArray(new String[0]));
- }
-
- public void setCurrentCatalog(String catalog) {
- sessionState.catalogManager.setCurrentCatalog(catalog);
- }
-
- public void setCurrentDatabase(String database) {
- sessionState.catalogManager.setCurrentDatabase(database);
- }
-
public OperationExecutor createOperationExecutor(Configuration executionConfig) {
return new OperationExecutor(this, executionConfig);
}
@@ -207,17 +188,16 @@ public class SessionContext {
public static SessionContext create(
DefaultContext defaultContext,
SessionHandle sessionId,
- EndpointVersion endpointVersion,
- Configuration sessionConf,
+ SessionEnvironment environment,
ExecutorService operationExecutorService) {
// --------------------------------------------------------------------------------------------------------------
// Init config
// --------------------------------------------------------------------------------------------------------------
Configuration configuration = defaultContext.getFlinkConfig().clone();
- configuration.addAll(sessionConf);
+ configuration.addAll(Configuration.fromMap(environment.getSessionConfig()));
// every session configure the specific local resource download directory
- setResourceDownloadTmpDir(sessionConf, sessionId);
+ setResourceDownloadTmpDir(configuration, sessionId);
// --------------------------------------------------------------------------------------------------------------
// Init classloader
@@ -233,22 +213,10 @@ public class SessionContext {
final ResourceManager resourceManager = new ResourceManager(configuration, userClassLoader);
- final ModuleManager moduleManager = new ModuleManager();
-
- final EnvironmentSettings settings =
- EnvironmentSettings.newInstance().withConfiguration(configuration).build();
+ final ModuleManager moduleManager = buildModuleManager(environment);
- CatalogManager catalogManager =
- CatalogManager.newBuilder()
- // Currently, the classloader is only used by DataTypeFactory.
- .classLoader(userClassLoader)
- .config(configuration)
- .defaultCatalog(
- settings.getBuiltInCatalogName(),
- new GenericInMemoryCatalog(
- settings.getBuiltInCatalogName(),
- settings.getBuiltInDatabaseName()))
- .build();
+ final CatalogManager catalogManager =
+ buildCatalogManager(configuration, userClassLoader, environment);
final FunctionCatalog functionCatalog =
new FunctionCatalog(configuration, resourceManager, catalogManager, moduleManager);
@@ -258,7 +226,7 @@ public class SessionContext {
return new SessionContext(
defaultContext,
sessionId,
- endpointVersion,
+ environment.getSessionEndpointVersion(),
configuration,
userClassLoader,
sessionState,
@@ -363,6 +331,74 @@ public class SessionContext {
TableConfigOptions.RESOURCES_DOWNLOAD_DIR, path.toAbsolutePath().toString());
}
+ private static ModuleManager buildModuleManager(SessionEnvironment environment) {
+ final ModuleManager moduleManager = new ModuleManager();
+
+ environment
+ .getRegisteredModules()
+ .forEach(
+ (moduleName, module) -> {
+ Deque<String> moduleNames =
+ new ArrayDeque<>(moduleManager.listModules());
+ moduleNames.addFirst(moduleName);
+
+ moduleManager.loadModule(moduleName, module);
+ moduleManager.useModules(moduleNames.toArray(new String[0]));
+ });
+
+ return moduleManager;
+ }
+
+ private static CatalogManager buildCatalogManager(
+ Configuration configuration,
+ URLClassLoader userClassLoader,
+ SessionEnvironment environment) {
+ CatalogManager.Builder builder =
+ CatalogManager.newBuilder()
+ // Currently, the classloader is only used by DataTypeFactory.
+ .classLoader(userClassLoader)
+ .config(configuration);
+
+ // init default catalog
+ String defaultCatalogName;
+ Catalog defaultCatalog;
+ if (environment.getDefaultCatalog().isPresent()) {
+ defaultCatalogName = environment.getDefaultCatalog().get();
+ defaultCatalog = environment.getRegisteredCatalogs().get(defaultCatalogName);
+ } else {
+ EnvironmentSettings settings =
+ EnvironmentSettings.newInstance().withConfiguration(configuration).build();
+ defaultCatalogName = settings.getBuiltInCatalogName();
+
+ if (environment.getRegisteredCatalogs().containsKey(defaultCatalogName)) {
+ throw new SqlGatewayException(
+ String.format(
+ "The name of the registered catalog conflicts with the built-in default catalog name: %s.",
+ defaultCatalogName));
+ }
+
+ defaultCatalog =
+ new GenericInMemoryCatalog(
+ defaultCatalogName, settings.getBuiltInDatabaseName());
+ }
+ defaultCatalog.open();
+
+ CatalogManager catalogManager =
+ builder.defaultCatalog(defaultCatalogName, defaultCatalog).build();
+
+ // filter the default catalog out to avoid repeated registration
+ environment
+ .getRegisteredCatalogs()
+ .forEach(
+ (catalogName, catalog) -> {
+ if (!catalogName.equals(defaultCatalogName)) {
+ catalogManager.registerCatalog(catalogName, catalog);
+ }
+ });
+
+ return catalogManager;
+ }
+
// --------------------------------------------------------------------------------------------
// Inner class
// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java
index 1b3187b90c4..575ac64b6e3 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.gateway.service.session;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
@@ -149,16 +148,7 @@ public class SessionManager {
SessionContext sessionContext =
SessionContext.create(
- defaultContext,
- sessionId,
- environment.getSessionEndpointVersion(),
- Configuration.fromMap(environment.getSessionConfig()),
- operationExecutorService);
-
- environment.getRegisteredCatalogs().forEach(sessionContext::registerCatalog);
- environment.getRegisteredModules().forEach(sessionContext::registerModuleAtHead);
- environment.getDefaultCatalog().ifPresent(sessionContext::setCurrentCatalog);
- environment.getDefaultDatabase().ifPresent(sessionContext::setCurrentDatabase);
+ defaultContext, sessionId, environment, operationExecutorService);
session = new Session(sessionContext);
sessions.put(sessionId, session);
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 685bd04cb30..c8c8513bdb6 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -138,16 +137,14 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
String catalogName = "default";
String databaseName = "testDb";
String moduleName = "testModule";
- GenericInMemoryCatalog defaultCatalog = new GenericInMemoryCatalog(catalogName);
- defaultCatalog.createDatabase(
- databaseName, new CatalogDatabaseImpl(Collections.emptyMap(), null), true);
+ GenericInMemoryCatalog defaultCatalog =
+ new GenericInMemoryCatalog(catalogName, databaseName);
SessionEnvironment environment =
SessionEnvironment.newBuilder()
.setSessionEndpointVersion(MockedEndpointVersion.V1)
.registerCatalog(catalogName, defaultCatalog)
.registerModuleAtHead(moduleName, new TestModule())
.setDefaultCatalog(catalogName)
- .setDefaultDatabase(databaseName)
.build();
SessionHandle sessionHandle = service.openSession(environment);
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
index f7b07ef35d1..855411d81b0 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.gateway.service.context;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
import org.apache.flink.table.gateway.api.utils.ThreadUtils;
@@ -134,12 +135,13 @@ class SessionContextTest {
flinkConfig.set(MAX_PARALLELISM, 16);
DefaultContext defaultContext =
new DefaultContext(flinkConfig, Collections.singletonList(new DefaultCLI()));
+ SessionEnvironment environment =
+ SessionEnvironment.newBuilder()
+ .setSessionEndpointVersion(MockedEndpointVersion.V1)
+ .addSessionConfig(flinkConfig.toMap())
+ .build();
return SessionContext.create(
- defaultContext,
- SessionHandle.create(),
- MockedEndpointVersion.V1,
- flinkConfig,
- EXECUTOR_SERVICE);
+ defaultContext, SessionHandle.create(), environment, EXECUTOR_SERVICE);
}
private ReadableConfig getConfiguration() {