You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/12/07 20:12:20 UTC

hive git commit: HIVE-12568: Provide an option to specify network interface used by Spark remote client [Spark Branch] (reviewed by Jimmy)

Repository: hive
Updated Branches:
  refs/heads/spark e4b8cf43c -> 9af0b27bd


HIVE-12568: Provide an option to specify network interface used by Spark remote client [Spark Branch] (reviewed by Jimmy)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9af0b27b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9af0b27b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9af0b27b

Branch: refs/heads/spark
Commit: 9af0b27bda6352eb229058db57a25fe65eb81f9a
Parents: e4b8cf4
Author: xzhang <xz...@xzdt>
Authored: Mon Dec 7 11:10:25 2015 -0800
Committer: xzhang <xz...@xzdt>
Committed: Mon Dec 7 11:10:25 2015 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/ServerUtils.java  | 19 +++++++
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  5 ++
 .../service/cli/thrift/ThriftCLIService.java    | 15 +++---
 .../hive/spark/client/rpc/RpcConfiguration.java | 57 +++++++-------------
 4 files changed, 50 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9af0b27b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
index 83517ce..b44f92f 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hive.common;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -47,4 +50,20 @@ public class ServerUtils {
     }
   }
 
+  /**
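+   * Get the Inet address of the machine with the given host name.
+   * @param hostname The name of the host; if null or empty, the local host address is returned
+   * @return The network address of the host
+   * @throws UnknownHostException if the host name cannot be resolved to an address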
+   */
+  public static InetAddress getHostAddress(String hostname) throws UnknownHostException {
+    InetAddress serverIPAddress;
+    if (hostname != null && !hostname.isEmpty()) {
+      serverIPAddress = InetAddress.getByName(hostname);
+    } else {
+      serverIPAddress = InetAddress.getLocalHost();
+    }
+    return serverIPAddress;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9af0b27b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9e805bd..53ef428 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2387,6 +2387,11 @@ public class HiveConf extends Configuration {
       "Channel logging level for remote Spark driver.  One of {DEBUG, ERROR, INFO, TRACE, WARN}."),
     SPARK_RPC_SASL_MECHANISM("hive.spark.client.rpc.sasl.mechanisms", "DIGEST-MD5",
       "Name of the SASL mechanism to use for authentication."),
+    SPARK_RPC_SERVER_ADDRESS("hive.spark.client.rpc.server.address", "",
+      "The server address of HiverServer2 host to be used for communication between Hive client and remote Spark driver. " + 
+      "Default is empty, which means the address will be determined in the same way as for hive.server2.thrift.bind.host." +
+      "This is only necessary if the host has mutiple network addresses and if a different network address other than " +
+      "hive.server2.thrift.bind.host is to be used."),
     SPARK_DYNAMIC_PARTITION_PRUNING(
         "hive.spark.dynamic.partition.pruning", false,
         "When dynamic pruning is enabled, joins on partition keys will be processed by writing\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/9af0b27b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 8434965..d54f12c 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.common.ServerUtils;
 import org.apache.hive.service.AbstractService;
 import org.apache.hive.service.ServiceException;
 import org.apache.hive.service.ServiceUtils;
@@ -160,21 +161,19 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
   @Override
   public synchronized void init(HiveConf hiveConf) {
     this.hiveConf = hiveConf;
-    // Initialize common server configs needed in both binary & http modes
-    String portString;
-    hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+
+    String hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
     if (hiveHost == null) {
       hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
     }
     try {
-      if (hiveHost != null && !hiveHost.isEmpty()) {
-        serverIPAddress = InetAddress.getByName(hiveHost);
-      } else {
-        serverIPAddress = InetAddress.getLocalHost();
-      }
+      serverIPAddress = ServerUtils.getHostAddress(hiveHost);
     } catch (UnknownHostException e) {
       throw new ServiceException(e);
     }
+
+    // Initialize common server configs needed in both binary & http modes
+    String portString;
     // HTTP mode
     if (HiveServer2.isHTTPTransportMode(hiveConf)) {
       workerKeepAliveTime =

http://git-wip-us.apache.org/repos/asf/hive/blob/9af0b27b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
index 9c8cea0..e387659 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
@@ -18,20 +18,19 @@
 package org.apache.hive.spark.client.rpc;
 
 import java.io.IOException;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+
 import javax.security.sasl.Sasl;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.common.ServerUtils;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.conf.HiveConf;
 
@@ -49,15 +48,14 @@ public final class RpcConfiguration {
     HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname,
     HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname,
     HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname,
-    HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname
+    HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname,
+    HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname
   );
   public static final ImmutableSet<String> HIVE_SPARK_TIME_CONFIGS = ImmutableSet.of(
     HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
     HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname
   );
 
-  public static final String SERVER_LISTEN_ADDRESS_KEY = "hive.spark.client.server.address";
-
   /** Prefix for other SASL options. */
   public static final String RPC_SASL_OPT_PREFIX = "hive.spark.client.rpc.sasl.";
 
@@ -91,39 +89,22 @@ public final class RpcConfiguration {
     return value != null ? Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.defaultIntVal;
   }
 
+  /**
+   * By default we assume that the remote driver connects back to HS2 using the same network
+   * interface as a regular HS2 client (HIVE_SERVER2_THRIFT_BIND_HOST env var, then
+   * hive.server2.thrift.bind.host); hive.spark.client.rpc.server.address, when set, overrides it.
+   * @return server host name in the network
+   * @throws IOException
+   */
   String getServerAddress() throws IOException {
-    String value = config.get(SERVER_LISTEN_ADDRESS_KEY);
-    if (value != null) {
-      return value;
-    }
-
-    InetAddress address = InetAddress.getLocalHost();
-    if (address.isLoopbackAddress()) {
-      // Address resolves to something like 127.0.1.1, which happens on Debian;
-      // try to find a better address using the local network interfaces
-      Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces();
-      while (ifaces.hasMoreElements()) {
-        NetworkInterface ni = ifaces.nextElement();
-        Enumeration<InetAddress> addrs = ni.getInetAddresses();
-        while (addrs.hasMoreElements()) {
-          InetAddress addr = addrs.nextElement();
-          if (!addr.isLinkLocalAddress() && !addr.isLoopbackAddress()
-              && addr instanceof Inet4Address) {
-            // We've found an address that looks reasonable!
-            LOG.warn("Your hostname, {}, resolves to a loopback address; using {} "
-                + " instead (on interface {})", address.getHostName(), addr.getHostAddress(),
-                ni.getName());
-            LOG.warn("Set '{}' if you need to bind to another address.", SERVER_LISTEN_ADDRESS_KEY);
-            return addr.getHostAddress();
-          }
-        }
+    String hiveHost = config.get(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS);
+    if(StringUtils.isEmpty(hiveHost)) {
+      hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+      if (hiveHost == null) {
+        hiveHost = config.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
       }
     }
-
-    LOG.warn("Your hostname, {}, resolves to a loopback address, but we couldn't find "
-        + " any external IP address!", address.getHostName());
-    LOG.warn("Set {} if you need to bind to another address.", SERVER_LISTEN_ADDRESS_KEY);
-    return address.getHostName();
+    return ServerUtils.getHostAddress(hiveHost).getHostName();
   }
 
   String getRpcChannelLogLevel() {