Posted to commits@hive.apache.org by pr...@apache.org on 2016/12/24 10:39:34 UTC

hive git commit: HIVE-15503: LLAP: Fix use of Runtime.getRuntime.maxMemory in Hive operators (Prasanth Jayachandran reviewed by Gunther Hagleitner)

Repository: hive
Updated Branches:
  refs/heads/master d28cbb4da -> cc9012b42


HIVE-15503: LLAP: Fix use of Runtime.getRuntime.maxMemory in Hive operators (Prasanth Jayachandran reviewed by Gunther Hagleitner)
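
For context: Runtime.getRuntime().maxMemory() reports the heap limit of the whole JVM. Inside an LLAP daemon, where several executor threads share a single JVM, any operator that sizes itself off that number overestimates its budget by roughly the executor count. A minimal illustration of the gap (the executor count of 4 is made up for the example):

    public class MaxMemoryDemo {
      public static void main(String[] args) {
        // Whole-JVM heap limit; inside an LLAP daemon this is shared by all executors.
        long jvmMax = Runtime.getRuntime().maxMemory();
        int numExecutors = 4; // illustrative; the real value comes from LLAP_DAEMON_NUM_EXECUTORS
        System.out.println("JVM max heap:        " + jvmMax);
        System.out.println("Fair share per exec: " + jvmMax / numExecutors);
      }
    }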


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cc9012b4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cc9012b4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cc9012b4

Branch: refs/heads/master
Commit: cc9012b4206b551a209e421df695f051ae5bfb54
Parents: d28cbb4
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Sat Dec 24 02:39:22 2016 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Sat Dec 24 02:39:22 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/GroupByOperator.java    | 35 ++++++++++++++------
 .../hadoop/hive/ql/exec/OperatorUtils.java      | 16 +++++++++
 .../apache/hadoop/hive/ql/exec/PTFTopNHash.java | 13 ++++++--
 .../hadoop/hive/ql/exec/ReduceSinkOperator.java |  2 +-
 .../apache/hadoop/hive/ql/exec/TopNHash.java    | 33 ++++++++++++------
 .../hive/ql/exec/tez/MapRecordProcessor.java    | 10 ++++++
 .../hive/ql/exec/tez/ReduceRecordProcessor.java |  9 +++++
 .../hive/ql/plan/AbstractOperatorDesc.java      | 11 ++++++
 .../hadoop/hive/ql/plan/OperatorDesc.java       |  2 ++
 9 files changed, 107 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cc9012b4/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index cddf14f..46f0ecd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 
 import javolution.util.FastBitSet;
@@ -146,6 +147,10 @@ public class GroupByOperator extends Operator<GroupByDesc> {
   private transient int countAfterReport;   // report or forward
   private transient int heartbeatInterval;
 
+  private transient boolean isTez;
+  private transient boolean isLlap;
+  private transient int numExecutors;
+
   /**
    * Total amount of memory allowed for JVM heap.
    */
@@ -391,17 +396,20 @@ public class GroupByOperator extends Operator<GroupByDesc> {
       new KeyWrapperFactory(keyFields, keyObjectInspectors, currentKeyObjectInspectors);
 
     newKeys = keyWrapperFactory.getKeyWrapper();
-
+    isTez = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez");
+    isLlap = isTez && HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("llap");
+    numExecutors = isLlap ? HiveConf.getIntVar(hconf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS) : 1;
     firstRow = true;
     // estimate the number of hash table entries based on the size of each
     // entry. Since the size of an entry
     // is not known, estimate it based on the number of entries
     if (hashAggr) {
-      computeMaxEntriesHashAggr(hconf);
+      computeMaxEntriesHashAggr();
     }
     memoryMXBean = ManagementFactory.getMemoryMXBean();
-    maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
+    maxMemory = isTez ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax();
     memoryThreshold = this.getConf().getMemoryThreshold();
+    LOG.info("isTez: {} isLlap: {} numExecutors: {} maxMemory: {}", isTez, isLlap, numExecutors, maxMemory);
   }
 
   /**
@@ -413,9 +421,14 @@ public class GroupByOperator extends Operator<GroupByDesc> {
    * @return number of entries that can fit in hash table - useful for map-side
    *         aggregation only
    **/
-  private void computeMaxEntriesHashAggr(Configuration hconf) throws HiveException {
+  private void computeMaxEntriesHashAggr() throws HiveException {
     float memoryPercentage = this.getConf().getGroupByMemoryUsage();
-    maxHashTblMemory = (long) (memoryPercentage * Runtime.getRuntime().maxMemory());
+    if (isTez) {
+      maxHashTblMemory = (long) (memoryPercentage * getConf().getMaxMemoryAvailable());
+    } else {
+      maxHashTblMemory = (long) (memoryPercentage * Runtime.getRuntime().maxMemory());
+    }
+    LOG.info("Max hash table memory: {} bytes", maxHashTblMemory);
     estimateRowSize();
   }
 
@@ -875,6 +888,9 @@ public class GroupByOperator extends Operator<GroupByDesc> {
     if ((numEntriesHashTable == 0) || ((numEntries % NUMROWSESTIMATESIZE) == 0)) {
       // check how much memory is left
       usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
+      // TODO: there is no easy and reliable way to compute the memory used by the executor threads and on-heap cache.
+      // Assuming the used memory is equally divided among all executors.
+      usedMemory = isLlap ? usedMemory / numExecutors : usedMemory;
       rate = (float) usedMemory / (float) maxMemory;
       if(rate > memoryThreshold){
         return true;
@@ -957,7 +973,6 @@ public class GroupByOperator extends Operator<GroupByDesc> {
    * @throws HiveException
    */
   private void flushHashTable(boolean complete) throws HiveException {
-
     countAfterReport = 0;
 
     // Currently, the algorithm flushes 10% of the entries - this can be
@@ -973,7 +988,7 @@ public class GroupByOperator extends Operator<GroupByDesc> {
       hashAggregations.clear();
       hashAggregations = null;
       if (isLogInfoEnabled) {
-	LOG.info("Hash Table completed flushed");
+        LOG.info("Hash Table completely flushed");
       }
       return;
     }
@@ -991,9 +1006,9 @@ public class GroupByOperator extends Operator<GroupByDesc> {
       iter.remove();
       numDel++;
       if (numDel * 10 >= oldSize) {
-	if (isLogInfoEnabled) {
-	  LOG.info("Hash Table flushed: new size = " + hashAggregations.size());
-	}
+        if (isLogInfoEnabled) {
+          LOG.info("Hash Table flushed: new size = " + hashAggregations.size());
+        }
         return;
       }
     }
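
The flush check above is the heart of the GroupByOperator change: under Tez the ceiling is the per-task grant recorded in the operator descriptor, and under LLAP the shared heap's usage is charged to this executor as an even share. A standalone sketch of that arithmetic, assuming the same even-division heuristic as the patch (class and method names are illustrative, not Hive's):

    import java.lang.management.ManagementFactory;

    public class PerExecutorMemoryCheck {
      static boolean shouldFlush(boolean isLlap, int numExecutors,
          long maxMemoryAvailable, float memoryThreshold) {
        long usedMemory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
        // LLAP: the heap is shared by all executors, so charge this one an even share.
        if (isLlap) {
          usedMemory /= numExecutors;
        }
        float rate = (float) usedMemory / (float) maxMemoryAvailable;
        return rate > memoryThreshold;
      }

      public static void main(String[] args) {
        // 4 executors sharing the heap, 1 GiB granted to this task, flush at 90% usage
        System.out.println(shouldFlush(true, 4, 1L << 30, 0.9f));
      }
    }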

http://git-wip-us.apache.org/repos/asf/hive/blob/cc9012b4/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index a8ed74c..d294e25 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -330,4 +330,20 @@ public class OperatorUtils {
     }
     return numberOperators;
   }
+
+  public static void setMemoryAvailable(final List<Operator<? extends OperatorDesc>> operators,
+    final long memoryAvailableToTask) {
+    if (operators == null) {
+      return;
+    }
+
+    for (Operator<? extends OperatorDesc> op : operators) {
+      if (op.getConf() != null) {
+        op.getConf().setMaxMemoryAvailable(memoryAvailableToTask);
+      }
+      if (op.getChildOperators() != null && !op.getChildOperators().isEmpty()) {
+        setMemoryAvailable(op.getChildOperators(), memoryAvailableToTask);
+      }
+    }
+  }
 }
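
The new helper is a plain depth-first walk that stamps the same budget on every descriptor in the operator tree; because the write is idempotent, reaching a node twice through a diamond-shaped DAG is harmless. The same pattern in self-contained form (Node is a toy stand-in for Operator, not a Hive class):

    import java.util.Arrays;
    import java.util.List;

    class Node {
      long maxMemoryAvailable;             // stands in for getConf().setMaxMemoryAvailable()
      List<Node> children;
      Node(Node... children) { this.children = Arrays.asList(children); }
    }

    public class SetMemoryWalk {
      static void setMemoryAvailable(List<Node> nodes, long budget) {
        if (nodes == null) {
          return;
        }
        for (Node n : nodes) {
          n.maxMemoryAvailable = budget;   // stamp this operator's descriptor
          if (n.children != null && !n.children.isEmpty()) {
            setMemoryAvailable(n.children, budget); // then recurse into its children
          }
        }
      }

      public static void main(String[] args) {
        Node leaf = new Node();
        Node root = new Node(new Node(leaf), new Node());
        setMemoryAvailable(Arrays.asList(root), 512L << 20); // 512 MiB for the whole tree
        System.out.println(leaf.maxMemoryAvailable);         // 536870912
      }
    }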

http://git-wip-us.apache.org/repos/asf/hive/blob/cc9012b4/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java
index f93b420..2bc83fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java
@@ -24,9 +24,11 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.io.BytesWritable;
 
 
@@ -38,10 +40,15 @@ public class PTFTopNHash extends TopNHash {
   private TopNHash largestPartition;
   private boolean prevIndexPartIsNull;
   private Set<Integer> indexesWithNullPartKey;
+  private OperatorDesc conf;
+  private Configuration hconf;
     
   public void initialize(
-      int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector) {
-    super.initialize(topN, memUsage, isMapGroupBy, collector);
+    int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector, final OperatorDesc conf,
+    final Configuration hconf) {
+    super.initialize(topN, memUsage, isMapGroupBy, collector, conf, hconf);
+    this.conf = conf;
+    this.hconf = hconf;
     this.isMapGroupBy = isMapGroupBy;
     this.memUsage = memUsage;
     partitionHeaps = new HashMap<Key, TopNHash>();
@@ -76,7 +83,7 @@ public class PTFTopNHash extends TopNHash {
     TopNHash partHeap = partitionHeaps.get(pk);
     if ( partHeap == null ) {
       partHeap = new TopNHash();
-      partHeap.initialize(topN, memUsage, isMapGroupBy, collector);
+      partHeap.initialize(topN, memUsage, isMapGroupBy, collector, conf, hconf);
       if ( batchIndex >= 0 ) {
         partHeap.startVectorizedBatch(batchSize);
       }
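
PTFTopNHash builds one TopNHash per partition key on demand, so it has to retain conf and hconf from initialize and replay them into every heap it creates later; that is all the two new fields are for. The pattern in miniature (toy classes, not Hive's):

    import java.util.HashMap;
    import java.util.Map;

    class Heap {
      long budget;
      void initialize(long budget) { this.budget = budget; }
    }

    public class LazyChildren {
      private long budget;                 // retained at init time, like conf/hconf above
      private final Map<String, Heap> heaps = new HashMap<>();

      void initialize(long budget) { this.budget = budget; }

      Heap getOrCreate(String partitionKey) {
        // replay the retained configuration into each lazily created child heap
        return heaps.computeIfAbsent(partitionKey, k -> {
          Heap h = new Heap();
          h.initialize(budget);
          return h;
        });
      }

      public static void main(String[] args) {
        LazyChildren ptf = new LazyChildren();
        ptf.initialize(1L << 28);
        System.out.println(ptf.getOrCreate("part-0").budget); // 268435456
      }
    }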

http://git-wip-us.apache.org/repos/asf/hive/blob/cc9012b4/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 9f8acc9..789d2a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -244,7 +244,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
 
       if (limit >= 0 && memUsage > 0) {
         reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : new TopNHash();
-        reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
+        reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this, conf, hconf);
       }
 
       useUniformHash = conf.getReducerTraits().contains(UNIFORM);

http://git-wip-us.apache.org/repos/asf/hive/blob/cc9012b4/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
index e400368..f3c7c77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
@@ -19,24 +19,24 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
 import java.util.TreeMap;
-import java.util.TreeSet;
 
-import com.google.common.collect.MinMaxPriorityQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.WritableComparator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.MinMaxPriorityQueue;
 
 /**
  * Stores binary key/value in sorted manner to get top-n key/value
@@ -92,7 +92,8 @@ public class TopNHash {
   };
 
   public void initialize(
-      int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector) {
+    int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector, final OperatorDesc conf,
+    final Configuration hconf) {
     assert topN >= 0 && memUsage > 0;
     assert !this.isEnabled;
     this.isEnabled = false;
@@ -103,11 +104,23 @@ public class TopNHash {
       return; // topN == 0 will cause a short-circuit, don't need any initialization
     }
 
+    final boolean isTez = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez");
+    final boolean isLlap = isTez && HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("llap");
+    final int numExecutors = isLlap ? HiveConf.getIntVar(hconf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS) : 1;
+
     // Used Memory = totalMemory() - freeMemory();
     // Total Free Memory = maxMemory() - Used Memory;
     long totalFreeMemory = Runtime.getRuntime().maxMemory() -
       Runtime.getRuntime().totalMemory() + Runtime.getRuntime().freeMemory();
 
+    if (isTez) {
+      MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+      // TODO: For LLAP, the assumption is that the cache is off-heap.
+      final long memoryUsedPerExecutor = (memoryMXBean.getHeapMemoryUsage().getUsed() / numExecutors);
+      // this is total free memory available per executor in case of LLAP
+      totalFreeMemory = conf.getMaxMemoryAvailable() - memoryUsedPerExecutor;
+    }
+
     // limit * 64 : compensation of arrays for key/value/hashcodes
     this.threshold = (long) (memUsage * totalFreeMemory) - topN * 64L;
     if (threshold < 0) {
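
In container mode the free-memory estimate stays maxMemory() - (totalMemory() - freeMemory()); under Tez it becomes the per-task grant minus this executor's even share of the used heap, and the final threshold is memUsage * totalFreeMemory - topN * 64 to compensate for the per-entry key/value/hashcode arrays. A self-contained sketch of that computation (names are illustrative):

    import java.lang.management.ManagementFactory;

    public class TopNThreshold {
      static long computeThreshold(boolean isTez, int numExecutors,
          long maxMemoryAvailable, float memUsage, int topN) {
        Runtime rt = Runtime.getRuntime();
        // Container mode: free = max - used, where used = total - free.
        long totalFreeMemory = rt.maxMemory() - rt.totalMemory() + rt.freeMemory();
        if (isTez) {
          long usedHeap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
          // numExecutors is 1 for plain Tez, so the division only bites under LLAP.
          totalFreeMemory = maxMemoryAvailable - (usedHeap / numExecutors);
        }
        // topN * 64 compensates for the arrays kept per key/value/hashcode entry.
        return (long) (memUsage * totalFreeMemory) - topN * 64L;
      }

      public static void main(String[] args) {
        // e.g. Tez under LLAP: 4 executors, 1 GiB task grant, 10% usable, top 100
        System.out.println(computeThreshold(true, 4, 1L << 30, 0.1f, 100));
      }
    }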

http://git-wip-us.apache.org/repos/asf/hive/blob/cc9012b4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 6f36dfb..955fa80 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -193,6 +194,7 @@ public class MapRecordProcessor extends RecordProcessor {
       } else {
         mapOp = new MapOperator(runtimeCtx);
       }
+
       // Not synchronizing creation of mapOp with an invocation. Check immediately
       // after creation in case abort has been set.
       // Relying on the regular flow to clean up the actual operator. i.e. If an exception is
@@ -283,6 +285,14 @@ public class MapRecordProcessor extends RecordProcessor {
       mapOp.passExecContext(execContext);
       l4j.info(mapOp.dump(0));
 
+      // set memory available for operators
+      long memoryAvailableToTask = processorContext.getTotalMemoryAvailableToTask();
+      if (mapOp.getConf() != null) {
+        mapOp.getConf().setMaxMemoryAvailable(memoryAvailableToTask);
+        l4j.info("Memory available for operators set to {}", LlapUtil.humanReadableByteCount(memoryAvailableToTask));
+      }
+      OperatorUtils.setMemoryAvailable(mapOp.getChildOperators(), memoryAvailableToTask);
+
       mapOp.initializeLocalWork(jconf);
 
       checkAbortCondition();
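
LlapUtil.humanReadableByteCount only serves the log line here. A self-contained formatter in the same spirit (this diff does not show LlapUtil's exact output format, so the format below is an assumption):

    public class ByteFormat {
      static String humanReadable(long bytes) {
        if (bytes < 1024) {
          return bytes + "B";
        }
        // 1024^exp is the largest power of 1024 not exceeding bytes
        int exp = (63 - Long.numberOfLeadingZeros(bytes)) / 10;
        return String.format("%.2f%cB", bytes / Math.pow(1024, exp), "KMGTPE".charAt(exp - 1));
      }

      public static void main(String[] args) {
        System.out.println(humanReadable(3L << 30)); // 3.00GB
      }
    }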

http://git-wip-us.apache.org/repos/asf/hive/blob/cc9012b4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index e4c13fb..d80f201 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -27,6 +27,7 @@ import java.util.Map.Entry;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -161,6 +162,13 @@ public class ReduceRecordProcessor  extends RecordProcessor{
     reducer = reduceWork.getReducer();
     // Check immediately after the reducer is assigned, in case an abort came in meanwhile
     checkAbortCondition();
+    // set memory available for operators
+    long memoryAvailableToTask = processorContext.getTotalMemoryAvailableToTask();
+    if (reducer.getConf() != null) {
+      reducer.getConf().setMaxMemoryAvailable(memoryAvailableToTask);
+      l4j.info("Memory available for operators set to {}", LlapUtil.humanReadableByteCount(memoryAvailableToTask));
+    }
+    OperatorUtils.setMemoryAvailable(reducer.getChildOperators(), memoryAvailableToTask);
     if (numTags > 1) {
       sources = new ReduceRecordSource[numTags];
       mainWorkOIs = new ObjectInspector[numTags];
@@ -199,6 +207,7 @@ public class ReduceRecordProcessor  extends RecordProcessor{
     checkAbortCondition();
 
     reducer = reduceWork.getReducer();
+
     // initialize reduce operator tree
     try {
       l4j.info(reducer.dump(0));

http://git-wip-us.apache.org/repos/asf/hive/blob/cc9012b4/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
index e217bdf..7df9d07 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
@@ -33,6 +33,7 @@ public class AbstractOperatorDesc implements OperatorDesc {
   protected transient OpTraits opTraits;
   protected transient Map<String, String> opProps;
   protected long memNeeded = 0;
+  protected long memAvailable = 0;
   protected String runtimeStatsTmpDir;
 
   @Override
@@ -93,6 +94,16 @@ public class AbstractOperatorDesc implements OperatorDesc {
     this.memNeeded = memNeeded;
   }
 
+  @Override
+  public long getMaxMemoryAvailable() {
+    return memAvailable;
+  }
+
+  @Override
+  public void setMaxMemoryAvailable(final long memoryAvailable) {
+    this.memAvailable = memoryAvailable;
+  }
+
   public String getRuntimeStatsTmpDir() {
     return runtimeStatsTmpDir;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc9012b4/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
index ad620c2..850576c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
@@ -30,6 +30,8 @@ public interface OperatorDesc extends Serializable, Cloneable {
   public Map<String, String> getOpProps();
   public long getMemoryNeeded();
   public void setMemoryNeeded(long memoryNeeded);
+  public long getMaxMemoryAvailable();
+  public void setMaxMemoryAvailable(long memoryAvailable);
   public String getRuntimeStatsTmpDir();
   public void setRuntimeStatsTmpDir(String runtimeStatsTmpDir);
 }
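
These two methods close the loop: the Tez record processors write the task's memory grant through setMaxMemoryAvailable, and GroupByOperator and TopNHash read it back at initialization. A toy round trip (MemoryAware and SimpleDesc are made-up names, not Hive classes):

    interface MemoryAware {
      long getMaxMemoryAvailable();
      void setMaxMemoryAvailable(long memoryAvailable);
    }

    public class SimpleDesc implements MemoryAware {
      private long memAvailable = 0;

      @Override
      public long getMaxMemoryAvailable() { return memAvailable; }

      @Override
      public void setMaxMemoryAvailable(long memoryAvailable) { this.memAvailable = memoryAvailable; }

      public static void main(String[] args) {
        SimpleDesc desc = new SimpleDesc();
        desc.setMaxMemoryAvailable(2L << 30);             // producer: the record processor
        System.out.println(desc.getMaxMemoryAvailable()); // consumer: the operator at init
      }
    }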