You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/12/07 20:12:20 UTC
hive git commit: HIVE-12568: Provide an option to specify network
interface used by Spark remote client [Spark Branch] (reviewed by Jimmy)
Repository: hive
Updated Branches:
refs/heads/spark e4b8cf43c -> 9af0b27bd
HIVE-12568: Provide an option to specify network interface used by Spark remote client [Spark Branch] (reviewed by Jimmy)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9af0b27b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9af0b27b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9af0b27b
Branch: refs/heads/spark
Commit: 9af0b27bda6352eb229058db57a25fe65eb81f9a
Parents: e4b8cf4
Author: xzhang <xz...@xzdt>
Authored: Mon Dec 7 11:10:25 2015 -0800
Committer: xzhang <xz...@xzdt>
Committed: Mon Dec 7 11:10:25 2015 -0800
----------------------------------------------------------------------
.../apache/hadoop/hive/common/ServerUtils.java | 19 +++++++
.../org/apache/hadoop/hive/conf/HiveConf.java | 5 ++
.../service/cli/thrift/ThriftCLIService.java | 15 +++---
.../hive/spark/client/rpc/RpcConfiguration.java | 57 +++++++-------------
4 files changed, 50 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9af0b27b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
index 83517ce..b44f92f 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hive.common;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -47,4 +50,20 @@ public class ServerUtils {
}
}
+ /**
+ * Get the Inet address of the given host name, or of the local host if the name is null or empty.
+ * @param hostname The name of the host; may be null or empty
+ * @return The network address of the host
+ * @throws UnknownHostException if the host name or the local host cannot be resolved
+ */
+ public static InetAddress getHostAddress(String hostname) throws UnknownHostException {
+ InetAddress serverIPAddress;
+ if (hostname != null && !hostname.isEmpty()) {
+ serverIPAddress = InetAddress.getByName(hostname);
+ } else {
+ serverIPAddress = InetAddress.getLocalHost();
+ }
+ return serverIPAddress;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9af0b27b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9e805bd..53ef428 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2387,6 +2387,11 @@ public class HiveConf extends Configuration {
"Channel logging level for remote Spark driver. One of {DEBUG, ERROR, INFO, TRACE, WARN}."),
SPARK_RPC_SASL_MECHANISM("hive.spark.client.rpc.sasl.mechanisms", "DIGEST-MD5",
"Name of the SASL mechanism to use for authentication."),
+ SPARK_RPC_SERVER_ADDRESS("hive.spark.client.rpc.server.address", "",
+ "The server address of HiveServer2 host to be used for communication between Hive client and remote Spark driver. " +
+ "Default is empty, which means the address will be determined in the same way as for hive.server2.thrift.bind.host. " +
+ "This is only necessary if the host has multiple network addresses and if a different network address other than " +
+ "hive.server2.thrift.bind.host is to be used."),
SPARK_DYNAMIC_PARTITION_PRUNING(
"hive.spark.dynamic.partition.pruning", false,
"When dynamic pruning is enabled, joins on partition keys will be processed by writing\n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/9af0b27b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 8434965..d54f12c 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.common.ServerUtils;
import org.apache.hive.service.AbstractService;
import org.apache.hive.service.ServiceException;
import org.apache.hive.service.ServiceUtils;
@@ -160,21 +161,19 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
@Override
public synchronized void init(HiveConf hiveConf) {
this.hiveConf = hiveConf;
- // Initialize common server configs needed in both binary & http modes
- String portString;
- hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+
+ String hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
if (hiveHost == null) {
hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
}
try {
- if (hiveHost != null && !hiveHost.isEmpty()) {
- serverIPAddress = InetAddress.getByName(hiveHost);
- } else {
- serverIPAddress = InetAddress.getLocalHost();
- }
+ serverIPAddress = ServerUtils.getHostAddress(hiveHost);
} catch (UnknownHostException e) {
throw new ServiceException(e);
}
+
+ // Initialize common server configs needed in both binary & http modes
+ String portString;
// HTTP mode
if (HiveServer2.isHTTPTransportMode(hiveConf)) {
workerKeepAliveTime =
http://git-wip-us.apache.org/repos/asf/hive/blob/9af0b27b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
index 9c8cea0..e387659 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
@@ -18,20 +18,19 @@
package org.apache.hive.spark.client.rpc;
import java.io.IOException;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+
import javax.security.sasl.Sasl;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.common.ServerUtils;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -49,15 +48,14 @@ public final class RpcConfiguration {
HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname,
HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname,
HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname,
- HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname
+ HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname,
+ HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname
);
public static final ImmutableSet<String> HIVE_SPARK_TIME_CONFIGS = ImmutableSet.of(
HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname
);
- public static final String SERVER_LISTEN_ADDRESS_KEY = "hive.spark.client.server.address";
-
/** Prefix for other SASL options. */
public static final String RPC_SASL_OPT_PREFIX = "hive.spark.client.rpc.sasl.";
@@ -91,39 +89,22 @@ public final class RpcConfiguration {
return value != null ? Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.defaultIntVal;
}
+ /**
+ * Returns the host name the remote driver should use to connect back to HS2. Lookup order:
+ * hive.spark.client.rpc.server.address, then the HIVE_SERVER2_THRIFT_BIND_HOST environment variable, then
+ * the HIVE_SERVER2_THRIFT_BIND_HOST config; if all are unset, the local host is used. Keys are read by varname.
+ * @return server host name in the network
+ * @throws IOException if the resulting host name (or the local host) cannot be resolved
+ */
String getServerAddress() throws IOException {
- String value = config.get(SERVER_LISTEN_ADDRESS_KEY);
- if (value != null) {
- return value;
- }
-
- InetAddress address = InetAddress.getLocalHost();
- if (address.isLoopbackAddress()) {
- // Address resolves to something like 127.0.1.1, which happens on Debian;
- // try to find a better address using the local network interfaces
- Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces();
- while (ifaces.hasMoreElements()) {
- NetworkInterface ni = ifaces.nextElement();
- Enumeration<InetAddress> addrs = ni.getInetAddresses();
- while (addrs.hasMoreElements()) {
- InetAddress addr = addrs.nextElement();
- if (!addr.isLinkLocalAddress() && !addr.isLoopbackAddress()
- && addr instanceof Inet4Address) {
- // We've found an address that looks reasonable!
- LOG.warn("Your hostname, {}, resolves to a loopback address; using {} "
- + " instead (on interface {})", address.getHostName(), addr.getHostAddress(),
- ni.getName());
- LOG.warn("Set '{}' if you need to bind to another address.", SERVER_LISTEN_ADDRESS_KEY);
- return addr.getHostAddress();
- }
- }
+ String hiveHost = config.get(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname);
+ if(StringUtils.isEmpty(hiveHost)) {
+ hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+ if (hiveHost == null) {
+ hiveHost = config.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname);
}
}
-
- LOG.warn("Your hostname, {}, resolves to a loopback address, but we couldn't find "
- + " any external IP address!", address.getHostName());
- LOG.warn("Set {} if you need to bind to another address.", SERVER_LISTEN_ADDRESS_KEY);
- return address.getHostName();
+ return ServerUtils.getHostAddress(hiveHost).getHostName();
}
String getRpcChannelLogLevel() {