You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2016/12/24 10:39:34 UTC
hive git commit: HIVE-15503: LLAP: Fix use of
Runtime.getRuntime.maxMemory in Hive operators (Prasanth Jayachandran
reviewed by Gunther Hagleitner)
Repository: hive
Updated Branches:
refs/heads/master d28cbb4da -> cc9012b42
HIVE-15503: LLAP: Fix use of Runtime.getRuntime.maxMemory in Hive operators (Prasanth Jayachandran reviewed by Gunther Hagleitner)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cc9012b4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cc9012b4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cc9012b4
Branch: refs/heads/master
Commit: cc9012b4206b551a209e421df695f051ae5bfb54
Parents: d28cbb4
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Sat Dec 24 02:39:22 2016 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Sat Dec 24 02:39:22 2016 -0800
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/GroupByOperator.java | 35 ++++++++++++++------
.../hadoop/hive/ql/exec/OperatorUtils.java | 16 +++++++++
.../apache/hadoop/hive/ql/exec/PTFTopNHash.java | 13 ++++++--
.../hadoop/hive/ql/exec/ReduceSinkOperator.java | 2 +-
.../apache/hadoop/hive/ql/exec/TopNHash.java | 33 ++++++++++++------
.../hive/ql/exec/tez/MapRecordProcessor.java | 10 ++++++
.../hive/ql/exec/tez/ReduceRecordProcessor.java | 9 +++++
.../hive/ql/plan/AbstractOperatorDesc.java | 11 ++++++
.../hadoop/hive/ql/plan/OperatorDesc.java | 2 ++
9 files changed, 107 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cc9012b4/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index cddf14f..46f0ecd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import javolution.util.FastBitSet;
@@ -146,6 +147,10 @@ public class GroupByOperator extends Operator<GroupByDesc> {
private transient int countAfterReport; // report or forward
private transient int heartbeatInterval;
+ private transient boolean isTez;
+ private transient boolean isLlap;
+ private transient int numExecutors;
+
/**
* Total amount of memory allowed for JVM heap.
*/
@@ -391,17 +396,20 @@ public class GroupByOperator extends Operator<GroupByDesc> {
new KeyWrapperFactory(keyFields, keyObjectInspectors, currentKeyObjectInspectors);
newKeys = keyWrapperFactory.getKeyWrapper();
-
+ isTez = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez");
+ isLlap = isTez && HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("llap");
+ numExecutors = isLlap ? HiveConf.getIntVar(hconf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS) : 1;
firstRow = true;
// estimate the number of hash table entries based on the size of each
// entry. Since the size of a entry
// is not known, estimate that based on the number of entries
if (hashAggr) {
- computeMaxEntriesHashAggr(hconf);
+ computeMaxEntriesHashAggr();
}
memoryMXBean = ManagementFactory.getMemoryMXBean();
- maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
+ maxMemory = isTez ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax();
memoryThreshold = this.getConf().getMemoryThreshold();
+ LOG.info("isTez: {} isLlap: {} numExecutors: {} maxMemory: {}", isTez, isLlap, numExecutors, maxMemory);
}
/**
@@ -413,9 +421,14 @@ public class GroupByOperator extends Operator<GroupByDesc> {
* @return number of entries that can fit in hash table - useful for map-side
* aggregation only
**/
- private void computeMaxEntriesHashAggr(Configuration hconf) throws HiveException {
+ private void computeMaxEntriesHashAggr() throws HiveException {
float memoryPercentage = this.getConf().getGroupByMemoryUsage();
- maxHashTblMemory = (long) (memoryPercentage * Runtime.getRuntime().maxMemory());
+ if (isTez) {
+ maxHashTblMemory = (long) (memoryPercentage * getConf().getMaxMemoryAvailable());
+ } else {
+ maxHashTblMemory = (long) (memoryPercentage * Runtime.getRuntime().maxMemory());
+ }
+ LOG.info("Max hash table memory: {} bytes", maxHashTblMemory);
estimateRowSize();
}
@@ -875,6 +888,9 @@ public class GroupByOperator extends Operator<GroupByDesc> {
if ((numEntriesHashTable == 0) || ((numEntries % NUMROWSESTIMATESIZE) == 0)) {
//check how much memory left memory
usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
+ // TODO: there is no easy and reliable way to compute the memory used by the executor threads and on-heap cache.
+ // Assuming the used memory is equally divided among all executors.
+ usedMemory = isLlap ? usedMemory / numExecutors : usedMemory;
rate = (float) usedMemory / (float) maxMemory;
if(rate > memoryThreshold){
return true;
@@ -957,7 +973,6 @@ public class GroupByOperator extends Operator<GroupByDesc> {
* @throws HiveException
*/
private void flushHashTable(boolean complete) throws HiveException {
-
countAfterReport = 0;
// Currently, the algorithm flushes 10% of the entries - this can be
@@ -973,7 +988,7 @@ public class GroupByOperator extends Operator<GroupByDesc> {
hashAggregations.clear();
hashAggregations = null;
if (isLogInfoEnabled) {
- LOG.info("Hash Table completed flushed");
+ LOG.info("Hash Table completed flushed");
}
return;
}
@@ -991,9 +1006,9 @@ public class GroupByOperator extends Operator<GroupByDesc> {
iter.remove();
numDel++;
if (numDel * 10 >= oldSize) {
- if (isLogInfoEnabled) {
- LOG.info("Hash Table flushed: new size = " + hashAggregations.size());
- }
+ if (isLogInfoEnabled) {
+ LOG.info("Hash Table flushed: new size = " + hashAggregations.size());
+ }
return;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc9012b4/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index a8ed74c..d294e25 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -330,4 +330,20 @@ public class OperatorUtils {
}
return numberOperators;
}
+
+ public static void setMemoryAvailable(final List<Operator<? extends OperatorDesc>> operators,
+ final long memoryAvailableToTask) {
+ if (operators == null) {
+ return;
+ }
+
+ for (Operator<? extends OperatorDesc> op : operators) {
+ if (op.getConf() != null) {
+ op.getConf().setMaxMemoryAvailable(memoryAvailableToTask);
+ }
+ if (op.getChildOperators() != null && !op.getChildOperators().isEmpty()) {
+ setMemoryAvailable(op.getChildOperators(), memoryAvailableToTask);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc9012b4/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java
index f93b420..2bc83fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java
@@ -24,9 +24,11 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.io.BytesWritable;
@@ -38,10 +40,15 @@ public class PTFTopNHash extends TopNHash {
private TopNHash largestPartition;
private boolean prevIndexPartIsNull;
private Set<Integer> indexesWithNullPartKey;
+ private OperatorDesc conf;
+ private Configuration hconf;
public void initialize(
- int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector) {
- super.initialize(topN, memUsage, isMapGroupBy, collector);
+ int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector, final OperatorDesc conf,
+ final Configuration hconf) {
+ super.initialize(topN, memUsage, isMapGroupBy, collector, conf, hconf);
+ this.conf = conf;
+ this.hconf = hconf;
this.isMapGroupBy = isMapGroupBy;
this.memUsage = memUsage;
partitionHeaps = new HashMap<Key, TopNHash>();
@@ -76,7 +83,7 @@ public class PTFTopNHash extends TopNHash {
TopNHash partHeap = partitionHeaps.get(pk);
if ( partHeap == null ) {
partHeap = new TopNHash();
- partHeap.initialize(topN, memUsage, isMapGroupBy, collector);
+ partHeap.initialize(topN, memUsage, isMapGroupBy, collector, conf, hconf);
if ( batchIndex >= 0 ) {
partHeap.startVectorizedBatch(batchSize);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc9012b4/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 9f8acc9..789d2a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -244,7 +244,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
if (limit >= 0 && memUsage > 0) {
reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : new TopNHash();
- reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
+ reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this, conf, hconf);
}
useUniformHash = conf.getReducerTraits().contains(UNIFORM);
http://git-wip-us.apache.org/repos/asf/hive/blob/cc9012b4/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
index e400368..f3c7c77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
@@ -19,24 +19,24 @@
package org.apache.hadoop.hive.ql.exec;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
import java.util.Arrays;
import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
import java.util.TreeMap;
-import java.util.TreeSet;
-import com.google.common.collect.MinMaxPriorityQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.MinMaxPriorityQueue;
/**
* Stores binary key/value in sorted manner to get top-n key/value
@@ -92,7 +92,8 @@ public class TopNHash {
};
public void initialize(
- int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector) {
+ int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector, final OperatorDesc conf,
+ final Configuration hconf) {
assert topN >= 0 && memUsage > 0;
assert !this.isEnabled;
this.isEnabled = false;
@@ -103,11 +104,23 @@ public class TopNHash {
return; // topN == 0 will cause a short-circuit, don't need any initialization
}
+ final boolean isTez = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez");
+ final boolean isLlap = isTez && HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("llap");
+ final int numExecutors = isLlap ? HiveConf.getIntVar(hconf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS) : 1;
+
// Used Memory = totalMemory() - freeMemory();
// Total Free Memory = maxMemory() - Used Memory;
long totalFreeMemory = Runtime.getRuntime().maxMemory() -
Runtime.getRuntime().totalMemory() + Runtime.getRuntime().freeMemory();
+ if (isTez) {
+ MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+ // TODO: For LLAP, assumption is off-heap cache.
+ final long memoryUsedPerExecutor = (memoryMXBean.getHeapMemoryUsage().getUsed() / numExecutors);
+ // this is total free memory available per executor in case of LLAP
+ totalFreeMemory = conf.getMaxMemoryAvailable() - memoryUsedPerExecutor;
+ }
+
// limit * 64 : compensation of arrays for key/value/hashcodes
this.threshold = (long) (memUsage * totalFreeMemory) - topN * 64L;
if (threshold < 0) {
http://git-wip-us.apache.org/repos/asf/hive/blob/cc9012b4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 6f36dfb..955fa80 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
+import org.apache.hadoop.hive.llap.LlapUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -193,6 +194,7 @@ public class MapRecordProcessor extends RecordProcessor {
} else {
mapOp = new MapOperator(runtimeCtx);
}
+
// Not synchronizing creation of mapOp with an invocation. Check immediately
// after creation in case abort has been set.
// Relying on the regular flow to clean up the actual operator. i.e. If an exception is
@@ -283,6 +285,14 @@ public class MapRecordProcessor extends RecordProcessor {
mapOp.passExecContext(execContext);
l4j.info(mapOp.dump(0));
+ // set memory available for operators
+ long memoryAvailableToTask = processorContext.getTotalMemoryAvailableToTask();
+ if (mapOp.getConf() != null) {
+ mapOp.getConf().setMaxMemoryAvailable(memoryAvailableToTask);
+ l4j.info("Memory available for operators set to {}", LlapUtil.humanReadableByteCount(memoryAvailableToTask));
+ }
+ OperatorUtils.setMemoryAvailable(mapOp.getChildOperators(), memoryAvailableToTask);
+
mapOp.initializeLocalWork(jconf);
checkAbortCondition();
http://git-wip-us.apache.org/repos/asf/hive/blob/cc9012b4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index e4c13fb..d80f201 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -27,6 +27,7 @@ import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.Callable;
+import org.apache.hadoop.hive.llap.LlapUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -161,6 +162,13 @@ public class ReduceRecordProcessor extends RecordProcessor{
reducer = reduceWork.getReducer();
// Check immediately after reducer is assigned, in case the abort came in during
checkAbortCondition();
+ // set memory available for operators
+ long memoryAvailableToTask = processorContext.getTotalMemoryAvailableToTask();
+ if (reducer.getConf() != null) {
+ reducer.getConf().setMaxMemoryAvailable(memoryAvailableToTask);
+ l4j.info("Memory available for operators set to {}", LlapUtil.humanReadableByteCount(memoryAvailableToTask));
+ }
+ OperatorUtils.setMemoryAvailable(reducer.getChildOperators(), memoryAvailableToTask);
if (numTags > 1) {
sources = new ReduceRecordSource[numTags];
mainWorkOIs = new ObjectInspector[numTags];
@@ -199,6 +207,7 @@ public class ReduceRecordProcessor extends RecordProcessor{
checkAbortCondition();
reducer = reduceWork.getReducer();
+
// initialize reduce operator tree
try {
l4j.info(reducer.dump(0));
http://git-wip-us.apache.org/repos/asf/hive/blob/cc9012b4/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
index e217bdf..7df9d07 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
@@ -33,6 +33,7 @@ public class AbstractOperatorDesc implements OperatorDesc {
protected transient OpTraits opTraits;
protected transient Map<String, String> opProps;
protected long memNeeded = 0;
+ protected long memAvailable = 0;
protected String runtimeStatsTmpDir;
@Override
@@ -93,6 +94,16 @@ public class AbstractOperatorDesc implements OperatorDesc {
this.memNeeded = memNeeded;
}
+ @Override
+ public long getMaxMemoryAvailable() {
+ return memAvailable;
+ }
+
+ @Override
+  public void setMaxMemoryAvailable(final long memoryAvailable) {
+    this.memAvailable = memoryAvailable;
+ }
+
public String getRuntimeStatsTmpDir() {
return runtimeStatsTmpDir;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc9012b4/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
index ad620c2..850576c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
@@ -30,6 +30,8 @@ public interface OperatorDesc extends Serializable, Cloneable {
public Map<String, String> getOpProps();
public long getMemoryNeeded();
public void setMemoryNeeded(long memoryNeeded);
+ public long getMaxMemoryAvailable();
+  public void setMaxMemoryAvailable(long memoryAvailable);
public String getRuntimeStatsTmpDir();
public void setRuntimeStatsTmpDir(String runtimeStatsTmpDir);
}