You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/02/24 01:09:01 UTC
[29/50] [abbrv] hive git commit: HIVE-15959 : LLAP: fix headroom calculation and move it to daemon (Sergey Shelukhin, reviewed by Siddharth Seth)
HIVE-15959 : LLAP: fix headroom calculation and move it to daemon (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/de532b1f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/de532b1f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/de532b1f
Branch: refs/heads/hive-14535
Commit: de532b1f9bb21daa668dac0f2b4f2429c9b4bd37
Parents: af606ff
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue Feb 21 13:56:17 2017 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue Feb 21 13:56:17 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 12 ++--
.../hadoop/hive/llap/cli/LlapServiceDriver.java | 64 ++++++++------------
.../hive/llap/daemon/impl/LlapDaemon.java | 49 +++++++++++----
llap-server/src/main/resources/package.py | 8 +--
.../hive/llap/daemon/MiniLlapCluster.java | 2 +-
5 files changed, 72 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/de532b1f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1af59ba..4faaa8b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -381,7 +381,7 @@ public class HiveConf extends Configuration {
llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname);
llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_RPC_PORT.varname);
llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname);
- llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB.varname);
+ llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_XMX_HEADROOM.varname);
llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname);
llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS.varname);
llapDaemonVarsSetLocal.add(ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS.varname);
@@ -3072,11 +3072,11 @@ public class HiveConf extends Configuration {
LLAP_DAEMON_MEMORY_PER_INSTANCE_MB("hive.llap.daemon.memory.per.instance.mb", 4096,
"The total amount of memory to use for the executors inside LLAP (in megabytes).",
"llap.daemon.memory.per.instance.mb"),
- LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB("hive.llap.daemon.headroom.memory.per.instance.mb", 512,
- "The total amount of memory deducted from daemon memory required for other LLAP services. The remaining memory" +
- " will be used by the executors. If the cache is off-heap, Executor memory + Headroom memory = Xmx. If the " +
- "cache is on-heap, Executor memory + Cache memory + Headroom memory = Xmx. The headroom memory has to be " +
- "minimum of 5% from the daemon memory."),
+ LLAP_DAEMON_XMX_HEADROOM("hive.llap.daemon.xmx.headroom", "5%",
+ "The total amount of heap memory set aside by LLAP and not used by the executors. Can\n" +
+ "be specified as size (e.g. '512Mb'), or percentage (e.g. '5%'). Note that the latter is\n" +
+ "derived from the total daemon XMX, which can be different from the total executor\n" +
+ "memory if the cache is on-heap; although that's not the default configuration."),
LLAP_DAEMON_VCPUS_PER_INSTANCE("hive.llap.daemon.vcpus.per.instance", 4,
"The total number of vcpus to use for the executors inside LLAP.",
"llap.daemon.vcpus.per.instance"),
http://git-wip-us.apache.org/repos/asf/hive/blob/de532b1f/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index a93d53a..e8517ab 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.llap.cli;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
@@ -48,17 +47,10 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
import org.apache.hadoop.hive.llap.daemon.impl.LlapConstants;
-import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon;
import org.apache.hadoop.hive.llap.daemon.impl.StaticPermanentFunctionChecker;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
-import org.apache.slider.client.SliderClient;
-import org.apache.slider.common.params.ActionCreateArgs;
-import org.apache.slider.common.params.ActionDestroyArgs;
-import org.apache.slider.common.params.ActionFreezeArgs;
-import org.apache.slider.common.params.ActionInstallPackageArgs;
-import org.apache.slider.core.exceptions.UnknownApplicationInstanceException;
import org.apache.tez.dag.api.TezConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,7 +76,6 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
import org.eclipse.jetty.server.ssl.SslSocketConnector;
import org.joda.time.DateTime;
import org.json.JSONException;
@@ -244,6 +235,7 @@ public class LlapServiceDriver {
HiveConf.setVar(conf, ConfVars.LLAP_DAEMON_LOGGER, options.getLogger());
propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_LOGGER.varname, options.getLogger());
}
+ boolean isDirect = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT);
if (options.getSize() != -1) {
if (options.getCache() != -1) {
@@ -263,8 +255,7 @@ public class LlapServiceDriver {
+ " smaller than the container sizing (" + LlapUtil.humanReadableByteCount(options.getSize())
+ ")");
}
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT)
- && false == HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) {
+ if (isDirect && !HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) {
// direct and not memory mapped
Preconditions.checkArgument(options.getXmx() + options.getCache() <= options.getSize(),
"Working memory (Xmx=" + LlapUtil.humanReadableByteCount(options.getXmx()) + ") + cache size ("
@@ -273,19 +264,6 @@ public class LlapServiceDriver {
}
}
- // This parameter is read in package.py - and nowhere else. Does not need to be part of
- // HiveConf - that's just confusing.
- final long minAlloc = conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1);
- long containerSize = -1;
- if (options.getSize() != -1) {
- containerSize = options.getSize() / (1024 * 1024);
- Preconditions.checkArgument(containerSize >= minAlloc, "Container size ("
- + LlapUtil.humanReadableByteCount(options.getSize()) + ") should be greater"
- + " than minimum allocation(" + LlapUtil.humanReadableByteCount(minAlloc * 1024L * 1024L) + ")");
- conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
- propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
- String.valueOf(containerSize));
- }
if (options.getExecutors() != -1) {
conf.setLong(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, options.getExecutors());
@@ -319,17 +297,30 @@ public class LlapServiceDriver {
String.valueOf(xmxMb));
}
- final long currentHeadRoom = options.getSize() - options.getXmx() - options.getCache();
- final long minHeadRoom = (long) (options.getXmx() * LlapDaemon.MIN_HEADROOM_PERCENT);
- final long headRoom = currentHeadRoom < minHeadRoom ? minHeadRoom : currentHeadRoom;
- final long headRoomMb = headRoom / (1024L * 1024L);
- conf.setLong(ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB.varname, headRoomMb);
- propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB.varname,
- String.valueOf(headRoomMb));
-
- LOG.info("Memory settings: container memory: {} executor memory: {} cache memory: {} headroom memory: {}",
- LlapUtil.humanReadableByteCount(options.getSize()), LlapUtil.humanReadableByteCount(options.getXmx()),
- LlapUtil.humanReadableByteCount(options.getCache()), LlapUtil.humanReadableByteCount(headRoom));
+ long size = options.getSize();
+ if (size == -1) {
+ long heapSize = xmx;
+ if (!isDirect) {
+ heapSize += cache;
+ }
+ size = Math.min((long)(heapSize * 1.2), heapSize + 1024L*1024*1024);
+ if (isDirect) {
+ size += cache;
+ }
+ }
+ long containerSize = size / (1024 * 1024);
+ final long minAlloc = conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1);
+ Preconditions.checkArgument(containerSize >= minAlloc, "Container size ("
+ + LlapUtil.humanReadableByteCount(options.getSize()) + ") should be greater"
+ + " than minimum allocation(" + LlapUtil.humanReadableByteCount(minAlloc * 1024L * 1024L) + ")");
+ conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
+ propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
+ String.valueOf(containerSize));
+
+ LOG.info("Memory settings: container memory: {} executor memory: {} cache memory: {}",
+ LlapUtil.humanReadableByteCount(options.getSize()),
+ LlapUtil.humanReadableByteCount(options.getXmx()),
+ LlapUtil.humanReadableByteCount(options.getCache()));
if (options.getLlapQueueName() != null && !options.getLlapQueueName().isEmpty()) {
conf.set(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, options.getLlapQueueName());
@@ -642,9 +633,6 @@ public class LlapServiceDriver {
configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
- configs.put(ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB.varname,
- HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB));
-
configs.put(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname,
HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE));
http://git-wip-us.apache.org/repos/asf/hive/blob/de532b1f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index e737fdd..fc9f530 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -83,7 +83,6 @@ import com.google.common.primitives.Ints;
public class LlapDaemon extends CompositeService implements ContainerRunner, LlapDaemonMXBean {
private static final Logger LOG = LoggerFactory.getLogger(LlapDaemon.class);
- public static final double MIN_HEADROOM_PERCENT = 0.05;
private final Configuration shuffleHandlerConf;
private final SecretManager secretManager;
@@ -114,7 +113,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes,
boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort,
- int mngPort, int shufflePort, int webPort, String appName, final long headRoomBytes) {
+ int mngPort, int shufflePort, int webPort, String appName) {
super("LlapDaemon");
printAsciiArt();
@@ -158,11 +157,9 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
this.maxJvmMemory = getTotalHeapSize();
this.llapIoEnabled = ioEnabled;
- Preconditions.checkArgument(headRoomBytes < executorMemoryBytes, "LLAP daemon headroom size should be less " +
- "than daemon max memory size. headRoomBytes: " + headRoomBytes + " executorMemoryBytes: " + executorMemoryBytes);
- final long minHeadRoomBytes = (long) (executorMemoryBytes * MIN_HEADROOM_PERCENT);
- final long headroom = headRoomBytes < minHeadRoomBytes ? minHeadRoomBytes : headRoomBytes;
- this.executorMemoryPerInstance = executorMemoryBytes - headroom;
+
+ long xmxHeadRoomBytes = determineXmxHeadroom(daemonConf, executorMemoryBytes, maxJvmMemory);
+ this.executorMemoryPerInstance = executorMemoryBytes - xmxHeadRoomBytes;
this.ioMemoryPerInstance = ioMemoryBytes;
this.numExecutors = numExecutors;
this.localDirs = localDirs;
@@ -173,11 +170,14 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
boolean enablePreemption = HiveConf.getBoolVar(
daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION);
LOG.warn("Attempting to start LlapDaemonConf with the following configuration: " +
- "maxJvmMemory=" + maxJvmMemory + " (" + LlapUtil.humanReadableByteCount(maxJvmMemory) + ")" +
+ "maxJvmMemory=" + maxJvmMemory + " ("
+ + LlapUtil.humanReadableByteCount(maxJvmMemory) + ")" +
", requestedExecutorMemory=" + executorMemoryBytes +
" (" + LlapUtil.humanReadableByteCount(executorMemoryBytes) + ")" +
- ", llapIoCacheSize=" + ioMemoryBytes + " (" + LlapUtil.humanReadableByteCount(ioMemoryBytes) + ")" +
- ", headRoomMemory=" + headroom + " (" + LlapUtil.humanReadableByteCount(headroom) + ")" +
+ ", llapIoCacheSize=" + ioMemoryBytes + " ("
+ + LlapUtil.humanReadableByteCount(ioMemoryBytes) + ")" +
+ ", xmxHeadRoomMemory=" + xmxHeadRoomBytes + " ("
+ + LlapUtil.humanReadableByteCount(xmxHeadRoomBytes) + ")" +
", adjustedExecutorMemory=" + executorMemoryPerInstance +
" (" + LlapUtil.humanReadableByteCount(executorMemoryPerInstance) + ")" +
", numExecutors=" + numExecutors +
@@ -293,6 +293,30 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
addIfService(amReporter);
}
+ private static long determineXmxHeadroom(
+ Configuration daemonConf, long executorMemoryBytes, long maxJvmMemory) {
+ String headroomStr = HiveConf.getVar(daemonConf, ConfVars.LLAP_DAEMON_XMX_HEADROOM).trim();
+ long xmxHeadRoomBytes = Long.MAX_VALUE;
+ try {
+ if (headroomStr.endsWith("%")) {
+ long percentage = Integer.parseInt(headroomStr.substring(0, headroomStr.length() - 1));
+ Preconditions.checkState(percentage >= 0 && percentage < 100,
+ "Headroom percentage should be in [0, 100) range; found " + headroomStr);
+ xmxHeadRoomBytes = maxJvmMemory * percentage / 100L;
+ } else {
+ xmxHeadRoomBytes = HiveConf.toSizeBytes(headroomStr);
+ }
+ } catch (NumberFormatException ex) {
+ throw new RuntimeException("Invalid headroom configuration " + headroomStr);
+ }
+
+ Preconditions.checkArgument(xmxHeadRoomBytes < executorMemoryBytes,
+ "LLAP daemon headroom size should be less than daemon max memory size. headRoomBytes: "
+ + xmxHeadRoomBytes + " executorMemoryBytes: " + executorMemoryBytes + " (derived from "
+ + headroomStr + " out of xmx of " + maxJvmMemory + ")");
+ return xmxHeadRoomBytes;
+ }
+
private static void initializeLogging(final Configuration conf) {
long start = System.currentTimeMillis();
URL llap_l4j2 = LlapDaemon.class.getClassLoader().getResource(
@@ -467,15 +491,14 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
int webPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_WEB_PORT);
long executorMemoryBytes = HiveConf.getIntVar(
daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l;
- long headroomBytes = HiveConf.getIntVar(
- daemonConf, ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l;
long ioMemoryBytes = HiveConf.getSizeVar(daemonConf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT);
boolean isLlapIo = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED, true);
+
LlapDaemon.initializeLogging(daemonConf);
llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo,
isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort,
- appName, headroomBytes);
+ appName);
LOG.info("Adding shutdown hook for LlapDaemon");
ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1);
http://git-wip-us.apache.org/repos/asf/hive/blob/de532b1f/llap-server/src/main/resources/package.py
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/package.py b/llap-server/src/main/resources/package.py
index 66648b6..8a378ef 100644
--- a/llap-server/src/main/resources/package.py
+++ b/llap-server/src/main/resources/package.py
@@ -20,17 +20,15 @@ class LlapResource(object):
# convert to Mb
self.cache = config["hive.llap.io.memory.size"] / (1024*1024.0)
self.direct = config["hive.llap.io.allocator.direct"]
- self.min_mb = -1
self.min_cores = -1
# compute heap + cache as final Xmx
h = self.memory
if (not self.direct):
h += self.cache
if size == -1:
- c = min(h*1.2, h + 1024) # + 1024 or 20%
- c += (self.direct and self.cache) or 0
- if self.min_mb > 0:
- c = c + c%self.min_mb
+ print "Cannot determine the container size"
+ sys.exit(1)
+ return
else:
# do not mess with user input
c = size
http://git-wip-us.apache.org/repos/asf/hive/blob/de532b1f/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
index a9b23b6..06f6dac 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
@@ -164,7 +164,7 @@ public class MiniLlapCluster extends AbstractService {
LOG.info("Initializing {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed);
for (int i = 0 ;i < numInstances ; i++) {
llapDaemons[i] = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled,
- ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort, clusterNameTrimmed, 0);
+ ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort, clusterNameTrimmed);
llapDaemons[i].init(new Configuration(conf));
}
LOG.info("Initialized {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed);