You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/10/01 01:09:14 UTC
[32/44] hive git commit: HIVE-12222: Define port range in property
for RPCServer (Aihua Xu, reviewed by Xuefu Zhang)
HIVE-12222: Define port range in property for RPCServer (Aihua Xu, reviewed by Xuefu Zhang)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e2bd513a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e2bd513a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e2bd513a
Branch: refs/heads/hive-14535
Commit: e2bd513a3970b141576f7ead25fc6cfcc5fcda17
Parents: 667e9dd
Author: Aihua Xu <ai...@apache.org>
Authored: Thu Sep 22 14:20:51 2016 -0400
Committer: Aihua Xu <ai...@apache.org>
Committed: Wed Sep 28 12:07:40 2016 -0400
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 3 ++
.../hive/spark/client/rpc/RpcConfiguration.java | 38 +++++++++++++++++
.../apache/hive/spark/client/rpc/RpcServer.java | 44 +++++++++++++++++---
.../apache/hive/spark/client/rpc/TestRpc.java | 37 +++++++++++++++-
4 files changed, 115 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e2bd513a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 43a16d7..4c3ef3e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3080,6 +3080,9 @@ public class HiveConf extends Configuration {
"Default is empty, which means the address will be determined in the same way as for hive.server2.thrift.bind.host." +
"This is only necessary if the host has mutiple network addresses and if a different network address other than " +
"hive.server2.thrift.bind.host is to be used."),
+ SPARK_RPC_SERVER_PORT("hive.spark.client.rpc.server.port", "", "A list of port ranges which can be used by RPC server " +
+ "with the format of 49152-49222,49228 and a random one is selected from the list. Default is empty, which randomly " +
+ "selects one port from all available ones."),
SPARK_DYNAMIC_PARTITION_PRUNING(
"hive.spark.dynamic.partition.pruning", false,
"When dynamic pruning is enabled, joins on partition keys will be processed by writing\n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/e2bd513a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
index 210f8a4..8c59015 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
@@ -18,7 +18,9 @@
package org.apache.hive.spark.client.rpc;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -107,6 +109,42 @@ public final class RpcConfiguration {
return ServerUtils.getHostAddress(hiveHost).getHostName();
}
+  /**
+   * Parses the configured RPC server port string, e.g. {@code 49152-49222,49228}, into
+   * the list of individual ports it names. Entries are separated by commas; each entry
+   * is either a single port or an inclusive {@code low-high} range. Whitespace around a
+   * port number is ignored.
+   *
+   * @return the configured ports in the order they were listed; a list holding only
+   *         {@code 0} when the port string is empty
+   * @exception IOException if an entry is not a number, a range is malformed or reversed,
+   *            a port is greater than 65535, or the string names no port at all
+   */
+  List<Integer> getServerPorts() throws IOException {
+    final int maxPort = 65535;
+    String errMsg = "Incorrect RPC server port configuration for HiveServer2";
+    String portString = config.get(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname);
+    List<Integer> ports = new ArrayList<Integer>();
+    if (StringUtils.isEmpty(portString)) {
+      // Nothing configured: hand back the single default entry 0.
+      ports.add(0);
+      return ports;
+    }
+
+    try {
+      for (String portRange : portString.split(",")) {
+        // A negative limit keeps trailing empty strings, so "49152-" fails to parse below
+        // (empty upper bound) instead of being silently read as the single port 49152.
+        String[] range = portRange.split("-", -1);
+        if (range.length > 2) {
+          throw new IOException(errMsg);
+        }
+        // Parse each bound exactly once rather than on every loop iteration.
+        int first = Integer.parseInt(range[0].trim());
+        int last = (range.length == 2) ? Integer.parseInt(range[1].trim()) : first;
+        // Bounding the upper end also keeps the expansion loop small and overflow-free.
+        if (first > last || last > maxPort) {
+          throw new IOException(errMsg);
+        }
+        for (int port = first; port <= last; port++) {
+          ports.add(port);
+        }
+      }
+    } catch (NumberFormatException e) {
+      // Keep the parse failure as the cause so the offending token shows up in the trace.
+      throw new IOException(errMsg, e);
+    }
+
+    if (ports.isEmpty()) {
+      // A value such as "," splits into no entries at all.
+      throw new IOException(errMsg);
+    }
+    return ports;
+  }
+
String getRpcChannelLogLevel() {
return config.get(HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e2bd513a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
index 68ee627..657494a 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
@@ -21,10 +21,13 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.SecureRandom;
+import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
+
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
@@ -39,8 +42,10 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
@@ -51,9 +56,9 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
/**
@@ -82,7 +87,7 @@ public class RpcServer implements Closeable {
.setNameFormat("RPC-Handler-%d")
.setDaemon(true)
.build());
- this.channel = new ServerBootstrap()
+ ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@@ -107,16 +112,43 @@ public class RpcServer implements Closeable {
})
.option(ChannelOption.SO_BACKLOG, 1)
.option(ChannelOption.SO_REUSEADDR, true)
- .childOption(ChannelOption.SO_KEEPALIVE, true)
- .bind(0)
- .sync()
- .channel();
+ .childOption(ChannelOption.SO_KEEPALIVE, true);
+
+ this.channel = bindServerPort(serverBootstrap).channel();
this.port = ((InetSocketAddress) channel.localAddress()).getPort();
this.pendingClients = Maps.newConcurrentMap();
this.address = this.config.getServerAddress();
}
/**
+   * Binds the server to one of the configured ports. If the configured list contains
+   * {@code 0}, a single bind to port 0 is attempted. Otherwise the configured ports are
+   * tried one at a time in random order until a bind succeeds.
+   *
+   * @param serverBootstrap the bootstrap to bind
+   * @return the future of the bind that succeeded
+   * @throws InterruptedException if the thread is interrupted while waiting for a bind
+   * @throws IOException if the port configuration is invalid or none of the configured
+   *         ports could be bound; the last bind failure, if any, is attached as the cause
+   */
+  private ChannelFuture bindServerPort(ServerBootstrap serverBootstrap)
+      throws InterruptedException, IOException {
+    List<Integer> ports = config.getServerPorts();
+    if (ports.contains(0)) {
+      return serverBootstrap.bind(0).sync();
+    }
+
+    Random rand = new Random();
+    Exception lastFailure = null;
+    while (!ports.isEmpty()) {
+      // Take a randomly chosen port out of the list so it is never tried twice.
+      int port = ports.remove(rand.nextInt(ports.size()));
+      try {
+        return serverBootstrap.bind(port).sync();
+      } catch (InterruptedException e) {
+        // An interrupt is a request to stop, not a sign that the port is unavailable.
+        throw e;
+      } catch (Exception e) {
+        // Treat any other failure as "port unavailable"; remember it and try the next one.
+        lastFailure = e;
+      }
+    }
+    throw new IOException("No available ports from configured RPC Server ports for HiveServer2",
+        lastFailure);
+  }
+
+ /**
* Tells the RPC server to expect a connection from a new client.
*
* @param clientId An identifier for the client. Must be unique.
http://git-wip-us.apache.org/repos/asf/hive/blob/e2bd513a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
index 7bcf1df..77c3d02 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
@@ -19,10 +19,10 @@ package org.apache.hive.spark.client.rpc;
import java.io.Closeable;
import java.net.InetAddress;
+import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -168,6 +168,41 @@ public class TestRpc {
}
@Test
+  public void testServerPort() throws Exception {
+    // Exercises RPC server port selection: no port configured, a valid range list, a single
+    // port, a reversed range, and the retry path. Every server is closed in a finally block
+    // so a failed assertion cannot leave its port bound.
+    Map<String, String> config = new HashMap<String, String>();
+
+    // No port configured.
+    RpcServer server0 = new RpcServer(config);
+    try {
+      assertTrue("Empty port range should return a random valid port: " + server0.getPort(), server0.getPort() >= 0);
+    } finally {
+      IOUtils.closeQuietly(server0);
+    }
+
+    // Several ranges and a single port mixed together.
+    config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname, "49152-49222,49223,49224-49333");
+    RpcServer server1 = new RpcServer(config);
+    try {
+      assertTrue("Port should be within configured port range:" + server1.getPort(), server1.getPort() >= 49152 && server1.getPort() <= 49333);
+    } finally {
+      IOUtils.closeQuietly(server1);
+    }
+
+    // Exactly one port configured.
+    int expectedPort = 65535;
+    config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname, String.valueOf(expectedPort));
+    RpcServer server2 = new RpcServer(config);
+    try {
+      assertTrue("Port should match configured one: " + server2.getPort(), server2.getPort() == expectedPort);
+    } finally {
+      // Must be released here: the retry case below binds the same port again.
+      IOUtils.closeQuietly(server2);
+    }
+
+    // Reversed range (49552 > 49222) must be rejected as a configuration error.
+    config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname, "49552-49222,49223,49224-49333");
+    boolean rejected = false;
+    try {
+      autoClose(new RpcServer(config));
+    } catch(IOException e) {
+      rejected = true;
+      assertEquals("Incorrect RPC server port configuration for HiveServer2", e.getMessage());
+    }
+    assertTrue("Invalid port range should throw an exception", rejected);
+
+    // Retry logic.
+    // NOTE(review): presumably ports 21-23 cannot be bound by the test process (privileged
+    // ports), so the server has to fall through to 65535 - confirm for CI running as root.
+    expectedPort = 65535;
+    config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname, String.valueOf(expectedPort) + ",21-23");
+    RpcServer server3 = new RpcServer(config);
+    try {
+      assertTrue("Port should match configured one:" + server3.getPort(), server3.getPort() == expectedPort);
+    } finally {
+      IOUtils.closeQuietly(server3);
+    }
+  }
+
+ @Test
public void testCloseListener() throws Exception {
RpcServer server = autoClose(new RpcServer(emptyConfig));
Rpc[] rpcs = createRpcConnection(server);