You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2016/07/22 20:25:34 UTC

hive git commit: HIVE-13934 : Configure Tez to make noconditional task size memory available for the Processor (Wei Zheng, reviewed by Gunther Hagleitner)

Repository: hive
Updated Branches:
  refs/heads/master 589ced8f2 -> d246aee62


HIVE-13934 : Configure Tez to make noconditional task size memory available for the Processor (Wei Zheng, reviewed by Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d246aee6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d246aee6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d246aee6

Branch: refs/heads/master
Commit: d246aee62c8193ca8c94e9506e74227bfd47af9f
Parents: 589ced8
Author: Wei Zheng <we...@apache.org>
Authored: Fri Jul 22 13:27:01 2016 -0700
Committer: Wei Zheng <we...@apache.org>
Committed: Fri Jul 22 13:27:01 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  8 ++
 .../hadoop/hive/ql/exec/tez/DagUtils.java       | 94 +++++++++++++++++++-
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java |  7 ++
 .../hadoop/hive/ql/parse/TezCompiler.java       |  9 ++
 .../apache/hadoop/hive/ql/plan/BaseWork.java    | 10 +++
 .../hadoop/hive/ql/exec/tez/TestTezTask.java    | 48 ++++++++++
 6 files changed, 175 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d246aee6/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c8a8a64..ac921d7 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2688,6 +2688,14 @@ public class HiveConf extends Configuration {
         "hive.tez.exec.inplace.progress",
         true,
         "Updates tez job execution progress in-place in the terminal."),
+    TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION("hive.tez.container.max.java.heap.fraction", 0.8f,
+        "This is to override the tez setting with the same name"),
+    TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MIN("hive.tez.task.scale.memory.reserve-fraction.min",
+        0.3f, "This is to override the tez setting tez.task.scale.memory.reserve-fraction"),
+    TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MAX("hive.tez.task.scale.memory.reserve.fraction.max",
+        0.5f, "The maximum fraction of JVM memory which Tez will reserve for the processor"),
+    TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION("hive.tez.task.scale.memory.reserve.fraction",
+        -1f, "The customized fraction of JVM memory which Tez will reserve for the processor"),
     // The default is different on the client and server, so it's null here.
     LLAP_IO_ENABLED("hive.llap.io.enabled", null, "Whether the LLAP IO layer is enabled."),
     LLAP_IO_NONVECTOR_WRAPPER_ENABLED("hive.llap.io.nonvector.wrapper.enabled", true,

http://git-wip-us.apache.org/repos/asf/hive/blob/d246aee6/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 60b5c40..eefa1d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -38,6 +38,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang.StringUtils;
@@ -454,7 +456,7 @@ public class DagUtils {
    * Falls back to Map-reduces map java opts if no tez specific options
    * are set
    */
-  private String getContainerJavaOpts(Configuration conf) {
+  private static String getContainerJavaOpts(Configuration conf) {
     String javaOpts = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZJAVAOPTS);
 
     String logLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZLOGLEVEL);
@@ -1276,4 +1278,94 @@ public class DagUtils {
   private DagUtils() {
     // don't instantiate
   }
+
+  /**
+   * Adjust the fraction of memory Tez reserves for the processor, based on the memory requested
+   * by Map Join (HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD). TODO: temporary - ideally Hive only
+   * passes Tez the memory the map join needs and Tez figures out how much to allocate.
+   * @param memoryRequested requested memory, compared with byte counts derived from -Xmx/container
+   *        size. NOTE(review): TezTask passes getReservedMemoryMB() (MB) here - confirm the units.
+   * @param conf supplies the user override, heap fraction, min/max reserve fractions and java opts
+   * @return the user override if positive; otherwise a fraction clamped to the [min, max] fractions
+   */
+  static double adjustMemoryReserveFraction(long memoryRequested, HiveConf conf) {
+    // User specified fraction always takes precedence
+    float userReserveFraction = conf.getFloatVar(ConfVars.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION);
+    if (userReserveFraction > 0) {
+      return userReserveFraction;
+    }
+
+    float tezHeapFraction = conf.getFloatVar(ConfVars.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION);
+    float tezMinReserveFraction = conf.getFloatVar(ConfVars.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MIN);
+    float tezMaxReserveFraction = conf.getFloatVar(ConfVars.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MAX);
+
+    // Heap size in bytes: the rightmost -Xmx if present, else a fraction of the container (MB -> bytes)
+    long containerSize = (long) getContainerResource(conf).getMemory() * 1024 * 1024;
+    long xmx = parseRightmostXmx(getContainerJavaOpts(conf));
+    if (xmx <= 0) {
+      xmx = (long) (tezHeapFraction * containerSize);
+    }
+
+    long actualMemToBeAllocated = (long) (tezMinReserveFraction * xmx);
+    if (xmx <= 0 || actualMemToBeAllocated >= memoryRequested) {
+      // Heap size unknown (avoid dividing by it), or the default reservation already suffices
+      return tezMinReserveFraction;  // the default fraction
+    }
+
+    LOG.warn("The actual amount of memory to be allocated " + actualMemToBeAllocated +
+        " is less than the amount of requested memory for Map Join conversion " + memoryRequested);
+    float frac = (float) memoryRequested / xmx;
+    LOG.info("Fraction after calculation: " + frac);
+    if (frac <= tezMinReserveFraction) {
+      return tezMinReserveFraction;
+    } else if (frac >= tezMaxReserveFraction) {
+      return tezMaxReserveFraction;  // never reserve more than the configured maximum
+    }
+    LOG.info("Will adjust Tez setting " + TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION +
+        " to " + frac + " to allocate more memory");
+    return frac;
+  }
+
+  /**
+   * Parse a Java opts string, try to find the rightmost -Xmx option value (since there may be more than one)
+   * @param javaOpts Java opts string to parse
+   * @return the rightmost -Xmx value in bytes. If Xmx is not set, return -1
+   */
+  static long parseRightmostXmx(String javaOpts) {
+    // Find the last matching -Xmx following word boundaries
+    // Format: -Xmx<size>[g|G|m|M|k|K]
+    Pattern JAVA_OPTS_XMX_PATTERN = Pattern.compile(".*(?:^|\\s)-Xmx(\\d+)([gGmMkK]?)(?:$|\\s).*");
+    Matcher m = JAVA_OPTS_XMX_PATTERN.matcher(javaOpts);
+
+    if (m.matches()) {
+      long size = Long.parseLong(m.group(1));
+      if (size <= 0) {
+        return -1;
+      }
+
+      if (m.group(2).isEmpty()) {
+        // -Xmx specified in bytes
+        return size;
+      }
+
+      char unit = m.group(2).charAt(0);
+      switch (unit) {
+        case 'k':
+        case 'K':
+          // -Xmx specified in KB
+          return size * 1024;
+        case 'm':
+        case 'M':
+          // -Xmx specified in MB
+          return size * 1024 * 1024;
+        case 'g':
+        case 'G':
+          // -Xmx speficied in GB
+          return size * 1024 * 1024 * 1024;
+      }
+    }
+
+    // -Xmx not specified
+    return -1;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d246aee6/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index e4b69a5..25c4514 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -89,6 +89,7 @@ public class TezTask extends Task<TezWork> {
 
   private static final String CLASS_NAME = TezTask.class.getName();
   private final PerfLogger perfLogger = SessionState.getPerfLogger();
+  private static final String TEZ_MEMORY_RESERVE_FRACTION = "tez.task.scale.memory.reserve-fraction";
 
   private TezCounters counters;
 
@@ -390,6 +391,12 @@ public class TezTask extends Task<TezWork> {
         Vertex wx =
             utils.createVertex(wxConf, w, scratchDir, appJarLr, additionalLr, fs, ctx, !isFinal,
                 work, work.getVertexType(w));
+        if (w.getReservedMemoryMB() > 0) {
+          // If reversedMemoryMB is set, make memory allocation fraction adjustment as needed
+          double frac = DagUtils.adjustMemoryReserveFraction(w.getReservedMemoryMB(), super.conf);
+          LOG.info("Setting " + TEZ_MEMORY_RESERVE_FRACTION + " to " + frac);
+          wx.setConf(TEZ_MEMORY_RESERVE_FRACTION, Double.toString(frac));
+        } // Otherwise just leave it up to Tez to decide how much memory to allocate
         dag.addVertex(wx);
         utils.addCredentials(w, dag);
         perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());

http://git-wip-us.apache.org/repos/asf/hive/blob/d246aee6/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index cf7a875..66a8322 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -380,6 +380,15 @@ public class TezCompiler extends TaskCompiler {
     GraphWalker ogw = new GenTezWorkWalker(disp, procCtx);
     ogw.startWalking(topNodes, null);
 
+    // we need to specify the reserved memory for each work that contains Map Join
+    for (List<BaseWork> baseWorkList : procCtx.mapJoinWorkMap.values()) {
+      for (BaseWork w : baseWorkList) {
+        // work should be the smallest unit for memory allocation
+        w.setReservedMemoryMB(
+            (int)(conf.getLongVar(ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD) / (1024 * 1024)));
+      }
+    }
+
     // we need to clone some operator plans and remove union operators still
     for (BaseWork w: procCtx.workWithUnionOperators) {
       GenTezUtils.removeUnionOperators(procCtx, w);

http://git-wip-us.apache.org/repos/asf/hive/blob/d246aee6/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index 20f787b..13a0811 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -74,6 +74,8 @@ public abstract class BaseWork extends AbstractOperatorDesc {
   protected boolean llapMode = false;
   protected boolean uberMode = false;
 
+  private int reservedMemoryMB = -1;  // default to -1 means we leave it up to Tez to decide
+
   public void setGatheringStats(boolean gatherStats) {
     this.gatheringStats = gatherStats;
   }
@@ -223,6 +225,14 @@ public abstract class BaseWork extends AbstractOperatorDesc {
     return llapMode;
   }
 
+  public int getReservedMemoryMB() {  // -1 (the default) leaves the allocation decision to Tez
+    return this.reservedMemoryMB;
+  }
+
+  public void setReservedMemoryMB(int memoryMB) {  // megabytes; -1 leaves the decision to Tez
+    this.reservedMemoryMB = memoryMB;
+  }
+
   public abstract void configureJobConf(JobConf job);
 
   public void setTag(int tag) {

http://git-wip-us.apache.org/repos/asf/hive/blob/d246aee6/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index e04ad7a..53672a9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -278,4 +278,52 @@ public class TestTezTask {
 
     assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars));
   }
+
+  @Test
+  public void testParseRightmostXmx() throws Exception {
+    // No java opts at all
+    String opts = "";
+    long actual = DagUtils.parseRightmostXmx(opts);
+    assertEquals("Unexpected maximum heap size", -1L, actual);
+
+    // Java opts present, but none of them is -Xmx
+    opts = "-Xms1024m";
+    actual = DagUtils.parseRightmostXmx(opts);
+    assertEquals("Unexpected maximum heap size", -1L, actual);
+
+    // -Xmx with a gigabyte suffix
+    opts = "-Xms1024m -Xmx2g";
+    actual = DagUtils.parseRightmostXmx(opts);
+    assertEquals("Unexpected maximum heap size", 2L * 1024 * 1024 * 1024, actual);
+
+    // -Xmx with a megabyte suffix
+    opts = "-Xms1024m -Xmx1024m";
+    actual = DagUtils.parseRightmostXmx(opts);
+    assertEquals("Unexpected maximum heap size", 1024L * 1024 * 1024, actual);
+
+    // -Xmx with a kilobyte suffix
+    opts = "-Xms1024m -Xmx524288k";
+    actual = DagUtils.parseRightmostXmx(opts);
+    assertEquals("Unexpected maximum heap size", 524288L * 1024, actual);
+
+    // -Xmx without a suffix is a plain byte count
+    opts = "-Xms1024m -Xmx1610612736";
+    actual = DagUtils.parseRightmostXmx(opts);
+    assertEquals("Unexpected maximum heap size", 1610612736L, actual);
+
+    // -Xmx given twice: the rightmost occurrence wins
+    opts = "-Xmx1024m -Xmx1536m";
+    actual = DagUtils.parseRightmostXmx(opts);
+    assertEquals("Unexpected maximum heap size", 1536L * 1024 * 1024, actual);
+
+    // Malformed: -Xmx glued to preceding text, so it is not a standalone option
+    opts = "pre-Xmx1024m";
+    actual = DagUtils.parseRightmostXmx(opts);
+    assertEquals("Unexpected maximum heap size", -1L, actual);
+
+    // Malformed: trailing text glued to the size, so it is not a standalone option
+    opts = "-Xmx1024m-post";
+    actual = DagUtils.parseRightmostXmx(opts);
+    assertEquals("Unexpected maximum heap size", -1L, actual);
+  }
 }