You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2018/10/09 18:06:40 UTC

hive git commit: HIVE-20648: LLAP: Vector group by operator should use memory per executor

Repository: hive
Updated Branches:
  refs/heads/master db04f3f9a -> 2d2ab674f


HIVE-20648: LLAP: Vector group by operator should use memory per executor


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2d2ab674
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2d2ab674
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2d2ab674

Branch: refs/heads/master
Commit: 2d2ab674f8acb8a4e1d0532790e6c27bd8553018
Parents: db04f3f
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Tue Oct 9 11:05:55 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Tue Oct 9 11:05:55 2018 -0700

----------------------------------------------------------------------
 .../ql/exec/vector/VectorGroupByOperator.java   | 24 +++--
 .../exec/vector/TestVectorGroupByOperator.java  | 96 ++++++++++++++++++++
 2 files changed, 112 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2d2ab674/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index a516d60..0d80c9e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -33,6 +33,8 @@ import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.IConfigureJobConf;
@@ -148,6 +150,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
 
   private float memoryThreshold;
 
+  private boolean isLlap = false;
   /**
    * Interface for processing mode: global, hash, unsorted streaming, or group batch
    */
@@ -517,7 +520,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
           aggregationBatchInfo.getAggregatorsFixedSize();
 
       MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
-      maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
+      maxMemory = isLlap ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax();
       memoryThreshold = conf.getMemoryThreshold();
       // Tests may leave this unitialized, so better set it to 1
       if (memoryThreshold == 0.0f) {
@@ -527,13 +530,14 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
       maxHashTblMemory = (int)(maxMemory * memoryThreshold);
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("maxMemory:%dMb (%d * %f) fixSize:%d (key:%d agg:%d)",
-            maxHashTblMemory/1024/1024,
-            maxMemory/1024/1024,
-            memoryThreshold,
-            fixedHashEntrySize,
-            keyWrappersBatch.getKeysFixedSize(),
-            aggregationBatchInfo.getAggregatorsFixedSize()));
+        LOG.debug("GBY memory limits - isLlap: {} maxMemory: {} ({} * {}) fixSize:{} (key:{} agg:{})",
+          isLlap,
+          LlapUtil.humanReadableByteCount(maxHashTblMemory),
+          LlapUtil.humanReadableByteCount(maxMemory),
+          memoryThreshold,
+          fixedHashEntrySize,
+          keyWrappersBatch.getKeysFixedSize(),
+          aggregationBatchInfo.getAggregatorsFixedSize());
       }
     }
 
@@ -977,6 +981,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
+    isLlap = LlapProxy.isDaemon();
     VectorExpression.doTransientInit(keyExpressions);
 
     List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
@@ -1233,4 +1238,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
     }
   }
 
+  public long getMaxMemory() {
+    return maxMemory;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2d2ab674/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
index fe1375b..278f167 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
@@ -38,6 +38,8 @@ import java.util.Set;
 
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -288,6 +290,8 @@ public class TestVectorGroupByOperator {
     FakeCaptureVectorToRowOutputOperator out = FakeCaptureVectorToRowOutputOperator.addCaptureOutputChild(cCtx, vgo);
     vgo.initialize(hconf, null);
 
+    long expected = vgo.getMaxMemory();
+    assertEquals(expected, maxMemory);
     this.outputRowCount = 0;
     out.setOutputInspector(new FakeCaptureVectorToRowOutputOperator.OutputInspector() {
       @Override
@@ -345,6 +349,98 @@ public class TestVectorGroupByOperator {
   }
 
   @Test
+  public void testMemoryPressureFlushLlap() throws HiveException {
+
+    try {
+      List<String> mapColumnNames = new ArrayList<String>();
+      mapColumnNames.add("Key");
+      mapColumnNames.add("Value");
+      VectorizationContext ctx = new VectorizationContext("name", mapColumnNames);
+
+      Pair<GroupByDesc, VectorGroupByDesc> pair = buildKeyGroupByDesc(ctx, "max",
+        "Value", TypeInfoFactory.longTypeInfo,
+        "Key", TypeInfoFactory.longTypeInfo);
+      GroupByDesc desc = pair.fst;
+      VectorGroupByDesc vectorDesc = pair.snd;
+
+      LlapProxy.setDaemon(true);
+
+      CompilationOpContext cCtx = new CompilationOpContext();
+
+      Operator<? extends OperatorDesc> groupByOp = OperatorFactory.get(cCtx, desc);
+
+      VectorGroupByOperator vgo =
+        (VectorGroupByOperator) Vectorizer.vectorizeGroupByOperator(groupByOp, ctx, vectorDesc);
+
+      FakeCaptureVectorToRowOutputOperator out = FakeCaptureVectorToRowOutputOperator.addCaptureOutputChild(cCtx, vgo);
+      long maxMemory=512*1024*1024L;
+      vgo.getConf().setMaxMemoryAvailable(maxMemory);
+      float threshold = 100.0f*1024.0f/maxMemory;
+      desc.setMemoryThreshold(threshold);
+      vgo.initialize(hconf, null);
+
+      long got = vgo.getMaxMemory();
+      assertEquals(maxMemory, got);
+      this.outputRowCount = 0;
+      out.setOutputInspector(new FakeCaptureVectorToRowOutputOperator.OutputInspector() {
+        @Override
+        public void inspectRow(Object row, int tag) throws HiveException {
+          ++outputRowCount;
+        }
+      });
+
+      Iterable<Object> it = new Iterable<Object>() {
+        @Override
+        public Iterator<Object> iterator() {
+          return new Iterator<Object>() {
+            long value = 0;
+
+            @Override
+            public boolean hasNext() {
+              return true;
+            }
+
+            @Override
+            public Object next() {
+              return ++value;
+            }
+
+            @Override
+            public void remove() {
+            }
+          };
+        }
+      };
+
+      FakeVectorRowBatchFromObjectIterables data = new FakeVectorRowBatchFromObjectIterables(
+        100,
+        new String[]{"long", "long"},
+        it,
+        it);
+
+      // The 'it' data source will produce data w/o ever ending
+      // We want to see that memory pressure kicks in and some
+      // entries in the VGBY are flushed.
+      long countRowsProduced = 0;
+      for (VectorizedRowBatch unit : data) {
+        countRowsProduced += 100;
+        vgo.process(unit, 0);
+        if (0 < outputRowCount) {
+          break;
+        }
+        // Set an upper bound on how many rows we're willing to push before it should flush.
+        // We've set the memory threshold at 100KB and each key is distinct, so the
+        // row count should not go beyond 100KB/16 (key+data).
+        assertTrue(countRowsProduced < 100 * 1024 / 16);
+      }
+
+      assertTrue(0 < outputRowCount);
+    } finally {
+      LlapProxy.setDaemon(false);
+    }
+  }
+
+  @Test
   public void testMultiKeyIntStringInt() throws HiveException {
     testMultiKey(
         "sum",