You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/07/29 01:59:10 UTC
[flink] 01/04: [FLINK-28152][sql-gateway][hive] Introduce option "thrift.host" for HiveServer2 Endpoint and improve codes
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 27cecdebccc95e9acf6ba38616b14e531b35d1be
Author: Shengkai <10...@qq.com>
AuthorDate: Tue Jul 26 10:15:46 2022 +0800
[FLINK-28152][sql-gateway][hive] Introduce option "thrift.host" for HiveServer2 Endpoint and improve codes
---
.../table/endpoint/hive/HiveServer2Endpoint.java | 35 +++++---
.../hive/HiveServer2EndpointConfigOptions.java | 11 ++-
.../endpoint/hive/HiveServer2EndpointFactory.java | 23 +++++-
.../hive/HiveServer2EndpointFactoryTest.java | 93 ++++++++++++----------
.../endpoint/hive/HiveServer2EndpointITCase.java | 17 +++-
.../hive/util/HiveServer2EndpointExtension.java | 2 +
.../gateway/service/context/SessionContext.java | 6 ++
.../service/operation/OperationManager.java | 2 +
8 files changed, 131 insertions(+), 58 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 4c25f9a2b7d..262177a547e 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.endpoint.hive;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.hive.HiveCatalog;
@@ -100,6 +101,8 @@ import java.util.Objects;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V10;
import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase;
@@ -126,13 +129,14 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
// --------------------------------------------------------------------------------------------
private final SqlGatewayService service;
+ private final InetAddress hostAddress;
+ private final int port;
private final int minWorkerThreads;
private final int maxWorkerThreads;
private final Duration workerKeepAliveTime;
private final int requestTimeoutMs;
private final int backOffSlotLengthMs;
private final long maxMessageSize;
- private final int port;
private final Thread serverThread = new Thread(this, "HiveServer2 Endpoint");
private ThreadPoolExecutor executor;
@@ -155,6 +159,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
public HiveServer2Endpoint(
SqlGatewayService service,
+ InetAddress hostAddress,
int port,
long maxMessageSize,
int requestTimeoutMs,
@@ -168,6 +173,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
String moduleName) {
this(
service,
+ hostAddress,
port,
maxMessageSize,
requestTimeoutMs,
@@ -185,6 +191,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
@VisibleForTesting
public HiveServer2Endpoint(
SqlGatewayService service,
+ InetAddress hostAddress,
int port,
long maxMessageSize,
int requestTimeoutMs,
@@ -199,6 +206,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
boolean allowEmbedded) {
this.service = service;
+ this.hostAddress = hostAddress;
this.port = port;
this.maxMessageSize = maxMessageSize;
this.requestTimeoutMs = requestTimeoutMs;
@@ -228,7 +236,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
}
if (executor != null) {
- executor.shutdown();
+ executor.shutdownNow();
}
}
@@ -255,6 +263,8 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
: tOpenSessionReq.getConfiguration();
Map<String, String> sessionConfig = new HashMap<>();
sessionConfig.put(TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name());
+ sessionConfig.put(RUNTIME_MODE.key(), RuntimeExecutionMode.BATCH.name());
+ sessionConfig.put(TABLE_DML_SYNC.key(), "true");
sessionConfig.putAll(validateAndNormalize(originSessionConf));
HiveConf conf = HiveCatalog.createHiveConf(hiveConfPath, null);
@@ -284,7 +294,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
resp.setSessionHandle(toTSessionHandle(sessionHandle));
resp.setConfiguration(service.getSessionConfig(sessionHandle));
} catch (Exception e) {
- LOG.error("Failed to openSession.", e);
+ LOG.error("Failed to OpenSession.", e);
resp.setStatus(toTStatus(e));
}
return resp;
@@ -298,7 +308,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
service.closeSession(sessionHandle);
resp.setStatus(OK_STATUS);
} catch (Throwable t) {
- LOG.error("Failed to closeSession.", t);
+ LOG.error("Failed to CloseSession.", t);
resp.setStatus(toTStatus(t));
}
return resp;
@@ -419,12 +429,13 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
}
HiveServer2Endpoint that = (HiveServer2Endpoint) o;
- return minWorkerThreads == that.minWorkerThreads
+ return Objects.equals(hostAddress, that.hostAddress)
+ && port == that.port
+ && minWorkerThreads == that.minWorkerThreads
&& maxWorkerThreads == that.maxWorkerThreads
&& requestTimeoutMs == that.requestTimeoutMs
&& backOffSlotLengthMs == that.backOffSlotLengthMs
&& maxMessageSize == that.maxMessageSize
- && port == that.port
&& Objects.equals(workerKeepAliveTime, that.workerKeepAliveTime)
&& Objects.equals(catalogName, that.catalogName)
&& Objects.equals(defaultDatabase, that.defaultDatabase)
@@ -436,13 +447,14 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
@Override
public int hashCode() {
return Objects.hash(
+ hostAddress,
+ port,
minWorkerThreads,
maxWorkerThreads,
workerKeepAliveTime,
requestTimeoutMs,
backOffSlotLengthMs,
maxMessageSize,
- port,
catalogName,
defaultDatabase,
hiveConfPath,
@@ -453,10 +465,14 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
@Override
public void run() {
try {
- LOG.info("HiveServer2 Endpoint begins to listen on {}.", port);
+ LOG.info(
+ "HiveServer2 Endpoint begins to listen on {}:{}.",
+ hostAddress.getHostAddress(),
+ port);
server.serve();
} catch (Throwable t) {
LOG.error("Exception caught by " + getClass().getSimpleName() + ". Exiting.", t);
+ System.exit(-1);
}
}
@@ -472,8 +488,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
new TThreadPoolServer(
new TThreadPoolServer.Args(
new TServerSocket(
- new ServerSocket(
- port, -1, InetAddress.getByName(null))))
+ new ServerSocket(port, -1, hostAddress)))
.processorFactory(
new TProcessorFactory(
new TCLIService.Processor<>(this)))
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java
index 22f82ca1fbb..483910a01d3 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java
@@ -35,10 +35,19 @@ public class HiveServer2EndpointConfigOptions {
// Server Options
// --------------------------------------------------------------------------------------------
// Host name or address the endpoint's server socket is bound to; resolved by the factory
// via InetAddress.getByName. The default value is "localhost".
+ public static final ConfigOption<String> THRIFT_HOST =
+ ConfigOptions.key("thrift.host")
+ .stringType()
+ .defaultValue("localhost")
+ .withDescription(
+ "The server address of HiveServer2 host to be used for communication. "
+ + "Default is localhost, which means the endpoint binds to the localhost. "
+ + "This is only necessary if the host has multiple network addresses.");
+
public static final ConfigOption<Integer> THRIFT_PORT =
ConfigOptions.key("thrift.port")
.intType()
- .defaultValue(8084)
+ .defaultValue(10000)
.withDescription("The port of the HiveServer2 endpoint");
public static final ConfigOption<Integer> THRIFT_WORKER_THREADS_MIN =
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java
index 7a1fd06e596..00ce8d287c9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java
@@ -25,7 +25,10 @@ import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactory;
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -33,6 +36,7 @@ import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOpti
import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_HIVE_CONF_DIR;
import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_NAME;
import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.MODULE_NAME;
+import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_HOST;
import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH;
import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_LOGIN_TIMEOUT;
import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_MAX_MESSAGE_SIZE;
@@ -55,6 +59,7 @@ public class HiveServer2EndpointFactory implements SqlGatewayEndpointFactory {
validate(configuration);
return new HiveServer2Endpoint(
context.getSqlGatewayService(),
+ getHostAddress(configuration.get(THRIFT_HOST)),
configuration.get(THRIFT_PORT),
checkNotNull(configuration.get(THRIFT_MAX_MESSAGE_SIZE)),
(int) configuration.get(THRIFT_LOGIN_TIMEOUT).toMillis(),
@@ -75,8 +80,14 @@ public class HiveServer2EndpointFactory implements SqlGatewayEndpointFactory {
/** Returns an empty set: this factory declares no required options. */
@Override
public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
return new HashSet<>(
Arrays.asList(
+ THRIFT_HOST,
THRIFT_PORT,
THRIFT_MAX_MESSAGE_SIZE,
THRIFT_LOGIN_TIMEOUT,
@@ -84,12 +95,18 @@ public class HiveServer2EndpointFactory implements SqlGatewayEndpointFactory {
THRIFT_WORKER_THREADS_MAX,
THRIFT_WORKER_KEEPALIVE_TIME,
CATALOG_NAME,
+ CATALOG_HIVE_CONF_DIR,
+ CATALOG_DEFAULT_DATABASE,
MODULE_NAME));
}
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- return new HashSet<>(Arrays.asList(CATALOG_HIVE_CONF_DIR, CATALOG_DEFAULT_DATABASE));
/**
 * Resolves the configured host name to an {@link InetAddress}.
 *
 * @param hostName host name or textual IP address taken from the "thrift.host" option
 * @return the resolved address
 * @throws ValidationException if the host cannot be resolved; the original {@link
 *     UnknownHostException} is attached as the cause so the failure reason is not lost
 */
+ private static InetAddress getHostAddress(String hostName) {
+ try {
+ return InetAddress.getByName(hostName);
+ } catch (UnknownHostException e) {
+ throw new ValidationException(
+ String.format("Can not get the address for the host '%s'.", hostName), e);
+ }
}
private static void validate(ReadableConfig configuration) {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java
index cd2787193db..6fc7879ae4a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java
@@ -25,7 +25,10 @@ import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtil
import org.apache.flink.table.gateway.api.utils.MockedSqlGatewayService;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import java.net.InetAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
@@ -38,6 +41,7 @@ import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOpti
import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_HIVE_CONF_DIR;
import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_NAME;
import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.MODULE_NAME;
+import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_HOST;
import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH;
import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_LOGIN_TIMEOUT;
import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_MAX_MESSAGE_SIZE;
@@ -70,7 +74,7 @@ class HiveServer2EndpointFactoryTest {
private final String moduleName = "test-module";
@Test
- public void testCreateHiveServer2Endpoint() {
+ public void testCreateHiveServer2Endpoint() throws Exception {
assertThat(
SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(
service, Configuration.fromMap(getDefaultConfig())))
@@ -78,6 +82,7 @@ class HiveServer2EndpointFactoryTest {
Collections.singletonList(
new HiveServer2Endpoint(
service,
+ InetAddress.getByName("localhost"),
port,
maxMessageSize,
(int) loginTimeout.toMillis(),
@@ -91,45 +96,47 @@ class HiveServer2EndpointFactoryTest {
moduleName)));
}
- @Test
- public void testCreateHiveServer2EndpointWithIllegalArgument() {
- List<TestSpec<?>> specs =
- Arrays.asList(
- new TestSpec<>(
- THRIFT_WORKER_THREADS_MIN,
- -1,
- "The specified min thrift worker thread number is -1, which should be larger than 0."),
- new TestSpec<>(
- THRIFT_WORKER_THREADS_MAX,
- 0,
- "The specified max thrift worker thread number is 0, which should be larger than 0."),
- new TestSpec<>(
- THRIFT_PORT,
- 1008668001,
- "The specified port is 1008668001, which should range from 0 to 65535."),
- new TestSpec<>(
- THRIFT_LOGIN_TIMEOUT,
- "9223372036854775807 ms",
- "The specified login timeout should range from 0 ms to 2147483647 ms but the specified value is 9223372036854775807 ms."),
- new TestSpec<>(
- THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH,
- "9223372036854775807 ms",
- "The specified binary exponential backoff slot time should range from 0 ms to 2147483647 ms but the specified value is 9223372036854775807 ms."));
-
- for (TestSpec<?> spec : specs) {
- assertThatThrownBy(
- () ->
- SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(
- service,
- Configuration.fromMap(
- getModifiedConfig(
- config ->
- setEndpointOption(
- config,
- spec.option,
- spec.value)))))
- .satisfies(FlinkAssertions.anyCauseMatches(spec.exceptionMessage));
- }
/**
 * Runs once per spec supplied by the method source "getIllegalArgumentTestSpecs".
 *
 * <p>Builds a config through {@code getModifiedConfig} in which {@code spec.option} is set to
 * {@code spec.value}, asserts that creating the endpoint throws, and checks that some cause of
 * the thrown exception matches {@code spec.exceptionMessage}.
 */
+ @ParameterizedTest
+ @MethodSource("getIllegalArgumentTestSpecs")
+ public void testCreateHiveServer2EndpointWithIllegalArgument(TestSpec<?> spec) {
+ assertThatThrownBy(
+ () ->
+ SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(
+ service,
+ Configuration.fromMap(
+ getModifiedConfig(
+ config ->
+ setEndpointOption(
+ config,
+ spec.option,
+ spec.value)))))
+ .satisfies(FlinkAssertions.anyCauseMatches(spec.exceptionMessage));
+ }
+
+ // --------------------------------------------------------------------------------------------
+
/**
 * Argument provider referenced by name from {@code @MethodSource} on the illegal-argument test.
 *
 * <p>Each entry pairs an endpoint option with an invalid value and the error message the
 * factory is expected to report for it.
 */
+ private static List<TestSpec<?>> getIllegalArgumentTestSpecs() {
+ return Arrays.asList(
+ new TestSpec<>(
+ THRIFT_WORKER_THREADS_MIN,
+ -1,
+ "The specified min thrift worker thread number is -1, which should be larger than 0."),
+ new TestSpec<>(
+ THRIFT_WORKER_THREADS_MAX,
+ 0,
+ "The specified max thrift worker thread number is 0, which should be larger than 0."),
+ new TestSpec<>(
+ THRIFT_PORT,
+ 1008668001,
+ "The specified port is 1008668001, which should range from 0 to 65535."),
+ new TestSpec<>(
+ THRIFT_LOGIN_TIMEOUT,
+ "9223372036854775807 ms",
+ "The specified login timeout should range from 0 ms to 2147483647 ms but the specified value is 9223372036854775807 ms."),
+ new TestSpec<>(
+ THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH,
+ "9223372036854775807 ms",
+ "The specified binary exponential backoff slot time should range from 0 ms to 2147483647 ms but the specified value is 9223372036854775807 ms."));
}
// --------------------------------------------------------------------------------------------
@@ -145,6 +152,11 @@ class HiveServer2EndpointFactoryTest {
this.value = value;
this.exceptionMessage = exceptionMessage;
}
+
/**
 * Describes the spec by its option key only; the value and expected message are omitted.
 *
 * <p>NOTE(review): presumably this is what shows up as the display name of each parameterized
 * test invocation; confirm against the JUnit 5 argument formatting rules.
 */
+ @Override
+ public String toString() {
+ return "TestSpec{option=" + option.key() + '}';
+ }
}
private Map<String, String> getModifiedConfig(Consumer<Map<String, String>> consumer) {
@@ -158,6 +170,7 @@ class HiveServer2EndpointFactoryTest {
config.put(SQL_GATEWAY_ENDPOINT_TYPE.key(), IDENTIFIER);
+ setEndpointOption(config, THRIFT_HOST, "localhost");
setEndpointOption(config, THRIFT_PORT, port);
setEndpointOption(config, THRIFT_WORKER_THREADS_MIN, minWorkerThreads);
setEndpointOption(config, THRIFT_WORKER_THREADS_MAX, maxWorkerThreads);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 7268220cd7c..30951a5765c 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -41,12 +41,16 @@ import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import java.net.InetAddress;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.table.api.config.TableConfigOptions.MAX_LENGTH_GENERATED_CODE;
import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
import static org.assertj.core.api.Assertions.assertThat;
@@ -83,7 +87,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
TOpenSessionReq openSessionReq = new TOpenSessionReq();
Map<String, String> configs = new HashMap<>();
- configs.put(TABLE_DML_SYNC.key(), "true");
+ configs.put(MAX_LENGTH_GENERATED_CODE.key(), "-1");
// simulate to set config using hive jdbc
configs.put("set:hiveconf:key", "value");
// TODO: set hivevar when FLINK-28096 is fixed
@@ -99,6 +103,8 @@ public class HiveServer2EndpointITCase extends TestLogger {
new AbstractMap.SimpleEntry<>(
TableConfigOptions.TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name()),
new AbstractMap.SimpleEntry<>(TABLE_DML_SYNC.key(), "true"),
+ new AbstractMap.SimpleEntry<>(RUNTIME_MODE.key(), BATCH.name()),
+ new AbstractMap.SimpleEntry<>(MAX_LENGTH_GENERATED_CODE.key(), "-1"),
new AbstractMap.SimpleEntry<>("key", "value"));
}
@@ -121,13 +127,16 @@ public class HiveServer2EndpointITCase extends TestLogger {
private Connection getConnection() throws Exception {
return DriverManager.getConnection(
String.format(
- "jdbc:hive2://localhost:%s/default;auth=noSasl",
- ENDPOINT_EXTENSION.getPort()));
+ "jdbc:hive2://%s:%s/default;auth=noSasl",
+ InetAddress.getLocalHost().getHostAddress(), ENDPOINT_EXTENSION.getPort()));
}
private TCLIService.Client createClient() throws Exception {
TTransport transport =
- HiveAuthUtils.getSocketTransport("localhost", ENDPOINT_EXTENSION.getPort(), 0);
+ HiveAuthUtils.getSocketTransport(
+ InetAddress.getLocalHost().getHostAddress(),
+ ENDPOINT_EXTENSION.getPort(),
+ 0);
transport.open();
return new TCLIService.Client(new TBinaryProtocol(transport));
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java
index 8b4957bb8c2..fa6093f4451 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
+import java.net.InetAddress;
import java.util.function.Supplier;
import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_DEFAULT_DATABASE;
@@ -67,6 +68,7 @@ public class HiveServer2EndpointExtension implements BeforeAllCallback, AfterAll
endpoint =
new HiveServer2Endpoint(
serviceSupplier.get(),
+ InetAddress.getLocalHost(),
port.getPort(),
checkNotNull(endpointConfig.get(THRIFT_MAX_MESSAGE_SIZE)),
(int) endpointConfig.get(THRIFT_LOGIN_TIMEOUT).toMillis(),
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
index 2b5f103460a..44eee43fde9 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
@@ -57,7 +57,9 @@ import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayDeque;
import java.util.Collections;
+import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -154,7 +156,11 @@ public class SessionContext {
}
/**
 * Loads {@code module} under {@code moduleName} and then calls {@code useModules} with the new
 * name placed in front of the names that {@code listModules()} returned before the load.
 *
 * <p>NOTE(review): presumably the position in the {@code useModules} argument determines
 * resolution order, so the new module takes precedence; confirm against ModuleManager.
 */
public void registerModule(String moduleName, Module module) {
// Capture the module names before loading, then put the new name at the head of the list.
+ Deque<String> moduleNames = new ArrayDeque<>(sessionState.moduleManager.listModules());
+ moduleNames.addFirst(moduleName);
+
sessionState.moduleManager.loadModule(moduleName, module);
// Re-declare the used modules with the new one first.
+ sessionState.moduleManager.useModules(moduleNames.toArray(new String[0]));
}
public void setCurrentCatalog(String catalog) {
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
index ad2abd80698..80ec381eb96 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
@@ -354,6 +354,8 @@ public class OperationManager {
}
}
+ // -------------------------------------------------------------------------------------------
+
@VisibleForTesting
public int getOperationCount() {
return submittedOperations.size();