You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/09/19 14:09:44 UTC

[flink] 02/04: [hotfix] Bind to broadcast address when host is not specified

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 21aa451df3153064d8f9c7bf53dbfc6809c96788
Author: Shengkai <10...@qq.com>
AuthorDate: Mon Sep 5 14:46:10 2022 +0800

    [hotfix] Bind to broadcast address when host is not specified
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   | 33 +++++++---------------
 .../hive/HiveServer2EndpointConfigOptions.java     |  2 +-
 .../endpoint/hive/HiveServer2EndpointFactory.java  | 20 ++++++-------
 .../hive/HiveServer2EndpointFactoryTest.java       |  5 ++--
 .../hive/util/HiveServer2EndpointExtension.java    |  4 +--
 5 files changed, 25 insertions(+), 39 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 12ffa946b4f..e3037d3426b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -108,8 +108,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.net.InetAddress;
-import java.net.ServerSocket;
+import java.net.InetSocketAddress;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
@@ -163,8 +162,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     // --------------------------------------------------------------------------------------------
 
     private final SqlGatewayService service;
-    private final InetAddress hostAddress;
-    private final int port;
+    private final InetSocketAddress socketAddress;
     private final int minWorkerThreads;
     private final int maxWorkerThreads;
     private final Duration workerKeepAliveTime;
@@ -194,8 +192,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
 
     public HiveServer2Endpoint(
             SqlGatewayService service,
-            InetAddress hostAddress,
-            int port,
+            InetSocketAddress socketAddress,
             long maxMessageSize,
             int requestTimeoutMs,
             int backOffSlotLengthMs,
@@ -208,8 +205,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
             String moduleName) {
         this(
                 service,
-                hostAddress,
-                port,
+                socketAddress,
                 maxMessageSize,
                 requestTimeoutMs,
                 backOffSlotLengthMs,
@@ -227,8 +223,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     @VisibleForTesting
     public HiveServer2Endpoint(
             SqlGatewayService service,
-            InetAddress hostAddress,
-            int port,
+            InetSocketAddress socketAddress,
             long maxMessageSize,
             int requestTimeoutMs,
             int backOffSlotLengthMs,
@@ -243,8 +238,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
             boolean isVerbose) {
         this.service = service;
 
-        this.hostAddress = hostAddress;
-        this.port = port;
+        this.socketAddress = socketAddress;
         this.maxMessageSize = maxMessageSize;
         this.requestTimeoutMs = requestTimeoutMs;
         this.backOffSlotLengthMs = backOffSlotLengthMs;
@@ -793,8 +787,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
         }
         HiveServer2Endpoint that = (HiveServer2Endpoint) o;
 
-        return Objects.equals(hostAddress, that.hostAddress)
-                && port == that.port
+        return Objects.equals(socketAddress, that.socketAddress)
                 && minWorkerThreads == that.minWorkerThreads
                 && maxWorkerThreads == that.maxWorkerThreads
                 && requestTimeoutMs == that.requestTimeoutMs
@@ -812,8 +805,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     @Override
     public int hashCode() {
         return Objects.hash(
-                hostAddress,
-                port,
+                socketAddress,
                 minWorkerThreads,
                 maxWorkerThreads,
                 workerKeepAliveTime,
@@ -831,10 +823,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
     @Override
     public void run() {
         try {
-            LOG.info(
-                    "HiveServer2 Endpoint begins to listen on {}:{}.",
-                    hostAddress.getHostAddress(),
-                    port);
+            LOG.info("HiveServer2 Endpoint begins to listen on {}.", socketAddress.toString());
             server.serve();
         } catch (Throwable t) {
             LOG.error("Exception caught by " + getClass().getSimpleName() + ". Exiting.", t);
@@ -852,9 +841,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
         try {
             server =
                     new TThreadPoolServer(
-                            new TThreadPoolServer.Args(
-                                            new TServerSocket(
-                                                    new ServerSocket(port, -1, hostAddress)))
+                            new TThreadPoolServer.Args(new TServerSocket(socketAddress))
                                     .processorFactory(
                                             new TProcessorFactory(
                                                     new TCLIService.Processor<>(this)))
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java
index 483910a01d3..5f8a05dd6f5 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java
@@ -38,7 +38,7 @@ public class HiveServer2EndpointConfigOptions {
     public static final ConfigOption<String> THRIFT_HOST =
             ConfigOptions.key("thrift.host")
                     .stringType()
-                    .defaultValue("localhost")
+                    .noDefaultValue()
                     .withDescription(
                             "The server address of HiveServer2 host to be used for communication."
                                     + "Default is empty, which means the to bind to the localhost. "
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java
index 876b1f624e1..553e3f909eb 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java
@@ -25,11 +25,11 @@ import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
 import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactory;
 import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_DEFAULT_DATABASE;
@@ -59,8 +59,7 @@ public class HiveServer2EndpointFactory implements SqlGatewayEndpointFactory {
         validate(configuration);
         return new HiveServer2Endpoint(
                 context.getSqlGatewayService(),
-                getHostAddress(configuration.get(THRIFT_HOST)),
-                configuration.get(THRIFT_PORT),
+                getInetSocketAddress(configuration),
                 checkNotNull(configuration.get(THRIFT_MAX_MESSAGE_SIZE)),
                 (int) configuration.get(THRIFT_LOGIN_TIMEOUT).toMillis(),
                 (int) configuration.get(THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH).toMillis(),
@@ -99,12 +98,13 @@ public class HiveServer2EndpointFactory implements SqlGatewayEndpointFactory {
                         MODULE_NAME));
     }
 
-    private static InetAddress getHostAddress(String hostName) {
-        try {
-            return InetAddress.getByName(hostName);
-        } catch (UnknownHostException e) {
-            throw new ValidationException(
-                    String.format("Can not get the address for the host '%s'.", hostName), e);
+    private static InetSocketAddress getInetSocketAddress(ReadableConfig configuration) {
+        Optional<String> host = configuration.getOptional(THRIFT_HOST);
+        if (host.isPresent()) {
+            return new InetSocketAddress(
+                    configuration.get(THRIFT_HOST), configuration.get(THRIFT_PORT));
+        } else {
+            return new InetSocketAddress(configuration.get(THRIFT_PORT));
         }
     }
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java
index 6fc7879ae4a..bfa1270d546 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
-import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
@@ -82,8 +82,7 @@ class HiveServer2EndpointFactoryTest {
                         Collections.singletonList(
                                 new HiveServer2Endpoint(
                                         service,
-                                        InetAddress.getByName("localhost"),
-                                        port,
+                                        new InetSocketAddress("localhost", port),
                                         maxMessageSize,
                                         (int) loginTimeout.toMillis(),
                                         (int) backOffSlotLength.toMillis(),
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java
index 4cf0143f62b..634f5152f38 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.extension.BeforeAllCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
 
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.util.function.Supplier;
@@ -70,8 +71,7 @@ public class HiveServer2EndpointExtension implements BeforeAllCallback, AfterAll
         endpoint =
                 new HiveServer2Endpoint(
                         serviceSupplier.get(),
-                        InetAddress.getLocalHost(),
-                        port.getPort(),
+                        new InetSocketAddress(port.getPort()),
                         checkNotNull(endpointConfig.get(THRIFT_MAX_MESSAGE_SIZE)),
                         (int) endpointConfig.get(THRIFT_LOGIN_TIMEOUT).toMillis(),
                         (int) endpointConfig.get(THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH).toMillis(),