You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2016/04/05 00:37:50 UTC
[06/24] hive git commit: HIVE-13364. Allow llap to work with dynamic
ports for rpc, shuffle, ui. (Siddharth Seth,
reviewed by Prasanth Jayachandran)
HIVE-13364. Allow llap to work with dynamic ports for rpc, shuffle, ui. (Siddharth Seth, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/547c5cfc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/547c5cfc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/547c5cfc
Branch: refs/heads/llap
Commit: 547c5cfce9587de31a58622589a63eba62a4b120
Parents: 184e0e1
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Mar 31 14:54:53 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Mar 31 14:54:53 2016 -0700
----------------------------------------------------------------------
.../impl/LlapZookeeperRegistryImpl.java | 9 +++---
.../hive/llap/daemon/impl/LlapDaemon.java | 34 ++++++++++++++++----
.../daemon/impl/LlapProtocolServerImpl.java | 7 ++--
.../daemon/services/impl/LlapWebServices.java | 13 ++++++--
.../hive/llap/daemon/MiniLlapCluster.java | 4 ++-
5 files changed, 50 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/547c5cfc/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index c611d1a..ba38fb8 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -31,7 +31,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -68,8 +67,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.data.ACL;
@@ -285,8 +282,10 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
// No node exists, throw exception
throw new Exception("Unable to create znode for this LLAP instance on ZooKeeper.");
}
- LOG.info("Created a znode on ZooKeeper for LLAP instance: {} znodePath: {}", rpcEndpoint,
- znodePath);
+ LOG.info(
+ "Registered node. Created a znode on ZooKeeper for LLAP instance: rpc: {}, shuffle: {}," +
+ " webui: {}, mgmt: {}, znodePath: {} ",
+ rpcEndpoint, getShuffleEndpoint(), getServicesEndpoint(), getMngEndpoint(), znodePath);
} catch (Exception e) {
LOG.error("Unable to create a znode for this server instance", e);
CloseableUtils.closeQuietly(znode);
http://git-wip-us.apache.org/repos/asf/hive/blob/547c5cfc/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index c8734a5..2fe59a2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -100,7 +100,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes,
boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort,
- int mngPort, int shufflePort) {
+ int mngPort, int shufflePort, int webPort) {
super("LlapDaemon");
initializeLogging();
@@ -140,6 +140,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
"numExecutors=" + numExecutors +
", rpcListenerPort=" + srvPort +
", mngListenerPort=" + mngPort +
+ ", webPort=" + webPort +
", workDirs=" + Arrays.toString(localDirs) +
", shufflePort=" + shufflePort +
", executorMemory=" + executorMemoryBytes +
@@ -206,12 +207,11 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
amReporter, executorClassLoader);
addIfService(containerRunner);
- this.registry = new LlapRegistryService(true);
- addIfService(registry);
+
if (HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.HIVE_IN_TEST)) {
this.webServices = null;
} else {
- this.webServices = new LlapWebServices();
+ this.webServices = new LlapWebServices(webPort);
addIfService(webServices);
}
// Bring up the server only after all other components have started.
@@ -219,6 +219,9 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
// AMReporter after the server so that it gets the correct address. It knows how to deal with
// requests before it is started.
addIfService(amReporter);
+
+ // Not adding the registry as a service, since we need to control when it is initialized - the conf passed to it is used to pick up properties.
+ this.registry = new LlapRegistryService(true);
}
private void initializeLogging() {
@@ -289,11 +292,29 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
ShuffleHandler.initializeAndStart(shuffleHandlerConf);
LOG.info("Setting shuffle port to: " + ShuffleHandler.get().getPort());
this.shufflePort.set(ShuffleHandler.get().getPort());
+ getConfig()
+ .setInt(ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT.varname, ShuffleHandler.get().getPort());
super.serviceStart();
- LOG.info("LlapDaemon serviceStart complete");
+
+ // Setup the actual ports in the configuration.
+ getConfig().setInt(ConfVars.LLAP_DAEMON_RPC_PORT.varname, server.getBindAddress().getPort());
+ getConfig().setInt(ConfVars.LLAP_MANAGEMENT_RPC_PORT.varname, server.getManagementBindAddress().getPort());
+ if (webServices != null) {
+ getConfig().setInt(ConfVars.LLAP_DAEMON_WEB_PORT.varname, webServices.getPort());
+ }
+
+ this.registry.init(getConfig());
+ this.registry.start();
+ LOG.info(
+ "LlapDaemon serviceStart complete. RPC Port={}, ManagementPort={}, ShuflePort={}, WebPort={}",
+ server.getBindAddress().getPort(), server.getManagementBindAddress().getPort(),
+ ShuffleHandler.get().getPort(), (webServices == null ? "" : webServices.getPort()));
}
public void serviceStop() throws Exception {
+ if (registry != null) {
+ this.registry.stop();
+ }
super.serviceStop();
ShuffleHandler.shutdown();
shutdown();
@@ -341,6 +362,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
int mngPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_MANAGEMENT_RPC_PORT);
int shufflePort = daemonConf
.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
+ int webPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_WEB_PORT);
long executorMemoryBytes = HiveConf.getIntVar(
daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l;
@@ -348,7 +370,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT);
boolean isLlapIo = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED, true);
llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo,
- isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort);
+ isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort);
LOG.info("Adding shutdown hook for LlapDaemon");
ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1);
http://git-wip-us.apache.org/repos/asf/hive/blob/547c5cfc/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
index 3a25a66..e99e689 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
@@ -19,7 +19,6 @@ import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import com.google.protobuf.BlockingService;
@@ -189,11 +188,15 @@ public class LlapProtocolServerImpl extends AbstractService
}
@InterfaceAudience.Private
- @VisibleForTesting
InetSocketAddress getBindAddress() {
return srvAddress.get();
}
+ @InterfaceAudience.Private
+ InetSocketAddress getManagementBindAddress() {
+ return mngAddress.get();
+ }
+
private RPC.Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
int numHandlers, BlockingService blockingService) throws
IOException {
http://git-wip-us.apache.org/repos/asf/hive/blob/547c5cfc/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
index afb59c0..e4c622e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.hive.llap.daemon.services.impl;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hive.http.HttpServer;
@@ -38,13 +38,14 @@ public class LlapWebServices extends AbstractService {
private boolean useSSL = false;
private boolean useSPNEGO = false;
- public LlapWebServices() {
+ public LlapWebServices(int port) {
super("LlapWebServices");
+ this.port = port;
}
@Override
public void serviceInit(Configuration conf) {
- this.port = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT);
+
this.useSSL = HiveConf.getBoolVar(conf, ConfVars.LLAP_DAEMON_WEB_SSL);
this.useSPNEGO = HiveConf.getBoolVar(conf, ConfVars.LLAP_WEB_AUTO_AUTH);
String bindAddress = "0.0.0.0";
@@ -69,6 +70,11 @@ public class LlapWebServices extends AbstractService {
}
}
+ @InterfaceAudience.Private
+ public int getPort() {
+ return this.http.getPort();
+ }
+
@Override
public void serviceStart() throws Exception {
if (this.http != null) {
@@ -76,6 +82,7 @@ public class LlapWebServices extends AbstractService {
}
}
+ @Override
public void serviceStop() throws Exception {
if (this.http != null) {
this.http.stop();
http://git-wip-us.apache.org/repos/asf/hive/blob/547c5cfc/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
index c920c24..a09c0b2 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
@@ -146,15 +146,17 @@ public class MiniLlapCluster extends AbstractService {
int rpcPort = 0;
int mngPort = 0;
int shufflePort = 0;
+ int webPort = 0;
boolean usePortsFromConf = conf.getBoolean("minillap.usePortsFromConf", false);
if (usePortsFromConf) {
rpcPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT);
mngPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT);
shufflePort = conf.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
+ webPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT);
}
llapDaemon = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled,
- ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort);
+ ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort);
llapDaemon.init(conf);
}