You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by bs...@apache.org on 2019/07/19 22:26:52 UTC

[hive] branch master updated: HIVE-21391: LLAP Pool of column vector buffers can cause memory pressure; this fix makes memory estimation tighter (Slim Bouguerra reviewed by Prasanth Jayachandran)

This is an automated email from the ASF dual-hosted git repository.

bslim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ea614d  HIVE-21391: LLAP Pool of column vector buffers can cause memory pressure; this fix makes memory estimation tighter (Slim Bouguerra reviewed by Prasanth Jayachandran)
5ea614d is described below

commit 5ea614d799b82237e2387b3bcb7b63e7db90a3c9
Author: Slim Bouguerra <bs...@apache.org>
AuthorDate: Fri Jul 19 15:25:26 2019 -0700

    HIVE-21391: LLAP Pool of column vector buffers can cause memory pressure; this fix makes memory estimation tighter (Slim Bouguerra reviewed by Prasanth Jayachandran)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  11 +-
 .../hive/common/util/FixedSizedObjectPool.java     |   7 ++
 .../hive/common/util/TestFixedSizedObjectPool.java |  24 ++++
 .../java/org/apache/hadoop/hive/llap/LlapUtil.java |   4 +-
 .../hive/llap/io/api/impl/LlapRecordReader.java    | 126 +++++++++++++++------
 .../hive/llap/io/decode/EncodedDataConsumer.java   |  24 ++--
 .../io/api/impl/LlapRecordReaderQueueSizeTest.java |  97 ++++++++++++++++
 .../java/org/apache/hadoop/hive/common/Pool.java   |  25 +++-
 8 files changed, 262 insertions(+), 56 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0f34986..f002c6e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4146,13 +4146,16 @@ public class HiveConf extends Configuration {
         "MR LineRecordRedader into LLAP cache, if this feature is enabled. Safety flag."),
     LLAP_ORC_ENABLE_TIME_COUNTERS("hive.llap.io.orc.time.counters", true,
         "Whether to enable time counters for LLAP IO layer (time spent in HDFS, etc.)"),
-    LLAP_IO_VRB_QUEUE_LIMIT_BASE("hive.llap.io.vrb.queue.limit.base", 50000,
-        "The default queue size for VRBs produced by a LLAP IO thread when the processing is\n" +
+    LLAP_IO_VRB_QUEUE_LIMIT_MAX("hive.llap.io.vrb.queue.limit.max", 50000,
+        "The maximum queue size for VRBs produced by a LLAP IO thread when the processing is\n" +
         "slower than the IO. The actual queue size is set per fragment, and is adjusted down\n" +
-        "from the base, depending on the schema."),
-    LLAP_IO_VRB_QUEUE_LIMIT_MIN("hive.llap.io.vrb.queue.limit.min", 10,
+        "from the base, depending on the schema see LLAP_IO_CVB_BUFFERED_SIZE."),
+    LLAP_IO_VRB_QUEUE_LIMIT_MIN("hive.llap.io.vrb.queue.limit.min", 1,
         "The minimum queue size for VRBs produced by a LLAP IO thread when the processing is\n" +
         "slower than the IO (used when determining the size from base size)."),
+    LLAP_IO_CVB_BUFFERED_SIZE("hive.llap.io.cvb.memory.consumption.", 1L << 30,
+        "The amount of bytes used to buffer CVB between IO and Processor Threads default to 1GB, "
+            + "this will be used to compute a best effort queue size for VRBs produced by a LLAP IO thread."),
     LLAP_IO_SHARE_OBJECT_POOLS("hive.llap.io.share.object.pools", false,
         "Whether to used shared object pools in LLAP IO. A safety flag."),
     LLAP_AUTO_ALLOW_UBER("hive.llap.auto.allow.uber", false,
diff --git a/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java b/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java
index 3900a45..371d939 100644
--- a/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java
+++ b/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java
@@ -154,6 +154,13 @@ public class FixedSizedObjectPool<T> implements Pool<T> {
     return offerImpl(t);
   }
 
+  @Override public void clear() {
+    T result = takeImpl();
+    while (result != null) {
+      result = takeImpl();
+    }
+  }
+
   private T takeImpl() {
     long oldState = reserveArrayIndex(OBJECTS, EMPTY);
     if (oldState == NO_INDEX) return null; // For whatever reason, reserve failed.
diff --git a/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java b/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java
index b026e54..1c3fc07 100644
--- a/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java
+++ b/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java
@@ -29,6 +29,7 @@ import java.util.concurrent.FutureTask;
 
 import org.apache.hive.common.util.FixedSizedObjectPool;
 import org.apache.hadoop.hive.common.Pool;
+import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -238,6 +239,29 @@ public class TestFixedSizedObjectPool {
     assertTrue(OneObjHelper.THE_OBJECT == pool.take());
   }
 
+  @Test
+  public void testClearImp() {
+    int size = 10;
+    FixedSizedObjectPool<Object>
+        fixedSizedObjectPool =
+        new FixedSizedObjectPool<>(size, new Pool.PoolObjectHelper<Object>() {
+          @Override public Object create() {
+            //Null is used as marker to be the end.
+            return null;
+          }
+
+          @Override public void resetBeforeOffer(Object o) {
+            //
+          }
+        });
+    for (int i = 0; i < size; i++) {
+      fixedSizedObjectPool.offer(new Object());
+    }
+    Assert.assertEquals(size, fixedSizedObjectPool.size());
+    assertNotNull(fixedSizedObjectPool.take());
+    fixedSizedObjectPool.clear();
+    assertNull(fixedSizedObjectPool.take());
+  }
   private static void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
     cdlIn.countDown();
     try {
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
index 6d7cf7d..a351a19 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
@@ -47,6 +47,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.protobuf.BlockingService;
 
+import javax.annotation.Nullable;
+
 public class LlapUtil {
   private static final Logger LOG = LoggerFactory.getLogger(LlapUtil.class);
 
@@ -372,7 +374,7 @@ public class LlapUtil {
   }
 
 
-  public static ThreadMXBean initThreadMxBean() {
+  @Nullable public static ThreadMXBean initThreadMxBean() {
     ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
     if (mxBean != null) {
       if (!mxBean.isCurrentThreadCpuTimeSupported()) {
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 91c94ef..1378a01 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -21,11 +21,12 @@ package org.apache.hadoop.hive.llap.io.api.impl;
 import java.util.ArrayList;
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -74,11 +75,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-class LlapRecordReader
-    implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
+class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
   private static final Object DONE_OBJECT = new Object();
@@ -91,7 +90,7 @@ class LlapRecordReader
   private VectorizedOrcAcidRowBatchReader acidReader;
   private final Object[] partitionValues;
 
-  private final LinkedBlockingQueue<Object> queue;
+  private final ArrayBlockingQueue<Object> queue;
   private final AtomicReference<Throwable> pendingError = new AtomicReference<>(null);
 
   /** Vector that is currently being processed by our user. */
@@ -160,14 +159,22 @@ class LlapRecordReader
     TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(
         job, isAcidScan, Integer.MAX_VALUE);
 
-
-    int queueLimitBase = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_BASE, job, daemonConf);
-    int queueLimitMin =  getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MIN, job, daemonConf);
-    final boolean decimal64Support = HiveConf.getVar(job, ConfVars.HIVE_VECTORIZED_INPUT_FORMAT_SUPPORTS_ENABLED)
-      .equalsIgnoreCase("decimal_64");
-    int limit = determineQueueLimit(queueLimitBase, queueLimitMin, rbCtx.getRowColumnTypeInfos(), decimal64Support);
+    int queueLimitBase = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MAX, job, daemonConf);
+    int queueLimitMin = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MIN, job, daemonConf);
+    long bestEffortSize = getLongQueueVar(ConfVars.LLAP_IO_CVB_BUFFERED_SIZE, job, daemonConf);
+
+    final boolean
+        decimal64Support =
+        HiveConf.getVar(job, ConfVars.HIVE_VECTORIZED_INPUT_FORMAT_SUPPORTS_ENABLED).equalsIgnoreCase("decimal_64");
+    int
+        limit =
+        determineQueueLimit(bestEffortSize,
+            queueLimitBase,
+            queueLimitMin,
+            rbCtx.getRowColumnTypeInfos(),
+            decimal64Support);
     LOG.info("Queue limit for LlapRecordReader is " + limit);
-    this.queue = new LinkedBlockingQueue<>(limit);
+    this.queue = new ArrayBlockingQueue<>(limit);
 
 
     int partitionColumnCount = rbCtx.getPartitionColumnCount();
@@ -197,24 +204,64 @@ class LlapRecordReader
     return (jobVal != -1) ? jobVal : HiveConf.getIntVar(daemonConf, var);
   }
 
+  private static long getLongQueueVar(ConfVars var, JobConf jobConf, Configuration daemonConf) {
+    // Check job config for overrides, otherwise use the default server value.
+    long jobVal = jobConf.getLong(var.varname, -1);
+    return (jobVal != -1) ? jobVal : HiveConf.getLongVar(daemonConf, var);
+  }
+
   // For queue size estimation purposes, we assume all columns have weight one, and the following
   // types are counted as multiple columns. This is very primitive; if we wanted to make it better,
   // we'd increase the base limit, and adjust dynamically based on IO and processing perf delays.
-  private static final int COL_WEIGHT_COMPLEX = 16, COL_WEIGHT_HIVEDECIMAL = 4,
+  private static final int COL_WEIGHT_COMPLEX = 16, COL_WEIGHT_HIVEDECIMAL = 10,
       COL_WEIGHT_STRING = 8;
-  private static int determineQueueLimit(
-    int queueLimitBase, int queueLimitMin, TypeInfo[] typeInfos, final boolean decimal64Support) {
+
+  @VisibleForTesting
+  static int determineQueueLimit(long maxBufferedSize,
+      int queueLimitMax,
+      int queueLimitMin,
+      TypeInfo[] typeInfos,
+      final boolean decimal64Support) {
+    assert queueLimitMax >= queueLimitMin;
     // If the values are equal, the queue limit is fixed.
-    if (queueLimitBase == queueLimitMin) return queueLimitBase;
+    if (queueLimitMax == queueLimitMin) return queueLimitMax;
     // If there are no columns (projection only join?) just assume no weight.
-    if (typeInfos == null || typeInfos.length == 0) return queueLimitBase;
+    if (typeInfos == null || typeInfos.length == 0) return queueLimitMax;
+    // total weight as bytes
     double totalWeight = 0;
-    for (TypeInfo ti : typeInfos) {
+    int numberOfProjectedColumns = typeInfos.length;
+    double scale = Math.max(Math.log(numberOfProjectedColumns), 1);
+
+    // Assuming that an empty Column Vector is about 96 bytes the object
+    // org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector object internals:
+    // OFFSET  SIZE                                                      TYPE DESCRIPTION
+    // VALUE
+    //      0    16                                                           (object header)
+    //     16     1                                                   boolean ColumnVector.noNulls
+    //     17     1                                                   boolean ColumnVector.isRepeating
+    //     18     1                                                   boolean ColumnVector.preFlattenIsRepeating
+    //     19     1                                                   boolean ColumnVector.preFlattenNoNulls
+    //     20     4                                                           (alignment/padding gap)
+    //     24     8   org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type ColumnVector.type
+    //     32     8                                                 boolean[] ColumnVector.isNull
+    //     40     4                                                       int BytesColumnVector.nextFree
+    //     44     4                                                       int BytesColumnVector.smallBufferNextFree
+    //     48     4                                                       int BytesColumnVector.bufferAllocationCount
+    //     52     4                                                           (alignment/padding gap)
+    //     56     8                                                  byte[][] BytesColumnVector.vector
+    //     64     8                                                     int[] BytesColumnVector.start
+    //     72     8                                                     int[] BytesColumnVector.length
+    //     80     8                                                    byte[] BytesColumnVector.buffer
+    //     88     8                                                    byte[] BytesColumnVector.smallBuffer
+    long columnVectorBaseSize = (long) (96 * numberOfProjectedColumns * scale);
+
+    for (int i = 0; i < typeInfos.length; i++) {
+      TypeInfo ti = typeInfos[i];
       int colWeight;
       if (ti.getCategory() != Category.PRIMITIVE) {
         colWeight = COL_WEIGHT_COMPLEX;
       } else {
-        PrimitiveTypeInfo pti = (PrimitiveTypeInfo)ti;
+        PrimitiveTypeInfo pti = (PrimitiveTypeInfo) ti;
         switch (pti.getPrimitiveCategory()) {
         case BINARY:
         case CHAR:
@@ -222,6 +269,11 @@ class LlapRecordReader
         case STRING:
           colWeight = COL_WEIGHT_STRING;
           break;
+          //Timestamp column vector uses an int and long arrays
+        case TIMESTAMP:
+        case INTERVAL_DAY_TIME:
+          colWeight = 2;
+          break;
         case DECIMAL:
           boolean useDecimal64 = false;
           if (ti instanceof DecimalTypeInfo) {
@@ -241,9 +293,13 @@ class LlapRecordReader
           colWeight = 1;
         }
       }
-      totalWeight += colWeight;
+      totalWeight += colWeight * 8 * scale;
     }
-    return Math.max(queueLimitMin, (int)(queueLimitBase / totalWeight));
+    //default batch size is 1024
+    totalWeight *= 1024;
+    totalWeight +=  columnVectorBaseSize;
+    int bestEffortSize = Math.min((int) (maxBufferedSize / totalWeight), queueLimitMax);
+    return Math.max(bestEffortSize, queueLimitMin);
   }
 
 
@@ -271,7 +327,7 @@ class LlapRecordReader
       work = Utilities.getMergeWork(job, inputName);
     }
 
-    if (work == null || !(work instanceof MapWork)) {
+    if (!(work instanceof MapWork)) {
       work = Utilities.getMapWork(job);
     }
     return (MapWork) work;
@@ -325,7 +381,7 @@ class LlapRecordReader
       }
       isFirst = false;
     }
-    ColumnVectorBatch cvb = null;
+    ColumnVectorBatch cvb;
     try {
       cvb = nextCvb();
     } catch (InterruptedException e) {
@@ -347,10 +403,10 @@ class LlapRecordReader
         // TODO: relying everywhere on the magical constants and columns being together means ACID
         //       columns are going to be super hard to change in a backward compat manner. I can
         //       foresee someone cursing while refactoring all the magic for prefix schema changes.
-        /**
-         * Acid meta cols are always either all included or all excluded the
-         * the width of 'cvb' changes accordingly so 'acidColCount' and
-         * 'ixInVrb' need to be adjusted. See {@link IncludesImpl} comments.
+        /*
+          Acid meta cols are always either all included or all excluded the
+          the width of 'cvb' changes accordingly so 'acidColCount' and
+          'ixInVrb' need to be adjusted. See {@link IncludesImpl} comments.
          */
         // Exclude the row column.
         int acidColCount = acidReader.includeAcidColumns() ?
@@ -467,7 +523,7 @@ class LlapRecordReader
     // If the structure is replaced with smth that doesn't, we MUST check interrupt here because
     // Hive operators rely on recordreader to handle task interruption, and unlike most RRs we
     // do not do any blocking IO ops on this thread.
-    Object next = null;
+    Object next;
     do {
       rethrowErrorIfAny(pendingError.get()); // Best-effort check; see the comment in the method.
       next = queue.poll(100, TimeUnit.MILLISECONDS);
@@ -624,7 +680,7 @@ class LlapRecordReader
       List<Integer> filePhysicalColumnIds = readerLogicalColumnIds;
       if (isAcidScan) {
         int rootCol = OrcInputFormat.getRootColumn(false);
-        filePhysicalColumnIds = new ArrayList<Integer>(filePhysicalColumnIds.size() + rootCol);
+        filePhysicalColumnIds = new ArrayList<>(filePhysicalColumnIds.size() + rootCol);
         this.acidStructColumnId = rootCol - 1; // OrcRecordUpdater.ROW. This is somewhat fragile...
         // Note: this guarantees that physical column IDs are in order.
         for (int i = 0; i < rootCol; ++i) {
@@ -632,12 +688,12 @@ class LlapRecordReader
           // struct to get read without projection.
           if (acidStructColumnId == i) continue;
           if(!includeAcidColumns) {
-            /**
-             * if not including acid columns, we still want to number the
-             * physical columns as if acid columns are included becase
-             * {@link #generateFileIncludes(TypeDescription)} takes the file
-             * schema as input
-             * (eg <op, owid, writerId, rowid, cwid, <f1, ... fn>>)
+            /*
+              if not including acid columns, we still want to number the
+              physical columns as if acid columns are included becase
+              {@link #generateFileIncludes(TypeDescription)} takes the file
+              schema as input
+              (eg <op, owid, writerId, rowid, cwid, <f1, ... fn>>)
              */
             continue;
           }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index 84436bc..10d76aa 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -57,17 +57,15 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
     this.downstreamConsumer = consumer;
     this.ioMetrics = ioMetrics;
     this.mxBean = LlapUtil.initThreadMxBean();
-    cvbPool = new FixedSizedObjectPool<ColumnVectorBatch>(CVB_POOL_SIZE,
-        new Pool.PoolObjectHelper<ColumnVectorBatch>() {
-          @Override
-          public ColumnVectorBatch create() {
-            return new ColumnVectorBatch(colCount);
-          }
-          @Override
-          public void resetBeforeOffer(ColumnVectorBatch t) {
-            // Don't reset anything, we are reusing column vectors.
-          }
-        });
+    cvbPool = new FixedSizedObjectPool<>(CVB_POOL_SIZE, new Pool.PoolObjectHelper<ColumnVectorBatch>() {
+      @Override public ColumnVectorBatch create() {
+        return new ColumnVectorBatch(colCount);
+      }
+
+      @Override public void resetBeforeOffer(ColumnVectorBatch t) {
+        // Don't reset anything, we are reusing column vectors.
+      }
+    });
     this.counters = counters;
   }
 
@@ -81,6 +79,9 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
 
     @Override
     public Void call() throws Exception {
+      if (mxBean == null) {
+        return readCallable.call();
+      }
       long cpuTime = mxBean.getCurrentThreadCpuTime(),
           userTime = mxBean.getCurrentThreadUserTime();
       try {
@@ -145,6 +146,7 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
   @Override
   public void setDone() throws InterruptedException {
     downstreamConsumer.setDone();
+    cvbPool.clear();
   }
 
   @Override
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReaderQueueSizeTest.java b/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReaderQueueSizeTest.java
new file mode 100644
index 0000000..7e71cf2
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReaderQueueSizeTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.api.impl;
+
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.orc.TypeDescription;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+
+public class LlapRecordReaderQueueSizeTest {
+
+  private static final int END_EXCLUSIVE = 300;
+  private static final int MAX_BUFFERED_SIZE = 1 << 30; //1GB
+
+  @Test public void testMaxEqMin() {
+    int expected = LlapRecordReader.determineQueueLimit(0, 100, 100, null, true);
+    Assert.assertEquals(100, expected);
+  }
+
+  @Test public void testMaxIsEnforced() {
+    TypeInfo[] cols = { new DecimalTypeInfo() };
+    int actual = LlapRecordReader.determineQueueLimit(Long.MAX_VALUE, 10, 1, cols, true);
+    Assert.assertEquals(10, actual);
+  }
+
+  @Test public void testMinIsEnforced() {
+    TypeInfo[] cols = { new DecimalTypeInfo() };
+    int actual = LlapRecordReader.determineQueueLimit(0, 10, 5, cols, true);
+    Assert.assertEquals(5, actual);
+  }
+
+  @Test public void testOrderDecimal64VsFatDecimals() {
+    TypeInfo[] cols = IntStream.range(0, 300).mapToObj(i -> new DecimalTypeInfo()).toArray(TypeInfo[]::new);
+    int actual = LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, cols, true);
+    Assert.assertEquals(75, actual);
+    // the idea it to see an order of 10 when using fat Decimals
+    actual = LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, cols, false);
+    Assert.assertEquals(7, actual);
+  }
+
+  @Test public void testOrderDecimal64VsLong() {
+    TypeInfo[] decimalCols = ArrayOf(() -> new DecimalTypeInfo(TypeDescription.MAX_DECIMAL64_PRECISION, 0));
+    TypeInfo[] longCols = ArrayOf(() -> TypeInfoFactory.longTypeInfo);
+    Assert.assertEquals(LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, longCols, true),
+        LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, decimalCols, true));
+  }
+
+  @Test public void testStringsColumns() {
+    TypeInfo[] charsCols = ArrayOf(() -> TypeInfoFactory.charTypeInfo);
+    TypeInfo[] stringCols = ArrayOf(() -> TypeInfoFactory.stringTypeInfo);
+    TypeInfo[] binaryCols = ArrayOf(() -> TypeInfoFactory.binaryTypeInfo);
+    Assert.assertEquals(LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, stringCols, true), 9);
+    Assert.assertEquals(9, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, charsCols, true));
+    Assert.assertEquals(9, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, binaryCols, true));
+  }
+
+  @Test public void testLongColumns() {
+    TypeInfo[] longsCols = ArrayOf(() -> TypeInfoFactory.longTypeInfo);
+    TypeInfo[] intCols = ArrayOf(() -> TypeInfoFactory.intTypeInfo);
+    TypeInfo[] byteCols = ArrayOf(() -> TypeInfoFactory.byteTypeInfo);
+    Assert.assertEquals(75, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, longsCols, true));
+    Assert.assertEquals(75, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, intCols, true));
+    Assert.assertEquals(75, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, byteCols, true));
+  }
+
+  @Test public void testTimestampsColumns() {
+    TypeInfo[] tsCols = ArrayOf(() -> TypeInfoFactory.timestampTypeInfo);
+    TypeInfo[] intervalCols = ArrayOf(() -> TypeInfoFactory.intervalDayTimeTypeInfo);
+    Assert.assertEquals(38, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, tsCols, true));
+    Assert.assertEquals(38, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, intervalCols, true));
+  }
+
+  private static TypeInfo[] ArrayOf(Supplier<TypeInfo> supplier) {
+    return IntStream.range(0, END_EXCLUSIVE).mapToObj(i -> supplier.get()).toArray(TypeInfo[]::new);
+  }
+}
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/Pool.java b/storage-api/src/java/org/apache/hadoop/hive/common/Pool.java
index b9789ec..0522cc1 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/Pool.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/Pool.java
@@ -17,21 +17,36 @@
  */
 package org.apache.hadoop.hive.common;
 
-/** Simple object pool to prevent GC on small objects passed between threads. */
+/**
+ * Simple object pool to prevent GC on small objects passed between threads.
+ */
 public interface Pool<T> {
-  /** Object helper for objects stored in the pool. */
+  /**
+   * Object helper for objects stored in the pool.
+   */
   public interface PoolObjectHelper<T> {
-    /** Called to create an object when one cannot be provided.
+    /**
+     * Called to create an object when one cannot be provided.
+     *
      * @return a newly allocated object
      */
     T create();
-    /** Called before the object is put in the pool (regardless of whether put succeeds).
+
+    /**
+     * Called before the object is put in the pool (regardless of whether put succeeds).
+     *
      * @param t the object to reset
      */
     void resetBeforeOffer(T t);
   }
 
   T take();
+
   void offer(T t);
+
   int size();
-}
\ No newline at end of file
+
+  default void clear() {
+    //no op
+  }
+}