You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/09/08 06:27:47 UTC

[flink] 01/02: [FLINK-29118][sql-gateway][hive] Remove default GenericInMemoryCatalog in the HiveServer2 Endpoint when openSession

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 833e7ffbb5f075d2014dfaec547b6987d59bc89f
Author: yuzelin <74...@qq.com>
AuthorDate: Wed Aug 31 13:42:03 2022 +0800

    [FLINK-29118][sql-gateway][hive] Remove default GenericInMemoryCatalog in the HiveServer2 Endpoint when openSession
    
     This closes #20714
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   |   4 +-
 .../endpoint/hive/HiveServer2EndpointITCase.java   | 119 ++++++++++-----------
 .../hive/HiveServer2EndpointStatementITCase.java   |   4 +
 .../gateway/api/session/SessionEnvironment.java    |  16 ---
 .../api/session/SessionEnvironmentTest.java        |   2 -
 .../gateway/service/context/SessionContext.java    | 118 +++++++++++++-------
 .../gateway/service/session/SessionManager.java    |  12 +--
 .../gateway/service/SqlGatewayServiceITCase.java   |   7 +-
 .../service/context/SessionContextTest.java        |  12 ++-
 9 files changed, 148 insertions(+), 146 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 55bd8b21987..12ffa946b4f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -311,7 +311,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
             Catalog hiveCatalog =
                     new HiveCatalog(
                             catalogName,
-                            defaultDatabase,
+                            getUsedDefaultDatabase(originSessionConf).orElse(defaultDatabase),
                             conf,
                             HiveShimLoader.getHiveVersion(),
                             allowEmbedded);
@@ -323,8 +323,6 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
                                     .registerCatalog(catalogName, hiveCatalog)
                                     .registerModuleAtHead(moduleName, hiveModule)
                                     .setDefaultCatalog(catalogName)
-                                    .setDefaultDatabase(
-                                            getUsedDefaultDatabase(originSessionConf).orElse(null))
                                     .addSessionConfig(sessionConfig)
                                     .build());
             // response
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 2bcaa020fcd..19e3074ed65 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -61,6 +61,7 @@ import org.apache.hive.service.rpc.thrift.TOperationType;
 import org.apache.hive.service.rpc.thrift.TStatusCode;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.transport.TTransport;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -108,6 +109,11 @@ public class HiveServer2EndpointITCase extends TestLogger {
     public static final HiveServer2EndpointExtension ENDPOINT_EXTENSION =
             new HiveServer2EndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
 
+    @BeforeAll
+    public static void setup() throws Exception {
+        initializeEnvironment();
+    }
+
     @Test
     public void testOpenCloseJdbcConnection() throws Exception {
         SessionManager sessionManager = SQL_GATEWAY_SERVICE_EXTENSION.getSessionManager();
@@ -256,21 +262,19 @@ public class HiveServer2EndpointITCase extends TestLogger {
         runGetObjectTest(
                 connection -> connection.getMetaData().getCatalogs(),
                 ResolvedSchema.of(Column.physical("TABLE_CAT", DataTypes.STRING())),
-                Arrays.asList(
-                        Collections.singletonList("default_catalog"),
-                        Collections.singletonList("hive")));
+                Collections.singletonList(Collections.singletonList("hive")));
     }
 
     @Test
     public void testGetSchemas() throws Exception {
         runGetObjectTest(
-                connection -> connection.getMetaData().getSchemas("default_catalog", null),
+                connection -> connection.getMetaData().getSchemas("hive", null),
                 getExpectedGetSchemasOperationSchema(),
                 Arrays.asList(
-                        Arrays.asList("db_diff", "default_catalog"),
-                        Arrays.asList("db_test1", "default_catalog"),
-                        Arrays.asList("db_test2", "default_catalog"),
-                        Arrays.asList("default_database", "default_catalog")));
+                        Arrays.asList("db_diff", "hive"),
+                        Arrays.asList("db_test1", "hive"),
+                        Arrays.asList("db_test2", "hive"),
+                        Arrays.asList("default", "hive")));
     }
 
     @Test
@@ -279,8 +283,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
                 connection -> connection.getMetaData().getSchemas(null, "db\\_test%"),
                 getExpectedGetSchemasOperationSchema(),
                 Arrays.asList(
-                        Arrays.asList("db_test1", "default_catalog"),
-                        Arrays.asList("db_test2", "default_catalog")));
+                        Arrays.asList("db_test1", "hive"), Arrays.asList("db_test2", "hive")));
     }
 
     @Test
@@ -296,16 +299,16 @@ public class HiveServer2EndpointITCase extends TestLogger {
                                         new String[] {"MANAGED_TABLE", "VIRTUAL_VIEW"}),
                 getExpectedGetTablesOperationSchema(),
                 Arrays.asList(
-                        Arrays.asList("default_catalog", "db_diff", "tbl_1", "TABLE"),
-                        Arrays.asList("default_catalog", "db_test1", "tbl_1", "TABLE"),
-                        Arrays.asList("default_catalog", "db_test1", "tbl_2", "TABLE"),
-                        Arrays.asList("default_catalog", "db_test2", "diff_1", "TABLE"),
-                        Arrays.asList("default_catalog", "db_test2", "tbl_1", "TABLE"),
-                        Arrays.asList("default_catalog", "db_diff", "tbl_2", "VIEW"),
-                        Arrays.asList("default_catalog", "db_test1", "tbl_3", "VIEW"),
-                        Arrays.asList("default_catalog", "db_test1", "tbl_4", "VIEW"),
-                        Arrays.asList("default_catalog", "db_test2", "diff_2", "VIEW"),
-                        Arrays.asList("default_catalog", "db_test2", "tbl_2", "VIEW")));
+                        Arrays.asList("hive", "db_diff", "tbl_1", "TABLE"),
+                        Arrays.asList("hive", "db_test1", "tbl_1", "TABLE"),
+                        Arrays.asList("hive", "db_test1", "tbl_2", "TABLE"),
+                        Arrays.asList("hive", "db_test2", "diff_1", "TABLE"),
+                        Arrays.asList("hive", "db_test2", "tbl_1", "TABLE"),
+                        Arrays.asList("hive", "db_diff", "tbl_2", "VIEW"),
+                        Arrays.asList("hive", "db_test1", "tbl_3", "VIEW"),
+                        Arrays.asList("hive", "db_test1", "tbl_4", "VIEW"),
+                        Arrays.asList("hive", "db_test2", "diff_2", "VIEW"),
+                        Arrays.asList("hive", "db_test2", "tbl_2", "VIEW")));
     }
 
     @Test
@@ -315,15 +318,15 @@ public class HiveServer2EndpointITCase extends TestLogger {
                         connection
                                 .getMetaData()
                                 .getTables(
-                                        "default_catalog",
+                                        "hive",
                                         "db\\_test_",
                                         "tbl%",
                                         new String[] {"VIRTUAL_VIEW"}),
                 getExpectedGetTablesOperationSchema(),
                 Arrays.asList(
-                        Arrays.asList("default_catalog", "db_test1", "tbl_3", "VIEW"),
-                        Arrays.asList("default_catalog", "db_test1", "tbl_4", "VIEW"),
-                        Arrays.asList("default_catalog", "db_test2", "tbl_2", "VIEW")));
+                        Arrays.asList("hive", "db_test1", "tbl_3", "VIEW"),
+                        Arrays.asList("hive", "db_test1", "tbl_4", "VIEW"),
+                        Arrays.asList("hive", "db_test2", "tbl_2", "VIEW")));
     }
 
     @Test
@@ -356,67 +359,64 @@ public class HiveServer2EndpointITCase extends TestLogger {
                                 .isEqualTo(
                                         Arrays.asList(
                                                 Arrays.asList(
-                                                        "default_catalog",
-                                                        "db_diff",
-                                                        "tbl_2",
-                                                        "EXPR$0",
+                                                        "hive", "db_diff", "tbl_2", "EXPR$0",
                                                         "INT"),
                                                 Arrays.asList(
-                                                        "default_catalog",
+                                                        "hive",
                                                         "db_test1",
                                                         "tbl_1",
                                                         "user",
                                                         "BIGINT"),
                                                 Arrays.asList(
-                                                        "default_catalog",
+                                                        "hive",
                                                         "db_test1",
                                                         "tbl_1",
                                                         "product",
                                                         "STRING"),
                                                 Arrays.asList(
-                                                        "default_catalog",
+                                                        "hive",
                                                         "db_test1",
                                                         "tbl_1",
                                                         "amount",
                                                         "INT"),
                                                 Arrays.asList(
-                                                        "default_catalog",
+                                                        "hive",
                                                         "db_test1",
                                                         "tbl_2",
                                                         "user",
                                                         "STRING"),
                                                 Arrays.asList(
-                                                        "default_catalog",
+                                                        "hive",
                                                         "db_test1",
                                                         "tbl_2",
                                                         "id",
                                                         "BIGINT"),
                                                 Arrays.asList(
-                                                        "default_catalog",
+                                                        "hive",
                                                         "db_test1",
                                                         "tbl_2",
                                                         "timestamp",
                                                         "TIMESTAMP"),
                                                 Arrays.asList(
-                                                        "default_catalog",
+                                                        "hive",
                                                         "db_test1",
                                                         "tbl_3",
                                                         "EXPR$0",
                                                         "INT"),
                                                 Arrays.asList(
-                                                        "default_catalog",
+                                                        "hive",
                                                         "db_test1",
                                                         "tbl_4",
                                                         "EXPR$0",
                                                         "INT"),
                                                 Arrays.asList(
-                                                        "default_catalog",
+                                                        "hive",
                                                         "db_test2",
                                                         "diff_2",
                                                         "EXPR$0",
                                                         "INT"),
                                                 Arrays.asList(
-                                                        "default_catalog",
+                                                        "hive",
                                                         "db_test2",
                                                         "tbl_2",
                                                         "EXPR$0",
@@ -429,11 +429,11 @@ public class HiveServer2EndpointITCase extends TestLogger {
                 connection ->
                         connection
                                 .getMetaData()
-                                .getColumns("default_catalog", "db\\_test_", "tbl\\_1", "user"),
+                                .getColumns("hive", "db\\_test_", "tbl\\_1", "user"),
                 getExpectedGetColumnsOperationSchema(),
                 Collections.singletonList(
                         Arrays.asList(
-                                "default_catalog",
+                                "hive",
                                 "db_test1",
                                 "tbl_1",
                                 "user",
@@ -454,9 +454,9 @@ public class HiveServer2EndpointITCase extends TestLogger {
                 connection -> connection.getMetaData().getPrimaryKeys(null, null, null),
                 getExpectedGetPrimaryKeysOperationSchema(),
                 Arrays.asList(
-                        Arrays.asList("default_catalog", "db_test1", "tbl_1", "user", 1, "pk"),
-                        Arrays.asList("default_catalog", "db_test1", "tbl_2", "user", 1, "pk"),
-                        Arrays.asList("default_catalog", "db_test1", "tbl_2", "id", 2, "pk")));
+                        Arrays.asList("hive", "db_test1", "tbl_1", "user", 1, "pk"),
+                        Arrays.asList("hive", "db_test1", "tbl_2", "user", 1, "pk"),
+                        Arrays.asList("hive", "db_test1", "tbl_2", "id", 2, "pk")));
     }
 
     @Test
@@ -465,8 +465,8 @@ public class HiveServer2EndpointITCase extends TestLogger {
                 connection -> connection.getMetaData().getPrimaryKeys(null, null, "tbl_2"),
                 getExpectedGetPrimaryKeysOperationSchema(),
                 Arrays.asList(
-                        Arrays.asList("default_catalog", "db_test1", "tbl_2", "user", 1, "pk"),
-                        Arrays.asList("default_catalog", "db_test1", "tbl_2", "id", 2, "pk")));
+                        Arrays.asList("hive", "db_test1", "tbl_2", "user", 1, "pk"),
+                        Arrays.asList("hive", "db_test1", "tbl_2", "id", 2, "pk")));
     }
 
     @Test
@@ -541,14 +541,14 @@ public class HiveServer2EndpointITCase extends TestLogger {
                     try (Statement statement = connection.createStatement()) {
                         statement.execute(
                                 String.format(
-                                        "CREATE FUNCTION `default_catalog`.`db_test2`.`my_abs` as '%s'",
+                                        "CREATE FUNCTION `hive`.`db_test2`.`my_abs` as '%s'",
                                         JavaFunc0.class.getName()));
                         statement.execute(
                                 String.format(
-                                        "CREATE FUNCTION `default_catalog`.`db_diff`.`your_abs` as '%s'",
+                                        "CREATE FUNCTION `hive`.`db_diff`.`your_abs` as '%s'",
                                         JavaFunc0.class.getName()));
                     }
-                    return connection.getMetaData().getFunctions("default_catalog", "db.*", "my.*");
+                    return connection.getMetaData().getFunctions("hive", "db.*", "my.*");
                 },
                 ResolvedSchema.of(
                         Column.physical("FUNCTION_CAT", DataTypes.STRING()),
@@ -559,12 +559,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
                         Column.physical("SPECIFIC_NAME", DataTypes.STRING())),
                 Collections.singletonList(
                         Arrays.asList(
-                                "default_catalog",
-                                "db_test2",
-                                "my_abs",
-                                "",
-                                0,
-                                JavaFunc0.class.getName())));
+                                "hive", "db_test2", "my_abs", "", 0, JavaFunc0.class.getName())));
     }
 
     @Test
@@ -579,13 +574,12 @@ public class HiveServer2EndpointITCase extends TestLogger {
 
     // --------------------------------------------------------------------------------------------
 
-    private Connection getInitializedConnection() throws Exception {
-        Connection connection = ENDPOINT_EXTENSION.getConnection();
-        try (Statement statement = connection.createStatement()) {
+    private static void initializeEnvironment() throws Exception {
+        try (Connection connection = ENDPOINT_EXTENSION.getConnection();
+                Statement statement = connection.createStatement()) {
             statement.execute("SET table.sql-dialect=default");
-            statement.execute("USE CATALOG `default_catalog`");
 
-            // default_catalog: db_test1 | db_test2 | db_diff | default
+            // hive: db_test1 | db_test2 | db_diff | default
             //     db_test1: table tbl_1, table tbl_2, view tbl_3, view tbl_4
             //     db_test2: table tbl_1, table diff_1, view tbl_2, view diff_2
             //     db_diff:  table tbl_1, view tbl_2
@@ -595,7 +589,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
             statement.execute("CREATE DATABASE db_diff");
 
             statement.execute(
-                    "CREATE TEMPORARY TABLE db_test1.tbl_1(\n"
+                    "CREATE TABLE db_test1.tbl_1(\n"
                             + "`user` BIGINT CONSTRAINT `pk` PRIMARY KEY COMMENT 'user id.',\n"
                             + "`product` STRING NOT NULL,\n"
                             + "`amount`  INT) COMMENT 'temporary table tbl_1'");
@@ -606,7 +600,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
                             + "`timestamp` TIMESTAMP,"
                             + "CONSTRAINT `pk` PRIMARY KEY(`user`, `id`) NOT ENFORCED) COMMENT 'table tbl_2'");
             statement.execute(
-                    "CREATE TEMPORARY VIEW db_test1.tbl_3 COMMENT 'temporary view tbl_3' AS SELECT 1");
+                    "CREATE VIEW db_test1.tbl_3 COMMENT 'temporary view tbl_3' AS SELECT 1");
             statement.execute("CREATE VIEW db_test1.tbl_4 COMMENT 'view tbl_4' AS SELECT 1");
 
             statement.execute("CREATE TABLE db_test2.tbl_1 COMMENT 'table tbl_1'");
@@ -617,7 +611,6 @@ public class HiveServer2EndpointITCase extends TestLogger {
             statement.execute("CREATE TABLE db_diff.tbl_1 COMMENT 'table tbl_1'");
             statement.execute("CREATE VIEW db_diff.tbl_2 COMMENT 'view tbl_2' AS SELECT 1");
         }
-        return connection;
     }
 
     private void runGetObjectTest(
@@ -636,7 +629,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
             ResolvedSchema expectedSchema,
             Consumer<List<List<Object>>> validator)
             throws Exception {
-        try (Connection connection = getInitializedConnection();
+        try (Connection connection = ENDPOINT_EXTENSION.getConnection();
                 java.sql.ResultSet result = resultSetSupplier.apply(connection)) {
             assertSchemaEquals(expectedSchema, result.getMetaData());
             validator.accept(collectAndCompact(result, expectedSchema.getColumnCount()));
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java
index 34abff6fc07..09127b995ef 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java
@@ -155,6 +155,10 @@ public class HiveServer2EndpointStatementITCase extends AbstractSqlGatewayStatem
         for (String sql :
                 Arrays.asList(
                         "RESET",
+                        "CREATE CATALOG `default_catalog` \n"
+                                + "WITH (\n"
+                                + "'type' = 'generic_in_memory',\n"
+                                + "'default-database' = 'default_database')",
                         "USE CATALOG `default_catalog`",
                         "DROP CATALOG hive",
                         "UNLOAD MODULE hive")) {
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionEnvironment.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionEnvironment.java
index b6372583e39..8be8d265442 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionEnvironment.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionEnvironment.java
@@ -43,7 +43,6 @@ public class SessionEnvironment {
     private final Map<String, Catalog> registeredCatalogs;
     private final Map<String, Module> registeredModules;
     private final @Nullable String defaultCatalog;
-    private final @Nullable String defaultDatabase;
     private final Map<String, String> sessionConfig;
 
     @VisibleForTesting
@@ -53,14 +52,12 @@ public class SessionEnvironment {
             Map<String, Catalog> registeredCatalogs,
             Map<String, Module> registeredModules,
             @Nullable String defaultCatalog,
-            @Nullable String defaultDatabase,
             Map<String, String> sessionConfig) {
         this.sessionName = sessionName;
         this.version = version;
         this.registeredCatalogs = registeredCatalogs;
         this.registeredModules = registeredModules;
         this.defaultCatalog = defaultCatalog;
-        this.defaultDatabase = defaultDatabase;
         this.sessionConfig = sessionConfig;
     }
 
@@ -92,10 +89,6 @@ public class SessionEnvironment {
         return Optional.ofNullable(defaultCatalog);
     }
 
-    public Optional<String> getDefaultDatabase() {
-        return Optional.ofNullable(defaultDatabase);
-    }
-
     // -------------------------------------------------------------------------------------------
 
     @Override
@@ -112,7 +105,6 @@ public class SessionEnvironment {
                 && Objects.equals(registeredCatalogs, that.registeredCatalogs)
                 && Objects.equals(registeredModules, that.registeredModules)
                 && Objects.equals(defaultCatalog, that.defaultCatalog)
-                && Objects.equals(defaultDatabase, that.defaultDatabase)
                 && Objects.equals(sessionConfig, that.sessionConfig);
     }
 
@@ -124,7 +116,6 @@ public class SessionEnvironment {
                 registeredCatalogs,
                 registeredModules,
                 defaultCatalog,
-                defaultDatabase,
                 sessionConfig);
     }
 
@@ -145,7 +136,6 @@ public class SessionEnvironment {
         private final Map<String, Catalog> registeredCatalogs = new HashMap<>();
         private final Map<String, Module> registeredModules = new HashMap<>();
         private @Nullable String defaultCatalog;
-        private @Nullable String defaultDatabase;
 
         public Builder setSessionName(String sessionName) {
             this.sessionName = sessionName;
@@ -167,11 +157,6 @@ public class SessionEnvironment {
             return this;
         }
 
-        public Builder setDefaultDatabase(@Nullable String defaultDatabase) {
-            this.defaultDatabase = defaultDatabase;
-            return this;
-        }
-
         public Builder registerCatalog(String catalogName, Catalog catalog) {
             if (registeredCatalogs.containsKey(catalogName)) {
                 throw new ValidationException(
@@ -198,7 +183,6 @@ public class SessionEnvironment {
                     registeredCatalogs,
                     registeredModules,
                     defaultCatalog,
-                    defaultDatabase,
                     sessionConfig);
         }
     }
diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/session/SessionEnvironmentTest.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/session/SessionEnvironmentTest.java
index ad08a05b86f..5d61ffaa1d7 100644
--- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/session/SessionEnvironmentTest.java
+++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/session/SessionEnvironmentTest.java
@@ -45,7 +45,6 @@ public class SessionEnvironmentTest {
                         new HashMap<>(),
                         new HashMap<>(),
                         "default",
-                        "default_db",
                         configMap);
 
         SessionEnvironment actualEnvironment =
@@ -54,7 +53,6 @@ public class SessionEnvironmentTest {
                         .setSessionEndpointVersion(MockedEndpointVersion.V1)
                         .addSessionConfig(configMap)
                         .setDefaultCatalog("default")
-                        .setDefaultDatabase("default_db")
                         .build();
         assertEquals(expectedEnvironment, actualEnvironment);
     }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
index bee1248914e..35622c9b917 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
@@ -38,11 +38,12 @@ import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.PlannerFactoryUtil;
 import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 import org.apache.flink.table.gateway.service.operation.OperationExecutor;
 import org.apache.flink.table.gateway.service.operation.OperationManager;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
-import org.apache.flink.table.module.Module;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.resource.ResourceManager;
 import org.apache.flink.util.FlinkUserCodeClassLoaders;
@@ -159,26 +160,6 @@ public class SessionContext {
     // Method to execute commands
     // --------------------------------------------------------------------------------------------
 
-    public void registerCatalog(String catalogName, Catalog catalog) {
-        sessionState.catalogManager.registerCatalog(catalogName, catalog);
-    }
-
-    public void registerModuleAtHead(String moduleName, Module module) {
-        Deque<String> moduleNames = new ArrayDeque<>(sessionState.moduleManager.listModules());
-        moduleNames.addFirst(moduleName);
-
-        sessionState.moduleManager.loadModule(moduleName, module);
-        sessionState.moduleManager.useModules(moduleNames.toArray(new String[0]));
-    }
-
-    public void setCurrentCatalog(String catalog) {
-        sessionState.catalogManager.setCurrentCatalog(catalog);
-    }
-
-    public void setCurrentDatabase(String database) {
-        sessionState.catalogManager.setCurrentDatabase(database);
-    }
-
     public OperationExecutor createOperationExecutor(Configuration executionConfig) {
         return new OperationExecutor(this, executionConfig);
     }
@@ -207,17 +188,16 @@ public class SessionContext {
     public static SessionContext create(
             DefaultContext defaultContext,
             SessionHandle sessionId,
-            EndpointVersion endpointVersion,
-            Configuration sessionConf,
+            SessionEnvironment environment,
             ExecutorService operationExecutorService) {
         // --------------------------------------------------------------------------------------------------------------
         // Init config
         // --------------------------------------------------------------------------------------------------------------
 
         Configuration configuration = defaultContext.getFlinkConfig().clone();
-        configuration.addAll(sessionConf);
+        configuration.addAll(Configuration.fromMap(environment.getSessionConfig()));
         // every session configure the specific local resource download directory
-        setResourceDownloadTmpDir(sessionConf, sessionId);
+        setResourceDownloadTmpDir(configuration, sessionId);
 
         // --------------------------------------------------------------------------------------------------------------
         // Init classloader
@@ -233,22 +213,10 @@ public class SessionContext {
 
         final ResourceManager resourceManager = new ResourceManager(configuration, userClassLoader);
 
-        final ModuleManager moduleManager = new ModuleManager();
-
-        final EnvironmentSettings settings =
-                EnvironmentSettings.newInstance().withConfiguration(configuration).build();
+        final ModuleManager moduleManager = buildModuleManager(environment);
 
-        CatalogManager catalogManager =
-                CatalogManager.newBuilder()
-                        // Currently, the classloader is only used by DataTypeFactory.
-                        .classLoader(userClassLoader)
-                        .config(configuration)
-                        .defaultCatalog(
-                                settings.getBuiltInCatalogName(),
-                                new GenericInMemoryCatalog(
-                                        settings.getBuiltInCatalogName(),
-                                        settings.getBuiltInDatabaseName()))
-                        .build();
+        final CatalogManager catalogManager =
+                buildCatalogManager(configuration, userClassLoader, environment);
 
         final FunctionCatalog functionCatalog =
                 new FunctionCatalog(configuration, resourceManager, catalogManager, moduleManager);
@@ -258,7 +226,7 @@ public class SessionContext {
         return new SessionContext(
                 defaultContext,
                 sessionId,
-                endpointVersion,
+                environment.getSessionEndpointVersion(),
                 configuration,
                 userClassLoader,
                 sessionState,
@@ -363,6 +331,74 @@ public class SessionContext {
                 TableConfigOptions.RESOURCES_DOWNLOAD_DIR, path.toAbsolutePath().toString());
     }
 
+    /** Creates a ModuleManager, loading each registered module at the head of the used modules. */
+    private static ModuleManager buildModuleManager(SessionEnvironment environment) {
+        final ModuleManager manager = new ModuleManager();
+        environment
+                .getRegisteredModules()
+                .forEach(
+                        (name, module) -> {
+                            // snapshot the module list before loading the new module
+                            final Deque<String> ordered =
+                                    new ArrayDeque<>(manager.listModules());
+                            manager.loadModule(name, module);
+                            // the new module goes in front of the ones listed before
+                            ordered.addFirst(name);
+                            manager.useModules(ordered.toArray(new String[0]));
+                        });
+        return manager;
+    }
+
+    /**
+     * Builds the session's CatalogManager: opens the default catalog (the one named by the
+     * environment, else a new GenericInMemoryCatalog), then registers the other catalogs.
+     * Throws SqlGatewayException on an unregistered default or a built-in name clash.
+     */
+    private static CatalogManager buildCatalogManager(
+            Configuration configuration,
+            URLClassLoader userClassLoader,
+            SessionEnvironment environment) {
+        String defaultCatalogName;
+        Catalog defaultCatalog;
+        if (environment.getDefaultCatalog().isPresent()) {
+            defaultCatalogName = environment.getDefaultCatalog().get();
+            defaultCatalog = environment.getRegisteredCatalogs().get(defaultCatalogName);
+            if (defaultCatalog == null) {
+                throw new SqlGatewayException(
+                        "The default catalog is not registered: " + defaultCatalogName);
+            }
+        } else {
+            EnvironmentSettings settings =
+                    EnvironmentSettings.newInstance().withConfiguration(configuration).build();
+            defaultCatalogName = settings.getBuiltInCatalogName();
+            if (environment.getRegisteredCatalogs().containsKey(defaultCatalogName)) {
+                throw new SqlGatewayException(
+                        String.format(
+                                "The name of the registered catalog is conflicts with the built-in default catalog name: %s.",
+                                defaultCatalogName));
+            }
+            defaultCatalog =
+                    new GenericInMemoryCatalog(
+                            defaultCatalogName, settings.getBuiltInDatabaseName());
+        }
+        defaultCatalog.open();
+        CatalogManager catalogManager =
+                CatalogManager.newBuilder()
+                        // Currently, the classloader is only used by DataTypeFactory.
+                        .classLoader(userClassLoader)
+                        .config(configuration)
+                        .defaultCatalog(defaultCatalogName, defaultCatalog)
+                        .build();
+
+        // filter the default catalog out to avoid repeated registration
+        for (String name : environment.getRegisteredCatalogs().keySet()) {
+            if (!name.equals(defaultCatalogName)) {
+                catalogManager.registerCatalog(name, environment.getRegisteredCatalogs().get(name));
+            }
+        }
+        return catalogManager;
+    }
+
     // --------------------------------------------------------------------------------------------
     // Inner class
     // --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java
index 1b3187b90c4..575ac64b6e3 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.gateway.service.session;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.gateway.api.session.SessionEnvironment;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
@@ -149,16 +148,7 @@ public class SessionManager {
 
         SessionContext sessionContext =
                 SessionContext.create(
-                        defaultContext,
-                        sessionId,
-                        environment.getSessionEndpointVersion(),
-                        Configuration.fromMap(environment.getSessionConfig()),
-                        operationExecutorService);
-
-        environment.getRegisteredCatalogs().forEach(sessionContext::registerCatalog);
-        environment.getRegisteredModules().forEach(sessionContext::registerModuleAtHead);
-        environment.getDefaultCatalog().ifPresent(sessionContext::setCurrentCatalog);
-        environment.getDefaultDatabase().ifPresent(sessionContext::setCurrentDatabase);
+                        defaultContext, sessionId, environment, operationExecutorService);
 
         session = new Session(sessionContext);
         sessions.put(sessionId, session);
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 685bd04cb30..c8c8513bdb6 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -138,16 +137,14 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
         String catalogName = "default";
         String databaseName = "testDb";
         String moduleName = "testModule";
-        GenericInMemoryCatalog defaultCatalog = new GenericInMemoryCatalog(catalogName);
-        defaultCatalog.createDatabase(
-                databaseName, new CatalogDatabaseImpl(Collections.emptyMap(), null), true);
+        GenericInMemoryCatalog defaultCatalog =
+                new GenericInMemoryCatalog(catalogName, databaseName);
         SessionEnvironment environment =
                 SessionEnvironment.newBuilder()
                         .setSessionEndpointVersion(MockedEndpointVersion.V1)
                         .registerCatalog(catalogName, defaultCatalog)
                         .registerModuleAtHead(moduleName, new TestModule())
                         .setDefaultCatalog(catalogName)
-                        .setDefaultDatabase(databaseName)
                         .build();
 
         SessionHandle sessionHandle = service.openSession(environment);
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
index f7b07ef35d1..855411d81b0 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.gateway.service.context;
 import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
 import org.apache.flink.table.gateway.api.utils.ThreadUtils;
@@ -134,12 +135,13 @@ class SessionContextTest {
         flinkConfig.set(MAX_PARALLELISM, 16);
         DefaultContext defaultContext =
                 new DefaultContext(flinkConfig, Collections.singletonList(new DefaultCLI()));
+        SessionEnvironment environment =
+                SessionEnvironment.newBuilder()
+                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                        .addSessionConfig(flinkConfig.toMap())
+                        .build();
         return SessionContext.create(
-                defaultContext,
-                SessionHandle.create(),
-                MockedEndpointVersion.V1,
-                flinkConfig,
-                EXECUTOR_SERVICE);
+                defaultContext, SessionHandle.create(), environment, EXECUTOR_SERVICE);
     }
 
     private ReadableConfig getConfiguration() {