You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/09/19 14:09:44 UTC
[flink] 02/04: [hotfix] Bind to the wildcard address when host is not specified
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 21aa451df3153064d8f9c7bf53dbfc6809c96788
Author: Shengkai <10...@qq.com>
AuthorDate: Mon Sep 5 14:46:10 2022 +0800
[hotfix] Bind to the wildcard address when host is not specified
---
.../table/endpoint/hive/HiveServer2Endpoint.java | 33 +++++++---------------
.../hive/HiveServer2EndpointConfigOptions.java | 2 +-
.../endpoint/hive/HiveServer2EndpointFactory.java | 20 ++++++-------
.../hive/HiveServer2EndpointFactoryTest.java | 5 ++--
.../hive/util/HiveServer2EndpointExtension.java | 4 +--
5 files changed, 25 insertions(+), 39 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 12ffa946b4f..e3037d3426b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -108,8 +108,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.net.InetAddress;
-import java.net.ServerSocket;
+import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
@@ -163,8 +162,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
// --------------------------------------------------------------------------------------------
private final SqlGatewayService service;
- private final InetAddress hostAddress;
- private final int port;
+ private final InetSocketAddress socketAddress;
private final int minWorkerThreads;
private final int maxWorkerThreads;
private final Duration workerKeepAliveTime;
@@ -194,8 +192,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
public HiveServer2Endpoint(
SqlGatewayService service,
- InetAddress hostAddress,
- int port,
+ InetSocketAddress socketAddress,
long maxMessageSize,
int requestTimeoutMs,
int backOffSlotLengthMs,
@@ -208,8 +205,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
String moduleName) {
this(
service,
- hostAddress,
- port,
+ socketAddress,
maxMessageSize,
requestTimeoutMs,
backOffSlotLengthMs,
@@ -227,8 +223,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
@VisibleForTesting
public HiveServer2Endpoint(
SqlGatewayService service,
- InetAddress hostAddress,
- int port,
+ InetSocketAddress socketAddress,
long maxMessageSize,
int requestTimeoutMs,
int backOffSlotLengthMs,
@@ -243,8 +238,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
boolean isVerbose) {
this.service = service;
- this.hostAddress = hostAddress;
- this.port = port;
+ this.socketAddress = socketAddress;
this.maxMessageSize = maxMessageSize;
this.requestTimeoutMs = requestTimeoutMs;
this.backOffSlotLengthMs = backOffSlotLengthMs;
@@ -793,8 +787,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
}
HiveServer2Endpoint that = (HiveServer2Endpoint) o;
- return Objects.equals(hostAddress, that.hostAddress)
- && port == that.port
+ return Objects.equals(socketAddress, that.socketAddress)
&& minWorkerThreads == that.minWorkerThreads
&& maxWorkerThreads == that.maxWorkerThreads
&& requestTimeoutMs == that.requestTimeoutMs
@@ -812,8 +805,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
@Override
public int hashCode() {
return Objects.hash(
- hostAddress,
- port,
+ socketAddress,
minWorkerThreads,
maxWorkerThreads,
workerKeepAliveTime,
@@ -831,10 +823,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
@Override
public void run() {
try {
- LOG.info(
- "HiveServer2 Endpoint begins to listen on {}:{}.",
- hostAddress.getHostAddress(),
- port);
+ LOG.info("HiveServer2 Endpoint begins to listen on {}.", socketAddress.toString());
server.serve();
} catch (Throwable t) {
LOG.error("Exception caught by " + getClass().getSimpleName() + ". Exiting.", t);
@@ -852,9 +841,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
try {
server =
new TThreadPoolServer(
- new TThreadPoolServer.Args(
- new TServerSocket(
- new ServerSocket(port, -1, hostAddress)))
+ new TThreadPoolServer.Args(new TServerSocket(socketAddress))
.processorFactory(
new TProcessorFactory(
new TCLIService.Processor<>(this)))
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java
index 483910a01d3..5f8a05dd6f5 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java
@@ -38,7 +38,7 @@ public class HiveServer2EndpointConfigOptions {
public static final ConfigOption<String> THRIFT_HOST =
ConfigOptions.key("thrift.host")
.stringType()
- .defaultValue("localhost")
+ .noDefaultValue()
.withDescription(
"The server address of HiveServer2 host to be used for communication."
+ "Default is empty, which means the to bind to the localhost. "
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java
index 876b1f624e1..553e3f909eb 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java
@@ -25,11 +25,11 @@ import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactory;
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Set;
import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_DEFAULT_DATABASE;
@@ -59,8 +59,7 @@ public class HiveServer2EndpointFactory implements SqlGatewayEndpointFactory {
validate(configuration);
return new HiveServer2Endpoint(
context.getSqlGatewayService(),
- getHostAddress(configuration.get(THRIFT_HOST)),
- configuration.get(THRIFT_PORT),
+ getInetSocketAddress(configuration),
checkNotNull(configuration.get(THRIFT_MAX_MESSAGE_SIZE)),
(int) configuration.get(THRIFT_LOGIN_TIMEOUT).toMillis(),
(int) configuration.get(THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH).toMillis(),
@@ -99,12 +98,13 @@ public class HiveServer2EndpointFactory implements SqlGatewayEndpointFactory {
MODULE_NAME));
}
- private static InetAddress getHostAddress(String hostName) {
- try {
- return InetAddress.getByName(hostName);
- } catch (UnknownHostException e) {
- throw new ValidationException(
- String.format("Can not get the address for the host '%s'.", hostName), e);
+ private static InetSocketAddress getInetSocketAddress(ReadableConfig configuration) {
+ Optional<String> host = configuration.getOptional(THRIFT_HOST);
+ if (host.isPresent()) {
+ return new InetSocketAddress(
+ configuration.get(THRIFT_HOST), configuration.get(THRIFT_PORT));
+ } else {
+ return new InetSocketAddress(configuration.get(THRIFT_PORT));
}
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java
index 6fc7879ae4a..bfa1270d546 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
-import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
@@ -82,8 +82,7 @@ class HiveServer2EndpointFactoryTest {
Collections.singletonList(
new HiveServer2Endpoint(
service,
- InetAddress.getByName("localhost"),
- port,
+ new InetSocketAddress("localhost", port),
maxMessageSize,
(int) loginTimeout.toMillis(),
(int) backOffSlotLength.toMillis(),
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java
index 4cf0143f62b..634f5152f38 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.function.Supplier;
@@ -70,8 +71,7 @@ public class HiveServer2EndpointExtension implements BeforeAllCallback, AfterAll
endpoint =
new HiveServer2Endpoint(
serviceSupplier.get(),
- InetAddress.getLocalHost(),
- port.getPort(),
+ new InetSocketAddress(port.getPort()),
checkNotNull(endpointConfig.get(THRIFT_MAX_MESSAGE_SIZE)),
(int) endpointConfig.get(THRIFT_LOGIN_TIMEOUT).toMillis(),
(int) endpointConfig.get(THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH).toMillis(),