You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2016/07/22 20:25:34 UTC
hive git commit: HIVE-13934 : Configure Tez to make noconditional task
size memory available for the Processor (Wei Zheng,
reviewed by Gunther Hagleitner)
Repository: hive
Updated Branches:
refs/heads/master 589ced8f2 -> d246aee62
HIVE-13934 : Configure Tez to make noconditional task size memory available for the Processor (Wei Zheng, reviewed by Gunther Hagleitner)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d246aee6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d246aee6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d246aee6
Branch: refs/heads/master
Commit: d246aee62c8193ca8c94e9506e74227bfd47af9f
Parents: 589ced8
Author: Wei Zheng <we...@apache.org>
Authored: Fri Jul 22 13:27:01 2016 -0700
Committer: Wei Zheng <we...@apache.org>
Committed: Fri Jul 22 13:27:01 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 8 ++
.../hadoop/hive/ql/exec/tez/DagUtils.java | 94 +++++++++++++++++++-
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 7 ++
.../hadoop/hive/ql/parse/TezCompiler.java | 9 ++
.../apache/hadoop/hive/ql/plan/BaseWork.java | 10 +++
.../hadoop/hive/ql/exec/tez/TestTezTask.java | 48 ++++++++++
6 files changed, 175 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d246aee6/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c8a8a64..ac921d7 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2688,6 +2688,14 @@ public class HiveConf extends Configuration {
"hive.tez.exec.inplace.progress",
true,
"Updates tez job execution progress in-place in the terminal."),
+ TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION("hive.tez.container.max.java.heap.fraction", 0.8f,
+ "This is to override the tez setting with the same name. Hive also uses it to estimate the\n" +
+ "Java heap as a fraction of the container size when no -Xmx is given in the container java opts."),
+ TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MIN("hive.tez.task.scale.memory.reserve-fraction.min",
+ 0.3f, "The minimum fraction of JVM memory which Tez will reserve for the processor of a vertex\n" +
+ "containing a map join. This is to override the tez setting tez.task.scale.memory.reserve-fraction"),
+ TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MAX("hive.tez.task.scale.memory.reserve.fraction.max",
+ 0.5f, "The maximum fraction of JVM memory which Tez will reserve for the processor"),
+ TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION("hive.tez.task.scale.memory.reserve.fraction",
+ -1f, "The customized fraction of JVM memory which Tez will reserve for the processor.\n" +
+ "A positive value takes precedence over the fraction Hive computes from the map join memory\n" +
+ "requirement; the default (-1) lets Hive compute it."),
// The default is different on the client and server, so it's null here.
LLAP_IO_ENABLED("hive.llap.io.enabled", null, "Whether the LLAP IO layer is enabled."),
LLAP_IO_NONVECTOR_WRAPPER_ENABLED("hive.llap.io.nonvector.wrapper.enabled", true,
http://git-wip-us.apache.org/repos/asf/hive/blob/d246aee6/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 60b5c40..eefa1d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -38,6 +38,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
@@ -454,7 +456,7 @@ public class DagUtils {
* Falls back to Map-reduces map java opts if no tez specific options
* are set
*/
- private String getContainerJavaOpts(Configuration conf) {
+ private static String getContainerJavaOpts(Configuration conf) {
String javaOpts = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZJAVAOPTS);
String logLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZLOGLEVEL);
@@ -1276,4 +1278,94 @@ public class DagUtils {
private DagUtils() {
// don't instantiate
}
+
+ /**
+ * TODO This method is temporary. Ideally Hive should only need to pass to Tez the amount of memory
+ * it requires to do the map join, and Tez should take care of figuring out how much to allocate.
+ * Adjust the percentage of memory to be reserved for the processor from Tez
+ * based on the actual requested memory by the Map Join, i.e. HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD.
+ * @param memoryRequested memory requested by the map join, in MB (the caller passes
+ * BaseWork#getReservedMemoryMB(), which TezCompiler derives from the threshold)
+ * @param conf Hive configuration supplying the Tez heap/reserve fractions and container settings
+ * @return the user-specified fraction if it is positive; otherwise the fraction of the JVM heap
+ * needed for the request, clamped to the configured min/max reserve fractions
+ */
+ static double adjustMemoryReserveFraction(long memoryRequested, HiveConf conf) {
+ // User specified fraction always takes precedence
+ float userFraction = conf.getFloatVar(ConfVars.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION);
+ if (userFraction > 0) {
+ return userFraction;
+ }
+
+ float tezHeapFraction = conf.getFloatVar(ConfVars.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION);
+ float tezMinReserveFraction = conf.getFloatVar(ConfVars.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MIN);
+ float tezMaxReserveFraction = conf.getFloatVar(ConfVars.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MAX);
+ Resource resource = getContainerResource(conf);
+ long containerSize = (long) resource.getMemory() * 1024 * 1024;
+ String javaOpts = getContainerJavaOpts(conf);
+ long xmx = parseRightmostXmx(javaOpts);
+ if (xmx <= 0) {
+ // No usable -Xmx in the container java opts: estimate the heap as a fraction of the container
+ xmx = (long) (tezHeapFraction * containerSize);
+ }
+
+ // xmx and the allocation below are in bytes, but memoryRequested is in MB: convert before comparing
+ long memoryRequestedBytes = memoryRequested * 1024L * 1024L;
+ long actualMemToBeAllocated = (long) (tezMinReserveFraction * xmx);
+ if (actualMemToBeAllocated < memoryRequestedBytes) {
+ LOG.warn("The actual amount of memory to be allocated " + actualMemToBeAllocated +
+ " is less than the amount of requested memory for Map Join conversion " + memoryRequestedBytes);
+ float frac = (float) memoryRequestedBytes / xmx;
+ LOG.info("Fraction after calculation: " + frac);
+ if (frac <= tezMinReserveFraction) {
+ return tezMinReserveFraction;
+ } else if (frac < tezMaxReserveFraction) {
+ LOG.info("Will adjust Tez setting " + TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION +
+ " to " + frac + " to allocate more memory");
+ return frac;
+ }
+ return tezMaxReserveFraction; // frac >= tezMaxReserveFraction: cap the reservation
+ }
+
+ return tezMinReserveFraction; // the default fraction
+ }
+
+ /** Matches one standalone -Xmx token: -Xmx<size>[g|G|m|M|k|K], bounded by whitespace or the string ends. */
+ private static final Pattern JAVA_OPTS_XMX_PATTERN =
+ Pattern.compile("(?<!\\S)-Xmx(\\d+)([gGmMkK]?)(?!\\S)");
+
+ /**
+ * Parse a Java opts string, try to find the rightmost -Xmx option value (since there may be more than one).
+ * Tokens glued to other text, such as "pre-Xmx1g" or "-Xmx1g-post", are ignored.
+ * @param javaOpts Java opts string to parse; may be null
+ * @return the rightmost -Xmx value in bytes, or -1 if -Xmx is not set, is zero, or does not fit in a long
+ */
+ static long parseRightmostXmx(String javaOpts) {
+ if (javaOpts == null) {
+ return -1;
+ }
+ Matcher m = JAVA_OPTS_XMX_PATTERN.matcher(javaOpts); // find() scans left to right; keep the last hit
+ String digits = null;
+ String unit = null;
+ while (m.find()) {
+ digits = m.group(1);
+ unit = m.group(2);
+ }
+ if (digits == null) {
+ return -1; // -Xmx not specified
+ }
+ long multiplier;
+ switch (unit.isEmpty() ? 'b' : Character.toLowerCase(unit.charAt(0))) {
+ case 'k': multiplier = 1024L; break; // -Xmx specified in KB
+ case 'm': multiplier = 1024L * 1024; break; // -Xmx specified in MB
+ case 'g': multiplier = 1024L * 1024 * 1024; break; // -Xmx specified in GB
+ default: multiplier = 1L; // -Xmx specified in bytes
+ }
+ try {
+ long size = Long.parseLong(digits);
+ // Reject zero, and sizes whose byte count would overflow a long instead of wrapping silently
+ if (size <= 0 || size > Long.MAX_VALUE / multiplier) {
+ return -1;
+ }
+ return size * multiplier;
+ } catch (NumberFormatException e) {
+ return -1; // more digits than a long can hold
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d246aee6/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index e4b69a5..25c4514 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -89,6 +89,7 @@ public class TezTask extends Task<TezWork> {
private static final String CLASS_NAME = TezTask.class.getName();
private final PerfLogger perfLogger = SessionState.getPerfLogger();
+ // Tez config key set per vertex when a reserved memory amount is requested.
+ // NOTE(review): DagUtils refers to this setting via TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION;
+ // consider reusing that constant instead of duplicating the literal.
+ private static final String TEZ_MEMORY_RESERVE_FRACTION = "tez.task.scale.memory.reserve-fraction";
private TezCounters counters;
@@ -390,6 +391,12 @@ public class TezTask extends Task<TezWork> {
Vertex wx =
utils.createVertex(wxConf, w, scratchDir, appJarLr, additionalLr, fs, ctx, !isFinal,
work, work.getVertexType(w));
+ if (w.getReservedMemoryMB() > 0) {
+ // If reservedMemoryMB is set, make memory allocation fraction adjustment as needed.
+ // NOTE(review): the value passed below is in MB (see BaseWork#getReservedMemoryMB), not bytes.
+ double frac = DagUtils.adjustMemoryReserveFraction(w.getReservedMemoryMB(), super.conf);
+ LOG.info("Setting " + TEZ_MEMORY_RESERVE_FRACTION + " to " + frac);
+ wx.setConf(TEZ_MEMORY_RESERVE_FRACTION, Double.toString(frac));
+ } // Otherwise just leave it up to Tez to decide how much memory to allocate
dag.addVertex(wx);
utils.addCredentials(w, dag);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
http://git-wip-us.apache.org/repos/asf/hive/blob/d246aee6/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index cf7a875..66a8322 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -380,6 +380,15 @@ public class TezCompiler extends TaskCompiler {
GraphWalker ogw = new GenTezWorkWalker(disp, procCtx);
ogw.startWalking(topNodes, null);
+ // we need to specify the reserved memory for each work that contains Map Join.
+ // The threshold is in bytes and is stored in whole MB; integer division truncates, so a
+ // threshold below 1 MB becomes 0, which TezTask treats as "no reservation requested".
+ for (List<BaseWork> baseWorkList : procCtx.mapJoinWorkMap.values()) {
+ for (BaseWork w : baseWorkList) {
+ // work should be the smallest unit for memory allocation
+ w.setReservedMemoryMB(
+ (int)(conf.getLongVar(ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD) / (1024 * 1024)));
+ }
+ }
+
// we need to clone some operator plans and remove union operators still
for (BaseWork w: procCtx.workWithUnionOperators) {
GenTezUtils.removeUnionOperators(procCtx, w);
http://git-wip-us.apache.org/repos/asf/hive/blob/d246aee6/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index 20f787b..13a0811 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -74,6 +74,8 @@ public abstract class BaseWork extends AbstractOperatorDesc {
protected boolean llapMode = false;
protected boolean uberMode = false;
+ private int reservedMemoryMB = -1; // memory in MB to reserve for this work's processor; -1 (default) leaves it up to Tez to decide
+
public void setGatheringStats(boolean gatherStats) {
this.gatheringStats = gatherStats;
}
@@ -223,6 +225,14 @@ public abstract class BaseWork extends AbstractOperatorDesc {
return llapMode;
}
+ /** @return memory in MB to reserve for this work's processor; a value <= 0 (default -1) lets Tez decide */
+ public int getReservedMemoryMB() {
+ return this.reservedMemoryMB;
+ }
+
+ /** @param memoryMB memory in MB to reserve for this work's processor; a value <= 0 lets Tez decide */
+ public void setReservedMemoryMB(int memoryMB) {
+ this.reservedMemoryMB = memoryMB;
+ }
+
public abstract void configureJobConf(JobConf job);
public void setTag(int tag) {
http://git-wip-us.apache.org/repos/asf/hive/blob/d246aee6/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index e04ad7a..53672a9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -278,4 +278,52 @@ public class TestTezTask {
assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars));
}
+
+ @Test
+ public void testParseRightmostXmx() throws Exception {
+ // Each row pairs a java opts string with the expected rightmost -Xmx value in bytes
+ // (-1 when -Xmx is absent or malformed).
+ Object[][] cases = {
+ {"", -1L}, // Empty java opts
+ {"-Xms1024m", -1L}, // Non-empty java opts without -Xmx specified
+ {"-Xms1024m -Xmx2g", 2147483648L}, // -Xmx specified in GB
+ {"-Xms1024m -Xmx1024m", 1073741824L}, // -Xmx specified in MB
+ {"-Xms1024m -Xmx524288k", 536870912L}, // -Xmx specified in KB
+ {"-Xms1024m -Xmx1610612736", 1610612736L}, // -Xmx specified in B
+ {"-Xmx1024m -Xmx1536m", 1610612736L}, // -Xmx specified twice: the rightmost wins
+ {"pre-Xmx1024m", -1L}, // bad -Xmx specification (glued to a prefix)
+ {"-Xmx1024m-post", -1L}, // bad -Xmx specification (glued to a suffix)
+ };
+ for (Object[] testCase : cases) {
+ long heapSize = DagUtils.parseRightmostXmx((String) testCase[0]);
+ assertEquals("Unexpected maximum heap size", ((Long) testCase[1]).longValue(), heapSize);
+ }
+ }
}