You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2016/04/05 00:37:50 UTC

[06/24] hive git commit: HIVE-13364. Allow llap to work with dynamic ports for rpc, shuffle, ui. (Siddharth Seth, reviewed by Prasanth Jayachandran)

HIVE-13364. Allow llap to work with dynamic ports for rpc, shuffle, ui. (Siddharth Seth, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/547c5cfc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/547c5cfc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/547c5cfc

Branch: refs/heads/llap
Commit: 547c5cfce9587de31a58622589a63eba62a4b120
Parents: 184e0e1
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Mar 31 14:54:53 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Mar 31 14:54:53 2016 -0700

----------------------------------------------------------------------
 .../impl/LlapZookeeperRegistryImpl.java         |  9 +++---
 .../hive/llap/daemon/impl/LlapDaemon.java       | 34 ++++++++++++++++----
 .../daemon/impl/LlapProtocolServerImpl.java     |  7 ++--
 .../daemon/services/impl/LlapWebServices.java   | 13 ++++++--
 .../hive/llap/daemon/MiniLlapCluster.java       |  4 ++-
 5 files changed, 50 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/547c5cfc/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index c611d1a..ba38fb8 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -31,7 +31,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
@@ -68,8 +67,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.util.KerberosUtil;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.client.ZooKeeperSaslClient;
 import org.apache.zookeeper.data.ACL;
@@ -285,8 +282,10 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
         // No node exists, throw exception
         throw new Exception("Unable to create znode for this LLAP instance on ZooKeeper.");
       }
-      LOG.info("Created a znode on ZooKeeper for LLAP instance: {} znodePath: {}", rpcEndpoint,
-          znodePath);
+      LOG.info(
+          "Registered node. Created a znode on ZooKeeper for LLAP instance: rpc: {}, shuffle: {}," +
+              " webui: {}, mgmt: {}, znodePath: {} ",
+          rpcEndpoint, getShuffleEndpoint(), getServicesEndpoint(), getMngEndpoint(), znodePath);
     } catch (Exception e) {
       LOG.error("Unable to create a znode for this server instance", e);
       CloseableUtils.closeQuietly(znode);

http://git-wip-us.apache.org/repos/asf/hive/blob/547c5cfc/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index c8734a5..2fe59a2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -100,7 +100,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
 
   public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes,
       boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort,
-      int mngPort, int shufflePort) {
+      int mngPort, int shufflePort, int webPort) {
     super("LlapDaemon");
 
     initializeLogging();
@@ -140,6 +140,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
         "numExecutors=" + numExecutors +
         ", rpcListenerPort=" + srvPort +
         ", mngListenerPort=" + mngPort +
+        ", webPort=" + webPort +
         ", workDirs=" + Arrays.toString(localDirs) +
         ", shufflePort=" + shufflePort +
         ", executorMemory=" + executorMemoryBytes +
@@ -206,12 +207,11 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
         amReporter, executorClassLoader);
     addIfService(containerRunner);
 
-    this.registry = new LlapRegistryService(true);
-    addIfService(registry);
+
     if (HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.HIVE_IN_TEST)) {
       this.webServices = null;
     } else {
-      this.webServices = new LlapWebServices();
+      this.webServices = new LlapWebServices(webPort);
       addIfService(webServices);
     }
     // Bring up the server only after all other components have started.
@@ -219,6 +219,9 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
     // AMReporter after the server so that it gets the correct address. It knows how to deal with
     // requests before it is started.
     addIfService(amReporter);
+
+    // Not adding the registry as a service, since we need to control when it is initialized - its conf is used to pick up properties (e.g. the actual ports).
+    this.registry = new LlapRegistryService(true);
   }
 
   private void initializeLogging() {
@@ -289,11 +292,29 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
     ShuffleHandler.initializeAndStart(shuffleHandlerConf);
     LOG.info("Setting shuffle port to: " + ShuffleHandler.get().getPort());
     this.shufflePort.set(ShuffleHandler.get().getPort());
+    getConfig()
+        .setInt(ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT.varname, ShuffleHandler.get().getPort());
     super.serviceStart();
-    LOG.info("LlapDaemon serviceStart complete");
+
+    // Setup the actual ports in the configuration.
+    getConfig().setInt(ConfVars.LLAP_DAEMON_RPC_PORT.varname, server.getBindAddress().getPort());
+    getConfig().setInt(ConfVars.LLAP_MANAGEMENT_RPC_PORT.varname, server.getManagementBindAddress().getPort());
+    if (webServices != null) {
+      getConfig().setInt(ConfVars.LLAP_DAEMON_WEB_PORT.varname, webServices.getPort());
+    }
+
+    this.registry.init(getConfig());
+    this.registry.start();
+    LOG.info(
+        "LlapDaemon serviceStart complete. RPC Port={}, ManagementPort={}, ShuflePort={}, WebPort={}",
+        server.getBindAddress().getPort(), server.getManagementBindAddress().getPort(),
+        ShuffleHandler.get().getPort(), (webServices == null ? "" : webServices.getPort()));
   }
 
   public void serviceStop() throws Exception {
+    if (registry != null) {
+      this.registry.stop();
+    }
     super.serviceStop();
     ShuffleHandler.shutdown();
     shutdown();
@@ -341,6 +362,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
       int mngPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_MANAGEMENT_RPC_PORT);
       int shufflePort = daemonConf
           .getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
+      int webPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_WEB_PORT);
       long executorMemoryBytes = HiveConf.getIntVar(
           daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l;
 
@@ -348,7 +370,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
       boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT);
       boolean isLlapIo = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED, true);
       llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo,
-              isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort);
+              isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort);
 
       LOG.info("Adding shutdown hook for LlapDaemon");
       ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1);

http://git-wip-us.apache.org/repos/asf/hive/blob/547c5cfc/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
index 3a25a66..e99e689 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
@@ -19,7 +19,6 @@ import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.util.concurrent.atomic.AtomicReference;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.ByteArrayDataOutput;
 import com.google.common.io.ByteStreams;
 import com.google.protobuf.BlockingService;
@@ -189,11 +188,15 @@ public class LlapProtocolServerImpl extends AbstractService
   }
 
   @InterfaceAudience.Private
-  @VisibleForTesting
   InetSocketAddress getBindAddress() {
     return srvAddress.get();
   }
 
+  @InterfaceAudience.Private
+  InetSocketAddress getManagementBindAddress() {
+    return mngAddress.get();
+  }
+
   private RPC.Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
                                   int numHandlers, BlockingService blockingService) throws
       IOException {

http://git-wip-us.apache.org/repos/asf/hive/blob/547c5cfc/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
index afb59c0..e4c622e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.hive.llap.daemon.services.impl;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hive.http.HttpServer;
@@ -38,13 +38,14 @@ public class LlapWebServices extends AbstractService {
   private boolean useSSL = false;
   private boolean useSPNEGO = false;
 
-  public LlapWebServices() {
+  public LlapWebServices(int port) {
     super("LlapWebServices");
+    this.port = port;
   }
 
   @Override
   public void serviceInit(Configuration conf) {
-    this.port = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT);
+
     this.useSSL = HiveConf.getBoolVar(conf, ConfVars.LLAP_DAEMON_WEB_SSL);
     this.useSPNEGO = HiveConf.getBoolVar(conf, ConfVars.LLAP_WEB_AUTO_AUTH);
     String bindAddress = "0.0.0.0";
@@ -69,6 +70,11 @@ public class LlapWebServices extends AbstractService {
     }
   }
 
+  @InterfaceAudience.Private
+  public int getPort() {
+    return this.http.getPort();
+  }
+
   @Override
   public void serviceStart() throws Exception {
     if (this.http != null) {
@@ -76,6 +82,7 @@ public class LlapWebServices extends AbstractService {
     }
   }
 
+  @Override
   public void serviceStop() throws Exception {
     if (this.http != null) {
       this.http.stop();

http://git-wip-us.apache.org/repos/asf/hive/blob/547c5cfc/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
index c920c24..a09c0b2 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
@@ -146,15 +146,17 @@ public class MiniLlapCluster extends AbstractService {
     int rpcPort = 0;
     int mngPort = 0;
     int shufflePort = 0;
+    int webPort = 0;
     boolean usePortsFromConf = conf.getBoolean("minillap.usePortsFromConf", false);
     if (usePortsFromConf) {
       rpcPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT);
       mngPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT);
       shufflePort = conf.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
+      webPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT);
     }
 
     llapDaemon = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled,
-        ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort);
+        ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort);
     llapDaemon.init(conf);
   }