You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2016/12/13 21:58:47 UTC
hive git commit: HIVE-15347: LLAP: Executor memory and Xmx should
have some headroom for other services (Prasanth Jayachandran reviewed by
Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master d96da70de -> a7665dad9
HIVE-15347: LLAP: Executor memory and Xmx should have some headroom for other services (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a7665dad
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a7665dad
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a7665dad
Branch: refs/heads/master
Commit: a7665dad9609f668b09da204e8a0e25c44792c1d
Parents: d96da70
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Tue Dec 13 13:58:25 2016 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Tue Dec 13 13:58:25 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 8 +++-
.../org/apache/hadoop/hive/llap/LlapUtil.java | 10 ++++
.../hadoop/hive/llap/cli/LlapServiceDriver.java | 50 +++++++++++---------
.../hive/llap/daemon/impl/LlapDaemon.java | 43 ++++++++++-------
.../hive/llap/daemon/MiniLlapCluster.java | 2 +-
5 files changed, 72 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/a7665dad/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3fd07e2..032ff0c 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -380,6 +380,7 @@ public class HiveConf extends Configuration {
llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname);
llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_RPC_PORT.varname);
llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname);
+ llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB.varname);
llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname);
llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS.varname);
llapDaemonVarsSetLocal.add(ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS.varname);
@@ -2991,9 +2992,14 @@ public class HiveConf extends Configuration {
"executed in parallel.", "llap.daemon.num.executors"),
LLAP_DAEMON_RPC_PORT("hive.llap.daemon.rpc.port", 0, "The LLAP daemon RPC port.",
"llap.daemon.rpc.port. A value of 0 indicates a dynamic port"),
- LLAP_DAEMON_MEMORY_PER_INSTANCE_MB("hive.llap.daemon.memory.per.instance.mb", 3276,
+ LLAP_DAEMON_MEMORY_PER_INSTANCE_MB("hive.llap.daemon.memory.per.instance.mb", 4096,
"The total amount of memory to use for the executors inside LLAP (in megabytes).",
"llap.daemon.memory.per.instance.mb"),
+ LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB("hive.llap.daemon.headroom.memory.per.instance.mb", 512,
+ "The total amount of memory deducted from daemon memory required for other LLAP services. The remaining memory" +
+ " will be used by the executors. If the cache is off-heap, Executor memory + Headroom memory = Xmx. If the " +
+ "cache is on-heap, Executor memory + Cache memory + Headroom memory = Xmx. The headroom memory has to be " +
+ "minimum of 5% from the daemon memory."),
LLAP_DAEMON_VCPUS_PER_INSTANCE("hive.llap.daemon.vcpus.per.instance", 4,
"The total number of vcpus to use for the executors inside LLAP.",
"llap.daemon.vcpus.per.instance"),
http://git-wip-us.apache.org/repos/asf/hive/blob/a7665dad/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
index 17913f0..aa69752 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
@@ -221,4 +221,14 @@ public class LlapUtil {
// getCanonicalHostName would either return FQDN, or an IP.
return (ia == null) ? address.getHostName() : ia.getCanonicalHostName();
}
+
+ public static String humanReadableByteCount(long bytes) {
+ int unit = 1024;
+ if (bytes < unit) {
+ return bytes + "B";
+ }
+ int exp = (int) (Math.log(bytes) / Math.log(unit));
+ String suffix = "KMGTPE".charAt(exp-1) + "";
+ return String.format("%.2f%sB", bytes / Math.pow(unit, exp), suffix);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a7665dad/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index dfd2f7b..169be22 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -43,8 +43,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
import org.apache.hadoop.hive.llap.daemon.impl.LlapConstants;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon;
import org.apache.hadoop.hive.llap.daemon.impl.StaticPermanentFunctionChecker;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
@@ -236,8 +238,8 @@ public class LlapServiceDriver {
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED) == false) {
// direct heap allocations need to be safer
Preconditions.checkArgument(options.getCache() < options.getSize(), "Cache size ("
- + humanReadableByteCount(options.getCache()) + ") has to be smaller"
- + " than the container sizing (" + humanReadableByteCount(options.getSize()) + ")");
+ + LlapUtil.humanReadableByteCount(options.getCache()) + ") has to be smaller"
+ + " than the container sizing (" + LlapUtil.humanReadableByteCount(options.getSize()) + ")");
} else if (options.getCache() < options.getSize()) {
LOG.warn("Note that this might need YARN physical memory monitoring to be turned off "
+ "(yarn.nodemanager.pmem-check-enabled=false)");
@@ -245,18 +247,17 @@ public class LlapServiceDriver {
}
if (options.getXmx() != -1) {
Preconditions.checkArgument(options.getXmx() < options.getSize(), "Working memory (Xmx="
- + humanReadableByteCount(options.getXmx()) + ") has to be"
- + " smaller than the container sizing (" + humanReadableByteCount(options.getSize())
+ + LlapUtil.humanReadableByteCount(options.getXmx()) + ") has to be"
+ + " smaller than the container sizing (" + LlapUtil.humanReadableByteCount(options.getSize())
+ ")");
}
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT)
&& false == HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) {
// direct and not memory mapped
- Preconditions.checkArgument(options.getXmx() + options.getCache() < options.getSize(),
- "Working memory + cache (Xmx=" + humanReadableByteCount(options.getXmx())
- + " + cache=" + humanReadableByteCount(options.getCache()) + ")"
- + " has to be smaller than the container sizing ("
- + humanReadableByteCount(options.getSize()) + ")");
+ Preconditions.checkArgument(options.getXmx() + options.getCache() <= options.getSize(),
+ "Working memory (Xmx=" + LlapUtil.humanReadableByteCount(options.getXmx()) + ") + cache size ("
+ + LlapUtil.humanReadableByteCount(options.getCache()) + ") has to be smaller than the container sizing ("
+ + LlapUtil.humanReadableByteCount(options.getSize()) + ")");
}
}
@@ -267,8 +268,8 @@ public class LlapServiceDriver {
if (options.getSize() != -1) {
containerSize = options.getSize() / (1024 * 1024);
Preconditions.checkArgument(containerSize >= minAlloc, "Container size ("
- + humanReadableByteCount(options.getSize()) + ") should be greater"
- + " than minimum allocation(" + humanReadableByteCount(minAlloc * 1024L * 1024L) + ")");
+ + LlapUtil.humanReadableByteCount(options.getSize()) + ") should be greater"
+ + " than minimum allocation(" + LlapUtil.humanReadableByteCount(minAlloc * 1024L * 1024L) + ")");
conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
String.valueOf(containerSize));
@@ -300,12 +301,24 @@ public class LlapServiceDriver {
// Xmx is not the max heap value in JDK8. You need to subtract 50% of the survivor fraction
// from this, to get actual usable memory before it goes into GC
xmx = options.getXmx();
- long xmxMb = (long) (xmx / (1024 * 1024));
+ long xmxMb = (xmx / (1024L * 1024L));
conf.setLong(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, xmxMb);
propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
String.valueOf(xmxMb));
}
+ final long currentHeadRoom = options.getSize() - options.getXmx() - options.getCache();
+ final long minHeadRoom = (long) (options.getXmx() * LlapDaemon.MIN_HEADROOM_PERCENT);
+ final long headRoom = currentHeadRoom < minHeadRoom ? minHeadRoom : currentHeadRoom;
+ final long headRoomMb = headRoom / (1024L * 1024L);
+ conf.setLong(ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB.varname, headRoomMb);
+ propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB.varname,
+ String.valueOf(headRoomMb));
+
+ LOG.info("Memory settings: container memory: {} executor memory: {} cache memory: {} headroom memory: {}",
+ LlapUtil.humanReadableByteCount(options.getSize()), LlapUtil.humanReadableByteCount(options.getXmx()),
+ LlapUtil.humanReadableByteCount(options.getCache()), LlapUtil.humanReadableByteCount(headRoom));
+
if (options.getLlapQueueName() != null && !options.getLlapQueueName().isEmpty()) {
conf.set(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, options.getLlapQueueName());
propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname,
@@ -551,6 +564,9 @@ public class LlapServiceDriver {
configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
+ configs.put(ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB.varname,
+ HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB));
+
configs.put(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname,
HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE));
@@ -699,14 +715,4 @@ public class LlapServiceDriver {
// they will be file:// URLs
lfs.copyFromLocalFile(new Path(conf.getResource(f).toString()), confPath);
}
-
- private String humanReadableByteCount(long bytes) {
- int unit = 1024;
- if (bytes < unit) {
- return bytes + "B";
- }
- int exp = (int) (Math.log(bytes) / Math.log(unit));
- String suffix = "KMGTPE".charAt(exp-1) + "";
- return String.format("%.2f%sB", bytes / Math.pow(unit, exp), suffix);
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a7665dad/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index b7e05d3..789641e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -63,7 +63,6 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge.UdfWhitelistChecker;
import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ExitUtil;
@@ -84,6 +83,7 @@ import com.google.common.primitives.Ints;
public class LlapDaemon extends CompositeService implements ContainerRunner, LlapDaemonMXBean {
private static final Logger LOG = LoggerFactory.getLogger(LlapDaemon.class);
+ public static final double MIN_HEADROOM_PERCENT = 0.05;
private final Configuration shuffleHandlerConf;
private final SecretManager secretManager;
@@ -113,8 +113,8 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
private final AtomicReference<Integer> shufflePort = new AtomicReference<>();
public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes,
- boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort,
- int mngPort, int shufflePort, int webPort, String appName) {
+ boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort,
+ int mngPort, int shufflePort, int webPort, String appName, final long headRoomBytes) {
super("LlapDaemon");
printAsciiArt();
@@ -158,28 +158,37 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
this.maxJvmMemory = getTotalHeapSize();
this.llapIoEnabled = ioEnabled;
- this.executorMemoryPerInstance = executorMemoryBytes;
+ Preconditions.checkArgument(headRoomBytes < executorMemoryBytes, "LLAP daemon headroom size should be less " +
+ "than daemon max memory size. headRoomBytes: " + headRoomBytes + " executorMemoryBytes: " + executorMemoryBytes);
+ final long minHeadRoomBytes = (long) (executorMemoryBytes * MIN_HEADROOM_PERCENT);
+ final long headroom = headRoomBytes < minHeadRoomBytes ? minHeadRoomBytes : headRoomBytes;
+ this.executorMemoryPerInstance = executorMemoryBytes - headroom;
this.ioMemoryPerInstance = ioMemoryBytes;
this.numExecutors = numExecutors;
this.localDirs = localDirs;
+
int waitQueueSize = HiveConf.getIntVar(
daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE);
boolean enablePreemption = HiveConf.getBoolVar(
daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION);
LOG.warn("Attempting to start LlapDaemonConf with the following configuration: " +
- "numExecutors=" + numExecutors +
+ "maxJvmMemory=" + maxJvmMemory + " (" + LlapUtil.humanReadableByteCount(maxJvmMemory) + ")" +
+ ", requestedExecutorMemory=" + executorMemoryBytes +
+ " (" + LlapUtil.humanReadableByteCount(executorMemoryBytes) + ")" +
+ ", llapIoCacheSize=" + ioMemoryBytes + " (" + LlapUtil.humanReadableByteCount(ioMemoryBytes) + ")" +
+ ", headRoomMemory=" + headroom + " (" + LlapUtil.humanReadableByteCount(headroom) + ")" +
+ ", adjustedExecutorMemory=" + executorMemoryPerInstance +
+ " (" + LlapUtil.humanReadableByteCount(executorMemoryPerInstance) + ")" +
+ ", numExecutors=" + numExecutors +
+ ", llapIoEnabled=" + ioEnabled +
+ ", llapIoCacheIsDirect=" + isDirectCache +
", rpcListenerPort=" + srvPort +
", mngListenerPort=" + mngPort +
", webPort=" + webPort +
", outputFormatSvcPort=" + outputFormatServicePort +
", workDirs=" + Arrays.toString(localDirs) +
", shufflePort=" + shufflePort +
- ", executorMemory=" + executorMemoryBytes +
- ", llapIoEnabled=" + ioEnabled +
- ", llapIoCacheIsDirect=" + isDirectCache +
- ", llapIoCacheSize=" + ioMemoryBytes +
- ", jvmAvailableMemory=" + maxJvmMemory +
", waitQueueSize= " + waitQueueSize +
", enablePreemption= " + enablePreemption);
@@ -187,9 +196,8 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
executorMemoryBytes + (ioEnabled && isDirectCache == false ? ioMemoryBytes : 0);
// TODO: this check is somewhat bogus as the maxJvmMemory != Xmx parameters (see annotation in LlapServiceDriver)
Preconditions.checkState(maxJvmMemory >= memRequired,
- "Invalid configuration. Xmx value too small. maxAvailable=" + maxJvmMemory +
- ", configured(exec + io if enabled)=" +
- memRequired);
+ "Invalid configuration. Xmx value too small. maxAvailable=" + LlapUtil.humanReadableByteCount(maxJvmMemory) +
+ ", configured(exec + io if enabled)=" + LlapUtil.humanReadableByteCount(memRequired));
this.shuffleHandlerConf = new Configuration(daemonConf);
this.shuffleHandlerConf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, shufflePort);
@@ -238,7 +246,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
}
this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors,
Ints.toArray(intervalList));
- this.metrics.setMemoryPerInstance(executorMemoryBytes);
+ this.metrics.setMemoryPerInstance(executorMemoryPerInstance);
this.metrics.setCacheMemoryPerInstance(ioMemoryBytes);
this.metrics.setJvmMaxMemory(maxJvmMemory);
this.metrics.setWaitQueueSize(waitQueueSize);
@@ -264,7 +272,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
throw new RuntimeException(e);
}
this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, waitQueueSize,
- enablePreemption, localDirs, this.shufflePort, srvAddress, executorMemoryBytes, metrics,
+ enablePreemption, localDirs, this.shufflePort, srvAddress, executorMemoryPerInstance, metrics,
amReporter, executorClassLoader, daemonId, fsUgiFactory);
addIfService(containerRunner);
@@ -452,14 +460,15 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
int webPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_WEB_PORT);
long executorMemoryBytes = HiveConf.getIntVar(
daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l;
-
+ long headroomBytes = HiveConf.getIntVar(
+ daemonConf, ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l;
long ioMemoryBytes = HiveConf.getSizeVar(daemonConf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT);
boolean isLlapIo = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED, true);
LlapDaemon.initializeLogging(daemonConf);
llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo,
isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort,
- appName);
+ appName, headroomBytes);
LOG.info("Adding shutdown hook for LlapDaemon");
ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1);
http://git-wip-us.apache.org/repos/asf/hive/blob/a7665dad/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
index e394191..41ce035 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
@@ -196,7 +196,7 @@ public class MiniLlapCluster extends AbstractService {
LOG.info("Initializing {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed);
for (int i = 0 ;i < numInstances ; i++) {
llapDaemons[i] = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled,
- ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort, clusterNameTrimmed);
+ ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort, clusterNameTrimmed, 0);
llapDaemons[i].init(new Configuration(conf));
}
LOG.info("Initialized {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed);