Posted to commits@flink.apache.org by sh...@apache.org on 2022/07/29 01:59:10 UTC

[flink] 01/04: [FLINK-28152][sql-gateway][hive] Introduce option "thrift.host" for HiveServer2 Endpoint and improve the code

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 27cecdebccc95e9acf6ba38616b14e531b35d1be
Author: Shengkai <10...@qq.com>
AuthorDate: Tue Jul 26 10:15:46 2022 +0800

    [FLINK-28152][sql-gateway][hive] Introduce option "thrift.host" for HiveServer2 Endpoint and improve the code
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   | 35 +++++---
 .../hive/HiveServer2EndpointConfigOptions.java     | 11 ++-
 .../endpoint/hive/HiveServer2EndpointFactory.java  | 23 +++++-
 .../hive/HiveServer2EndpointFactoryTest.java       | 93 ++++++++++++----------
 .../endpoint/hive/HiveServer2EndpointITCase.java   | 17 +++-
 .../hive/util/HiveServer2EndpointExtension.java    |  2 +
 .../gateway/service/context/SessionContext.java    |  6 ++
 .../service/operation/OperationManager.java        |  2 +
 8 files changed, 131 insertions(+), 58 deletions(-)
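
The new "thrift.host" option (default "localhost") controls the address the
endpoint binds its Thrift server to, and the "thrift.port" default moves from
8084 to 10000, HiveServer2's standard port. The following is a minimal
client-side sketch, assuming the SQL Gateway is running with this endpoint and
the Hive JDBC driver is on the classpath; it reuses the URL pattern from
HiveServer2EndpointITCase below, and the host, port, class name, and SHOW
DATABASES statement are illustrative placeholders rather than anything defined
by this commit.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class HiveServer2EndpointClientSketch {
        public static void main(String[] args) throws Exception {
            // Stand-ins for the configured "thrift.host" and "thrift.port" values.
            String host = "localhost";
            int port = 10000;
            // The URL pattern mirrors HiveServer2EndpointITCase#getConnection below.
            try (Connection connection =
                            DriverManager.getConnection(
                                    String.format(
                                            "jdbc:hive2://%s:%s/default;auth=noSasl", host, port));
                    Statement statement = connection.createStatement()) {
                statement.execute("SHOW DATABASES");
            }
        }
    }

Design-wise, the server socket was previously created with
InetAddress.getByName(null), which always resolves to the loopback address;
making the bind address configurable lets the endpoint accept connections from
other hosts when a non-loopback "thrift.host" is set.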

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 4c25f9a2b7d..262177a547e 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.endpoint.hive;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
@@ -100,6 +101,8 @@ import java.util.Objects;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
 import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V10;
 import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase;
@@ -126,13 +129,14 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     // --------------------------------------------------------------------------------------------
 
     private final SqlGatewayService service;
+    private final InetAddress hostAddress;
+    private final int port;
     private final int minWorkerThreads;
     private final int maxWorkerThreads;
     private final Duration workerKeepAliveTime;
     private final int requestTimeoutMs;
     private final int backOffSlotLengthMs;
     private final long maxMessageSize;
-    private final int port;
 
     private final Thread serverThread = new Thread(this, "HiveServer2 Endpoint");
     private ThreadPoolExecutor executor;
@@ -155,6 +159,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
 
     public HiveServer2Endpoint(
             SqlGatewayService service,
+            InetAddress hostAddress,
             int port,
             long maxMessageSize,
             int requestTimeoutMs,
@@ -168,6 +173,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
             String moduleName) {
         this(
                 service,
+                hostAddress,
                 port,
                 maxMessageSize,
                 requestTimeoutMs,
@@ -185,6 +191,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     @VisibleForTesting
     public HiveServer2Endpoint(
             SqlGatewayService service,
+            InetAddress hostAddress,
             int port,
             long maxMessageSize,
             int requestTimeoutMs,
@@ -199,6 +206,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
             boolean allowEmbedded) {
         this.service = service;
 
+        this.hostAddress = hostAddress;
         this.port = port;
         this.maxMessageSize = maxMessageSize;
         this.requestTimeoutMs = requestTimeoutMs;
@@ -228,7 +236,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
         }
 
         if (executor != null) {
-            executor.shutdown();
+            executor.shutdownNow();
         }
     }
 
@@ -255,6 +263,8 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
                             : tOpenSessionReq.getConfiguration();
             Map<String, String> sessionConfig = new HashMap<>();
             sessionConfig.put(TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name());
+            sessionConfig.put(RUNTIME_MODE.key(), RuntimeExecutionMode.BATCH.name());
+            sessionConfig.put(TABLE_DML_SYNC.key(), "true");
             sessionConfig.putAll(validateAndNormalize(originSessionConf));
 
             HiveConf conf = HiveCatalog.createHiveConf(hiveConfPath, null);
@@ -284,7 +294,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
             resp.setSessionHandle(toTSessionHandle(sessionHandle));
             resp.setConfiguration(service.getSessionConfig(sessionHandle));
         } catch (Exception e) {
-            LOG.error("Failed to openSession.", e);
+            LOG.error("Failed to OpenSession.", e);
             resp.setStatus(toTStatus(e));
         }
         return resp;
@@ -298,7 +308,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
             service.closeSession(sessionHandle);
             resp.setStatus(OK_STATUS);
         } catch (Throwable t) {
-            LOG.error("Failed to closeSession.", t);
+            LOG.error("Failed to CloseSession.", t);
             resp.setStatus(toTStatus(t));
         }
         return resp;
@@ -419,12 +429,13 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
         }
         HiveServer2Endpoint that = (HiveServer2Endpoint) o;
 
-        return minWorkerThreads == that.minWorkerThreads
+        return Objects.equals(hostAddress, that.hostAddress)
+                && port == that.port
+                && minWorkerThreads == that.minWorkerThreads
                 && maxWorkerThreads == that.maxWorkerThreads
                 && requestTimeoutMs == that.requestTimeoutMs
                 && backOffSlotLengthMs == that.backOffSlotLengthMs
                 && maxMessageSize == that.maxMessageSize
-                && port == that.port
                 && Objects.equals(workerKeepAliveTime, that.workerKeepAliveTime)
                 && Objects.equals(catalogName, that.catalogName)
                 && Objects.equals(defaultDatabase, that.defaultDatabase)
@@ -436,13 +447,14 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     @Override
     public int hashCode() {
         return Objects.hash(
+                hostAddress,
+                port,
                 minWorkerThreads,
                 maxWorkerThreads,
                 workerKeepAliveTime,
                 requestTimeoutMs,
                 backOffSlotLengthMs,
                 maxMessageSize,
-                port,
                 catalogName,
                 defaultDatabase,
                 hiveConfPath,
@@ -453,10 +465,14 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     @Override
     public void run() {
         try {
-            LOG.info("HiveServer2 Endpoint begins to listen on {}.", port);
+            LOG.info(
+                    "HiveServer2 Endpoint begins to listen on {}:{}.",
+                    hostAddress.getHostAddress(),
+                    port);
             server.serve();
         } catch (Throwable t) {
             LOG.error("Exception caught by " + getClass().getSimpleName() + ". Exiting.", t);
+            System.exit(-1);
         }
     }
 
@@ -472,8 +488,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
                     new TThreadPoolServer(
                             new TThreadPoolServer.Args(
                                             new TServerSocket(
-                                                    new ServerSocket(
-                                                            port, -1, InetAddress.getByName(null))))
+                                                    new ServerSocket(port, -1, hostAddress)))
                                     .processorFactory(
                                             new TProcessorFactory(
                                                     new TCLIService.Processor<>(this)))
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java
index 22f82ca1fbb..483910a01d3 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java
@@ -35,10 +35,19 @@ public class HiveServer2EndpointConfigOptions {
     // Server Options
     // --------------------------------------------------------------------------------------------
 
+    public static final ConfigOption<String> THRIFT_HOST =
+            ConfigOptions.key("thrift.host")
+                    .stringType()
+                    .defaultValue("localhost")
+                    .withDescription(
+                            "The server address of the HiveServer2 host to be used for communication. "
+                                    + "The default is 'localhost', which means the endpoint only binds to localhost. "
+                                    + "This is only necessary if the host has multiple network addresses.");
+
     public static final ConfigOption<Integer> THRIFT_PORT =
             ConfigOptions.key("thrift.port")
                     .intType()
-                    .defaultValue(8084)
+                    .defaultValue(10000)
                     .withDescription("The port of the HiveServer2 endpoint");
 
     public static final ConfigOption<Integer> THRIFT_WORKER_THREADS_MIN =
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java
index 7a1fd06e596..00ce8d287c9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java
@@ -25,7 +25,10 @@ import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
 import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactory;
 import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -33,6 +36,7 @@ import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOpti
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_HIVE_CONF_DIR;
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_NAME;
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.MODULE_NAME;
+import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_HOST;
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH;
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_LOGIN_TIMEOUT;
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_MAX_MESSAGE_SIZE;
@@ -55,6 +59,7 @@ public class HiveServer2EndpointFactory implements SqlGatewayEndpointFactory {
         validate(configuration);
         return new HiveServer2Endpoint(
                 context.getSqlGatewayService(),
+                getHostAddress(configuration.get(THRIFT_HOST)),
                 configuration.get(THRIFT_PORT),
                 checkNotNull(configuration.get(THRIFT_MAX_MESSAGE_SIZE)),
                 (int) configuration.get(THRIFT_LOGIN_TIMEOUT).toMillis(),
@@ -75,8 +80,14 @@ public class HiveServer2EndpointFactory implements SqlGatewayEndpointFactory {
 
     @Override
     public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
         return new HashSet<>(
                 Arrays.asList(
+                        THRIFT_HOST,
                         THRIFT_PORT,
                         THRIFT_MAX_MESSAGE_SIZE,
                         THRIFT_LOGIN_TIMEOUT,
@@ -84,12 +95,18 @@ public class HiveServer2EndpointFactory implements SqlGatewayEndpointFactory {
                         THRIFT_WORKER_THREADS_MAX,
                         THRIFT_WORKER_KEEPALIVE_TIME,
                         CATALOG_NAME,
+                        CATALOG_HIVE_CONF_DIR,
+                        CATALOG_DEFAULT_DATABASE,
                         MODULE_NAME));
     }
 
-    @Override
-    public Set<ConfigOption<?>> optionalOptions() {
-        return new HashSet<>(Arrays.asList(CATALOG_HIVE_CONF_DIR, CATALOG_DEFAULT_DATABASE));
+    private static InetAddress getHostAddress(String hostName) {
+        try {
+            return InetAddress.getByName(hostName);
+        } catch (UnknownHostException e) {
+            throw new ValidationException(
+                    String.format("Cannot get the address for the host '%s'.", hostName), e);
+        }
     }
 
     private static void validate(ReadableConfig configuration) {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java
index cd2787193db..6fc7879ae4a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java
@@ -25,7 +25,10 @@ import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtil
 import org.apache.flink.table.gateway.api.utils.MockedSqlGatewayService;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
+import java.net.InetAddress;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
@@ -38,6 +41,7 @@ import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOpti
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_HIVE_CONF_DIR;
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_NAME;
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.MODULE_NAME;
+import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_HOST;
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH;
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_LOGIN_TIMEOUT;
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_MAX_MESSAGE_SIZE;
@@ -70,7 +74,7 @@ class HiveServer2EndpointFactoryTest {
     private final String moduleName = "test-module";
 
     @Test
-    public void testCreateHiveServer2Endpoint() {
+    public void testCreateHiveServer2Endpoint() throws Exception {
         assertThat(
                         SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(
                                 service, Configuration.fromMap(getDefaultConfig())))
@@ -78,6 +82,7 @@ class HiveServer2EndpointFactoryTest {
                         Collections.singletonList(
                                 new HiveServer2Endpoint(
                                         service,
+                                        InetAddress.getByName("localhost"),
                                         port,
                                         maxMessageSize,
                                         (int) loginTimeout.toMillis(),
@@ -91,45 +96,47 @@ class HiveServer2EndpointFactoryTest {
                                         moduleName)));
     }
 
-    @Test
-    public void testCreateHiveServer2EndpointWithIllegalArgument() {
-        List<TestSpec<?>> specs =
-                Arrays.asList(
-                        new TestSpec<>(
-                                THRIFT_WORKER_THREADS_MIN,
-                                -1,
-                                "The specified min thrift worker thread number is -1, which should be larger than 0."),
-                        new TestSpec<>(
-                                THRIFT_WORKER_THREADS_MAX,
-                                0,
-                                "The specified max thrift worker thread number is 0, which should be larger than 0."),
-                        new TestSpec<>(
-                                THRIFT_PORT,
-                                1008668001,
-                                "The specified port is 1008668001, which should range from 0 to 65535."),
-                        new TestSpec<>(
-                                THRIFT_LOGIN_TIMEOUT,
-                                "9223372036854775807 ms",
-                                "The specified login timeout should range from 0 ms to 2147483647 ms but the specified value is 9223372036854775807 ms."),
-                        new TestSpec<>(
-                                THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH,
-                                "9223372036854775807 ms",
-                                "The specified binary exponential backoff slot time should range from 0 ms to 2147483647 ms but the specified value is 9223372036854775807 ms."));
-
-        for (TestSpec<?> spec : specs) {
-            assertThatThrownBy(
-                            () ->
-                                    SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(
-                                            service,
-                                            Configuration.fromMap(
-                                                    getModifiedConfig(
-                                                            config ->
-                                                                    setEndpointOption(
-                                                                            config,
-                                                                            spec.option,
-                                                                            spec.value)))))
-                    .satisfies(FlinkAssertions.anyCauseMatches(spec.exceptionMessage));
-        }
+    @ParameterizedTest
+    @MethodSource("getIllegalArgumentTestSpecs")
+    public void testCreateHiveServer2EndpointWithIllegalArgument(TestSpec<?> spec) {
+        assertThatThrownBy(
+                        () ->
+                                SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(
+                                        service,
+                                        Configuration.fromMap(
+                                                getModifiedConfig(
+                                                        config ->
+                                                                setEndpointOption(
+                                                                        config,
+                                                                        spec.option,
+                                                                        spec.value)))))
+                .satisfies(FlinkAssertions.anyCauseMatches(spec.exceptionMessage));
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private static List<TestSpec<?>> getIllegalArgumentTestSpecs() {
+        return Arrays.asList(
+                new TestSpec<>(
+                        THRIFT_WORKER_THREADS_MIN,
+                        -1,
+                        "The specified min thrift worker thread number is -1, which should be larger than 0."),
+                new TestSpec<>(
+                        THRIFT_WORKER_THREADS_MAX,
+                        0,
+                        "The specified max thrift worker thread number is 0, which should be larger than 0."),
+                new TestSpec<>(
+                        THRIFT_PORT,
+                        1008668001,
+                        "The specified port is 1008668001, which should range from 0 to 65535."),
+                new TestSpec<>(
+                        THRIFT_LOGIN_TIMEOUT,
+                        "9223372036854775807 ms",
+                        "The specified login timeout should range from 0 ms to 2147483647 ms but the specified value is 9223372036854775807 ms."),
+                new TestSpec<>(
+                        THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH,
+                        "9223372036854775807 ms",
+                        "The specified binary exponential backoff slot time should range from 0 ms to 2147483647 ms but the specified value is 9223372036854775807 ms."));
     }
 
     // --------------------------------------------------------------------------------------------
@@ -145,6 +152,11 @@ class HiveServer2EndpointFactoryTest {
             this.value = value;
             this.exceptionMessage = exceptionMessage;
         }
+
+        @Override
+        public String toString() {
+            return "TestSpec{option=" + option.key() + '}';
+        }
     }
 
     private Map<String, String> getModifiedConfig(Consumer<Map<String, String>> consumer) {
@@ -158,6 +170,7 @@ class HiveServer2EndpointFactoryTest {
 
         config.put(SQL_GATEWAY_ENDPOINT_TYPE.key(), IDENTIFIER);
 
+        setEndpointOption(config, THRIFT_HOST, "localhost");
         setEndpointOption(config, THRIFT_PORT, port);
         setEndpointOption(config, THRIFT_WORKER_THREADS_MIN, minWorkerThreads);
         setEndpointOption(config, THRIFT_WORKER_THREADS_MAX, maxWorkerThreads);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 7268220cd7c..30951a5765c 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -41,12 +41,16 @@ import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.net.InetAddress;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.util.AbstractMap;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.table.api.config.TableConfigOptions.MAX_LENGTH_GENERATED_CODE;
 import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -83,7 +87,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
         TOpenSessionReq openSessionReq = new TOpenSessionReq();
 
         Map<String, String> configs = new HashMap<>();
-        configs.put(TABLE_DML_SYNC.key(), "true");
+        configs.put(MAX_LENGTH_GENERATED_CODE.key(), "-1");
         // simulate to set config using hive jdbc
         configs.put("set:hiveconf:key", "value");
         // TODO: set hivevar when FLINK-28096 is fixed
@@ -99,6 +103,8 @@ public class HiveServer2EndpointITCase extends TestLogger {
                         new AbstractMap.SimpleEntry<>(
                                 TableConfigOptions.TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name()),
                         new AbstractMap.SimpleEntry<>(TABLE_DML_SYNC.key(), "true"),
+                        new AbstractMap.SimpleEntry<>(RUNTIME_MODE.key(), BATCH.name()),
+                        new AbstractMap.SimpleEntry<>(MAX_LENGTH_GENERATED_CODE.key(), "-1"),
                         new AbstractMap.SimpleEntry<>("key", "value"));
     }
 
@@ -121,13 +127,16 @@ public class HiveServer2EndpointITCase extends TestLogger {
     private Connection getConnection() throws Exception {
         return DriverManager.getConnection(
                 String.format(
-                        "jdbc:hive2://localhost:%s/default;auth=noSasl",
-                        ENDPOINT_EXTENSION.getPort()));
+                        "jdbc:hive2://%s:%s/default;auth=noSasl",
+                        InetAddress.getLocalHost().getHostAddress(), ENDPOINT_EXTENSION.getPort()));
     }
 
     private TCLIService.Client createClient() throws Exception {
         TTransport transport =
-                HiveAuthUtils.getSocketTransport("localhost", ENDPOINT_EXTENSION.getPort(), 0);
+                HiveAuthUtils.getSocketTransport(
+                        InetAddress.getLocalHost().getHostAddress(),
+                        ENDPOINT_EXTENSION.getPort(),
+                        0);
         transport.open();
         return new TCLIService.Client(new TBinaryProtocol(transport));
     }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java
index 8b4957bb8c2..fa6093f4451 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.BeforeAllCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
 
+import java.net.InetAddress;
 import java.util.function.Supplier;
 
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_DEFAULT_DATABASE;
@@ -67,6 +68,7 @@ public class HiveServer2EndpointExtension implements BeforeAllCallback, AfterAll
         endpoint =
                 new HiveServer2Endpoint(
                         serviceSupplier.get(),
+                        InetAddress.getLocalHost(),
                         port.getPort(),
                         checkNotNull(endpointConfig.get(THRIFT_MAX_MESSAGE_SIZE)),
                         (int) endpointConfig.get(THRIFT_LOGIN_TIMEOUT).toMillis(),
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
index 2b5f103460a..44eee43fde9 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
@@ -57,7 +57,9 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayDeque;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
@@ -154,7 +156,11 @@ public class SessionContext {
     }
 
     public void registerModule(String moduleName, Module module) {
+        Deque<String> moduleNames = new ArrayDeque<>(sessionState.moduleManager.listModules());
+        moduleNames.addFirst(moduleName);
+
         sessionState.moduleManager.loadModule(moduleName, module);
+        sessionState.moduleManager.useModules(moduleNames.toArray(new String[0]));
     }
 
     public void setCurrentCatalog(String catalog) {
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
index ad2abd80698..80ec381eb96 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
@@ -354,6 +354,8 @@ public class OperationManager {
         }
     }
 
+    // -------------------------------------------------------------------------------------------
+
     @VisibleForTesting
     public int getOperationCount() {
         return submittedOperations.size();