You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@hive.apache.org by pr...@apache.org on 2018/10/09 18:06:40 UTC
hive git commit: HIVE-20648: LLAP: Vector group by operator should use memory per executor
Repository: hive
Updated Branches:
refs/heads/master db04f3f9a -> 2d2ab674f
HIVE-20648: LLAP: Vector group by operator should use memory per executor
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2d2ab674
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2d2ab674
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2d2ab674
Branch: refs/heads/master
Commit: 2d2ab674f8acb8a4e1d0532790e6c27bd8553018
Parents: db04f3f
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Tue Oct 9 11:05:55 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Tue Oct 9 11:05:55 2018 -0700
----------------------------------------------------------------------
.../ql/exec/vector/VectorGroupByOperator.java | 24 +++--
.../exec/vector/TestVectorGroupByOperator.java | 96 ++++++++++++++++++++
2 files changed, 112 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2d2ab674/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index a516d60..0d80c9e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -33,6 +33,8 @@ import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.IConfigureJobConf;
@@ -148,6 +150,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
private float memoryThreshold;
+ private boolean isLlap = false;
/**
* Interface for processing mode: global, hash, unsorted streaming, or group batch
*/
@@ -517,7 +520,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
aggregationBatchInfo.getAggregatorsFixedSize();
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
- maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
+ maxMemory = isLlap ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax();
memoryThreshold = conf.getMemoryThreshold();
// Tests may leave this unitialized, so better set it to 1
if (memoryThreshold == 0.0f) {
@@ -527,13 +530,14 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
maxHashTblMemory = (int)(maxMemory * memoryThreshold);
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("maxMemory:%dMb (%d * %f) fixSize:%d (key:%d agg:%d)",
- maxHashTblMemory/1024/1024,
- maxMemory/1024/1024,
- memoryThreshold,
- fixedHashEntrySize,
- keyWrappersBatch.getKeysFixedSize(),
- aggregationBatchInfo.getAggregatorsFixedSize()));
+ LOG.debug("GBY memory limits - isLlap: {} maxMemory: {} ({} * {}) fixSize:{} (key:{} agg:{})",
+ isLlap,
+ LlapUtil.humanReadableByteCount(maxHashTblMemory),
+ LlapUtil.humanReadableByteCount(maxMemory),
+ memoryThreshold,
+ fixedHashEntrySize,
+ keyWrappersBatch.getKeysFixedSize(),
+ aggregationBatchInfo.getAggregatorsFixedSize());
}
}
@@ -977,6 +981,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
+ isLlap = LlapProxy.isDaemon();
VectorExpression.doTransientInit(keyExpressions);
List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
@@ -1233,4 +1238,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
}
}
+ public long getMaxMemory() {
+ return maxMemory;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2d2ab674/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
index fe1375b..278f167 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
@@ -38,6 +38,8 @@ import java.util.Set;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -288,6 +290,8 @@ public class TestVectorGroupByOperator {
FakeCaptureVectorToRowOutputOperator out = FakeCaptureVectorToRowOutputOperator.addCaptureOutputChild(cCtx, vgo);
vgo.initialize(hconf, null);
+ long expected = vgo.getMaxMemory();
+ assertEquals(expected, maxMemory);
this.outputRowCount = 0;
out.setOutputInspector(new FakeCaptureVectorToRowOutputOperator.OutputInspector() {
@Override
@@ -345,6 +349,98 @@ public class TestVectorGroupByOperator {
}
@Test
+ public void testMemoryPressureFlushLlap() throws HiveException {
+
+ try {
+ List<String> mapColumnNames = new ArrayList<String>();
+ mapColumnNames.add("Key");
+ mapColumnNames.add("Value");
+ VectorizationContext ctx = new VectorizationContext("name", mapColumnNames);
+
+ Pair<GroupByDesc, VectorGroupByDesc> pair = buildKeyGroupByDesc(ctx, "max",
+ "Value", TypeInfoFactory.longTypeInfo,
+ "Key", TypeInfoFactory.longTypeInfo);
+ GroupByDesc desc = pair.fst;
+ VectorGroupByDesc vectorDesc = pair.snd;
+
+ LlapProxy.setDaemon(true);
+
+ CompilationOpContext cCtx = new CompilationOpContext();
+
+ Operator<? extends OperatorDesc> groupByOp = OperatorFactory.get(cCtx, desc);
+
+ VectorGroupByOperator vgo =
+ (VectorGroupByOperator) Vectorizer.vectorizeGroupByOperator(groupByOp, ctx, vectorDesc);
+
+ FakeCaptureVectorToRowOutputOperator out = FakeCaptureVectorToRowOutputOperator.addCaptureOutputChild(cCtx, vgo);
+ long maxMemory=512*1024*1024L;
+ vgo.getConf().setMaxMemoryAvailable(maxMemory);
+ float threshold = 100.0f*1024.0f/maxMemory;
+ desc.setMemoryThreshold(threshold);
+ vgo.initialize(hconf, null);
+
+ long got = vgo.getMaxMemory();
+ assertEquals(maxMemory, got);
+ this.outputRowCount = 0;
+ out.setOutputInspector(new FakeCaptureVectorToRowOutputOperator.OutputInspector() {
+ @Override
+ public void inspectRow(Object row, int tag) throws HiveException {
+ ++outputRowCount;
+ }
+ });
+
+ Iterable<Object> it = new Iterable<Object>() {
+ @Override
+ public Iterator<Object> iterator() {
+ return new Iterator<Object>() {
+ long value = 0;
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public Object next() {
+ return ++value;
+ }
+
+ @Override
+ public void remove() {
+ }
+ };
+ }
+ };
+
+ FakeVectorRowBatchFromObjectIterables data = new FakeVectorRowBatchFromObjectIterables(
+ 100,
+ new String[]{"long", "long"},
+ it,
+ it);
+
+ // The 'it' data source will produce data w/o ever ending
+ // We want to see that memory pressure kicks in and some
+ // entries in the VGBY are flushed.
+ long countRowsProduced = 0;
+ for (VectorizedRowBatch unit : data) {
+ countRowsProduced += 100;
+ vgo.process(unit, 0);
+ if (0 < outputRowCount) {
+ break;
+ }
+ // Set an upper bound how much we're willing to push before it should flush
+ // we've set the memory treshold at 100kb, each key is distinct
+ // It should not go beyond 100k/16 (key+data)
+ assertTrue(countRowsProduced < 100 * 1024 / 16);
+ }
+
+ assertTrue(0 < outputRowCount);
+ } finally {
+ LlapProxy.setDaemon(false);
+ }
+ }
+
+ @Test
public void testMultiKeyIntStringInt() throws HiveException {
testMultiKey(
"sum",