You are viewing a plain-text version of this content. The canonical (linked) version is available from the original mailing-list archive page.
Posted to commits@hive.apache.org by se...@apache.org on 2017/02/24 01:09:01 UTC

[29/50] [abbrv] hive git commit: HIVE-15959 : LLAP: fix headroom calculation and move it to daemon (Sergey Shelukhin, reviewed by Siddharth Seth)

HIVE-15959 : LLAP: fix headroom calculation and move it to daemon (Sergey Shelukhin, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/de532b1f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/de532b1f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/de532b1f

Branch: refs/heads/hive-14535
Commit: de532b1f9bb21daa668dac0f2b4f2429c9b4bd37
Parents: af606ff
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue Feb 21 13:56:17 2017 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue Feb 21 13:56:17 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   | 12 ++--
 .../hadoop/hive/llap/cli/LlapServiceDriver.java | 64 ++++++++------------
 .../hive/llap/daemon/impl/LlapDaemon.java       | 49 +++++++++++----
 llap-server/src/main/resources/package.py       |  8 +--
 .../hive/llap/daemon/MiniLlapCluster.java       |  2 +-
 5 files changed, 72 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/de532b1f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1af59ba..4faaa8b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -381,7 +381,7 @@ public class HiveConf extends Configuration {
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_RPC_PORT.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname);
-    llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB.varname);
+    llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_XMX_HEADROOM.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS.varname);
@@ -3072,11 +3072,11 @@ public class HiveConf extends Configuration {
     LLAP_DAEMON_MEMORY_PER_INSTANCE_MB("hive.llap.daemon.memory.per.instance.mb", 4096,
       "The total amount of memory to use for the executors inside LLAP (in megabytes).",
       "llap.daemon.memory.per.instance.mb"),
-    LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB("hive.llap.daemon.headroom.memory.per.instance.mb", 512,
-      "The total amount of memory deducted from daemon memory required for other LLAP services. The remaining memory" +
-      " will be used by the executors. If the cache is off-heap, Executor memory + Headroom memory = Xmx. If the " +
-        "cache is on-heap, Executor memory + Cache memory + Headroom memory = Xmx. The headroom memory has to be " +
-        "minimum of 5% from the daemon memory."),
+    LLAP_DAEMON_XMX_HEADROOM("hive.llap.daemon.xmx.headroom", "5%",
+      "The total amount of heap memory set aside by LLAP and not used by the executors. Can\n" +
+      "be specified as a size (e.g. '512Mb') or a whole-number percentage (e.g. '5%'). The\n" +
+      "latter is taken of the total daemon Xmx, which can differ from the total executor\n" +
+      "memory if the cache is on-heap (not the default configuration)."),
     LLAP_DAEMON_VCPUS_PER_INSTANCE("hive.llap.daemon.vcpus.per.instance", 4,
       "The total number of vcpus to use for the executors inside LLAP.",
       "llap.daemon.vcpus.per.instance"),

http://git-wip-us.apache.org/repos/asf/hive/blob/de532b1f/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index a93d53a..e8517ab 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.llap.cli;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStreamWriter;
@@ -48,17 +47,10 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
 import org.apache.hadoop.hive.llap.daemon.impl.LlapConstants;
-import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon;
 import org.apache.hadoop.hive.llap.daemon.impl.StaticPermanentFunctionChecker;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
-import org.apache.slider.client.SliderClient;
-import org.apache.slider.common.params.ActionCreateArgs;
-import org.apache.slider.common.params.ActionDestroyArgs;
-import org.apache.slider.common.params.ActionFreezeArgs;
-import org.apache.slider.common.params.ActionInstallPackageArgs;
-import org.apache.slider.core.exceptions.UnknownApplicationInstanceException;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,7 +76,6 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.eclipse.jetty.server.ssl.SslSocketConnector;
 import org.joda.time.DateTime;
 import org.json.JSONException;
@@ -244,6 +235,7 @@ public class LlapServiceDriver {
         HiveConf.setVar(conf, ConfVars.LLAP_DAEMON_LOGGER, options.getLogger());
         propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_LOGGER.varname, options.getLogger());
       }
+      boolean isDirect = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT);
 
       if (options.getSize() != -1) {
         if (options.getCache() != -1) {
@@ -263,8 +255,7 @@ public class LlapServiceDriver {
               + " smaller than the container sizing (" + LlapUtil.humanReadableByteCount(options.getSize())
               + ")");
         }
-        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT)
-            && false == HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) {
+        if (isDirect && !HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) {
           // direct and not memory mapped
           Preconditions.checkArgument(options.getXmx() + options.getCache() <= options.getSize(),
             "Working memory (Xmx=" + LlapUtil.humanReadableByteCount(options.getXmx()) + ") + cache size ("
@@ -273,19 +264,6 @@ public class LlapServiceDriver {
         }
       }
 
-      // This parameter is read in package.py - and nowhere else. Does not need to be part of
-      // HiveConf - that's just confusing.
-      final long minAlloc = conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1);
-      long containerSize = -1;
-      if (options.getSize() != -1) {
-        containerSize = options.getSize() / (1024 * 1024);
-        Preconditions.checkArgument(containerSize >= minAlloc, "Container size ("
-            + LlapUtil.humanReadableByteCount(options.getSize()) + ") should be greater"
-            + " than minimum allocation(" + LlapUtil.humanReadableByteCount(minAlloc * 1024L * 1024L) + ")");
-        conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
-        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
-            String.valueOf(containerSize));
-      }
 
       if (options.getExecutors() != -1) {
         conf.setLong(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, options.getExecutors());
@@ -319,17 +297,30 @@ public class LlapServiceDriver {
             String.valueOf(xmxMb));
       }
 
-      final long currentHeadRoom = options.getSize() - options.getXmx() - options.getCache();
-      final long minHeadRoom = (long) (options.getXmx() * LlapDaemon.MIN_HEADROOM_PERCENT);
-      final long headRoom = currentHeadRoom < minHeadRoom ? minHeadRoom : currentHeadRoom;
-      final long headRoomMb = headRoom / (1024L * 1024L);
-      conf.setLong(ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB.varname, headRoomMb);
-      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB.varname,
-        String.valueOf(headRoomMb));
-
-      LOG.info("Memory settings: container memory: {} executor memory: {} cache memory: {} headroom memory: {}",
-        LlapUtil.humanReadableByteCount(options.getSize()), LlapUtil.humanReadableByteCount(options.getXmx()),
-        LlapUtil.humanReadableByteCount(options.getCache()), LlapUtil.humanReadableByteCount(headRoom));
+      long size = options.getSize();
+      if (size == -1) { // no explicit size: heap + 20% (capped at +1Gb), plus the cache if direct
+        long heapSize = xmx;
+        if (!isDirect) {
+          heapSize += cache;
+        }
+        size = Math.min((long)(heapSize * 1.2), heapSize + 1024L*1024*1024);
+        if (isDirect) {
+          size += cache;
+        }
+      }
+      long containerSize = size / (1024 * 1024);
+      final long minAlloc = conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1);
+      Preconditions.checkArgument(containerSize >= minAlloc, "Container size ("
+          + LlapUtil.humanReadableByteCount(size) + ") should be greater than or equal"
+          + " to minimum allocation(" + LlapUtil.humanReadableByteCount(minAlloc * 1024L * 1024L) + ")");
+      conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
+      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
+          String.valueOf(containerSize));
+      // Report the effective container size; options.getSize() is -1 when it was derived above.
+      LOG.info("Memory settings: container memory: {} executor memory: {} cache memory: {}",
+        LlapUtil.humanReadableByteCount(size),
+        LlapUtil.humanReadableByteCount(options.getXmx()),
+        LlapUtil.humanReadableByteCount(options.getCache()));
 
       if (options.getLlapQueueName() != null && !options.getLlapQueueName().isEmpty()) {
         conf.set(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, options.getLlapQueueName());
@@ -642,9 +633,6 @@ public class LlapServiceDriver {
     configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
         HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
 
-    configs.put(ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB.varname,
-      HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB));
-
     configs.put(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname,
         HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE));
 

http://git-wip-us.apache.org/repos/asf/hive/blob/de532b1f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index e737fdd..fc9f530 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -83,7 +83,6 @@ import com.google.common.primitives.Ints;
 public class LlapDaemon extends CompositeService implements ContainerRunner, LlapDaemonMXBean {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapDaemon.class);
-  public static final double MIN_HEADROOM_PERCENT = 0.05;
 
   private final Configuration shuffleHandlerConf;
   private final SecretManager secretManager;
@@ -114,7 +113,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
 
   public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes,
     boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort,
-    int mngPort, int shufflePort, int webPort, String appName, final long headRoomBytes) {
+    int mngPort, int shufflePort, int webPort, String appName) {
     super("LlapDaemon");
 
     printAsciiArt();
@@ -158,11 +157,9 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
 
     this.maxJvmMemory = getTotalHeapSize();
     this.llapIoEnabled = ioEnabled;
-    Preconditions.checkArgument(headRoomBytes < executorMemoryBytes, "LLAP daemon headroom size should be less " +
-      "than daemon max memory size. headRoomBytes: " + headRoomBytes + " executorMemoryBytes: " + executorMemoryBytes);
-    final long minHeadRoomBytes = (long) (executorMemoryBytes * MIN_HEADROOM_PERCENT);
-    final long headroom = headRoomBytes < minHeadRoomBytes ? minHeadRoomBytes : headRoomBytes;
-    this.executorMemoryPerInstance = executorMemoryBytes - headroom;
+
+    long xmxHeadRoomBytes = determineXmxHeadroom(daemonConf, executorMemoryBytes, maxJvmMemory);
+    this.executorMemoryPerInstance = executorMemoryBytes - xmxHeadRoomBytes;
     this.ioMemoryPerInstance = ioMemoryBytes;
     this.numExecutors = numExecutors;
     this.localDirs = localDirs;
@@ -173,11 +170,14 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
     boolean enablePreemption = HiveConf.getBoolVar(
         daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION);
     LOG.warn("Attempting to start LlapDaemonConf with the following configuration: " +
-        "maxJvmMemory=" + maxJvmMemory + " (" + LlapUtil.humanReadableByteCount(maxJvmMemory) + ")" +
+        "maxJvmMemory=" + maxJvmMemory + " ("
+          + LlapUtil.humanReadableByteCount(maxJvmMemory) + ")" +
         ", requestedExecutorMemory=" + executorMemoryBytes +
         " (" + LlapUtil.humanReadableByteCount(executorMemoryBytes) + ")" +
-        ", llapIoCacheSize=" + ioMemoryBytes + " (" + LlapUtil.humanReadableByteCount(ioMemoryBytes) + ")" +
-        ", headRoomMemory=" + headroom + " (" + LlapUtil.humanReadableByteCount(headroom) + ")" +
+        ", llapIoCacheSize=" + ioMemoryBytes + " ("
+          + LlapUtil.humanReadableByteCount(ioMemoryBytes) + ")" +
+        ", xmxHeadRoomMemory=" + xmxHeadRoomBytes + " ("
+          + LlapUtil.humanReadableByteCount(xmxHeadRoomBytes) + ")" +
         ", adjustedExecutorMemory=" + executorMemoryPerInstance +
         " (" + LlapUtil.humanReadableByteCount(executorMemoryPerInstance) + ")" +
         ", numExecutors=" + numExecutors +
@@ -293,6 +293,50 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
     addIfService(amReporter);
   }
 
+  /**
+   * Computes how many heap bytes are held back from the executors, based on
+   * hive.llap.daemon.xmx.headroom. The setting is either a size string (parsed with
+   * HiveConf.toSizeBytes) or a whole-number percentage such as "5%" of maxJvmMemory.
+   *
+   * @param daemonConf daemon configuration to read the headroom setting from
+   * @param executorMemoryBytes requested executor memory; the headroom must be smaller
+   * @param maxJvmMemory base for percentage values (the caller passes the total heap size)
+   * @return headroom in bytes, in the range [0, executorMemoryBytes)
+   * @throws RuntimeException wrapping the NumberFormatException if the value cannot be parsed
+   * @throws IllegalStateException if a percentage is outside [0, 100)
+   * @throws IllegalArgumentException if the headroom is negative or not below executorMemoryBytes
+   */
+  private static long determineXmxHeadroom(
+      Configuration daemonConf, long executorMemoryBytes, long maxJvmMemory) {
+    String headroomStr = HiveConf.getVar(daemonConf, ConfVars.LLAP_DAEMON_XMX_HEADROOM).trim();
+    long xmxHeadRoomBytes;
+    try {
+      if (headroomStr.endsWith("%")) {
+        String percentStr = headroomStr.substring(0, headroomStr.length() - 1).trim();
+        long percentage = Integer.parseInt(percentStr);
+        Preconditions.checkState(percentage >= 0 && percentage < 100,
+            "Headroom percentage should be in [0, 100) range; found " + headroomStr);
+        // (M / 100) * p + (M % 100) * p / 100 equals M * p / 100 exactly, but cannot
+        // overflow a long even when maxJvmMemory is close to Long.MAX_VALUE.
+        xmxHeadRoomBytes = (maxJvmMemory / 100L) * percentage
+            + (maxJvmMemory % 100L) * percentage / 100L;
+      } else {
+        xmxHeadRoomBytes = HiveConf.toSizeBytes(headroomStr);
+      }
+    } catch (NumberFormatException ex) {
+      throw new RuntimeException("Invalid headroom configuration " + headroomStr, ex);
+    }
+
+    Preconditions.checkArgument(xmxHeadRoomBytes >= 0,
+        "LLAP daemon headroom size should not be negative; found " + xmxHeadRoomBytes
+          + " (derived from " + headroomStr + ")");
+    Preconditions.checkArgument(xmxHeadRoomBytes < executorMemoryBytes,
+        "LLAP daemon headroom size should be less than daemon max memory size. headRoomBytes: "
+          + xmxHeadRoomBytes + " executorMemoryBytes: " + executorMemoryBytes + " (derived from "
+          + headroomStr + " out of xmx of " + maxJvmMemory + ")");
+    return xmxHeadRoomBytes;
+  }
+
   private static void initializeLogging(final Configuration conf) {
     long start = System.currentTimeMillis();
     URL llap_l4j2 = LlapDaemon.class.getClassLoader().getResource(
@@ -467,15 +491,14 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
       int webPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_WEB_PORT);
       long executorMemoryBytes = HiveConf.getIntVar(
           daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l;
-      long headroomBytes = HiveConf.getIntVar(
-        daemonConf, ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l;
       long ioMemoryBytes = HiveConf.getSizeVar(daemonConf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
       boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT);
       boolean isLlapIo = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED, true);
+
       LlapDaemon.initializeLogging(daemonConf);
       llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo,
           isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort,
-          appName, headroomBytes);
+          appName);
 
       LOG.info("Adding shutdown hook for LlapDaemon");
       ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1);

http://git-wip-us.apache.org/repos/asf/hive/blob/de532b1f/llap-server/src/main/resources/package.py
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/package.py b/llap-server/src/main/resources/package.py
index 66648b6..8a378ef 100644
--- a/llap-server/src/main/resources/package.py
+++ b/llap-server/src/main/resources/package.py
@@ -20,17 +20,15 @@ class LlapResource(object):
 		# convert to Mb
 		self.cache = config["hive.llap.io.memory.size"] / (1024*1024.0)
 		self.direct = config["hive.llap.io.allocator.direct"]
-		self.min_mb = -1
 		self.min_cores = -1
 		# compute heap + cache as final Xmx
 		h = self.memory 
 		if (not self.direct):
 			h += self.cache
 		if size == -1:
-			c = min(h*1.2, h + 1024) # + 1024 or 20%
-			c += (self.direct and self.cache) or 0
-			if self.min_mb > 0:
-				c = c + c%self.min_mb
+			print "Cannot determine the container size"
+			sys.exit(1)
+			return
 		else:
 			# do not mess with user input
 			c = size

http://git-wip-us.apache.org/repos/asf/hive/blob/de532b1f/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
index a9b23b6..06f6dac 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
@@ -164,7 +164,7 @@ public class MiniLlapCluster extends AbstractService {
     LOG.info("Initializing {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed);
     for (int i = 0 ;i < numInstances ; i++) {
       llapDaemons[i] = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled,
-          ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort, clusterNameTrimmed, 0);
+          ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort, clusterNameTrimmed);
       llapDaemons[i].init(new Configuration(conf));
     }
     LOG.info("Initialized {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed);