You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/06/25 23:02:12 UTC

[impala] branch master updated: IMPALA-2658: Extend the NDV function to accept a precision

This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new eef61d2  IMPALA-2658: Extend the NDV function to accept a precision
eef61d2 is described below

commit eef61d22d89b97eb589936701a41d05d84b0da8a
Author: Qifan Chen <qc...@cloudera.com>
AuthorDate: Thu May 28 06:17:24 2020 -0700

    IMPALA-2658: Extend the NDV function to accept a precision
    
    This work addresses the current limitation in NDV function by
    extending the function to optionally take a secondary argument
    called scale.
    
       NDV([DISTINCT | ALL] expression [, scale])
    
    Without the secondary argument, all the syntax and semantics are
    preserved. The precision, which determines the total number
    of different estimators in the HLL algorithm, is still 10.
    
    When supplied, the scale argument must be an interger literal
    in the range from 1 to 10. Its value is internally mapped
    to a precision used by the HLL algorithm, with the following
    mapping formula:
    
      precision = scale + 8.
    
    Thus, a scale of 1 is mapped to a precision of 9 and a scale of
    10 is mapped to a precision of 18.
    
    A large precision value generally produces a better estimation
    (i.e. with less error) than a small precision value, due to extra
    number of estimators involved. The expense is at the extra amount of
    memory needed. For a given precision p, the amount of memory used
    by the HLL algorithm is in the order of 2^p bytes.
    
    Testing:
    1. Ran unit tests against table store_sales in TPC-DS and table customer
       in TPCH in both serial and parallel plan settings;
    2. Added and ran a new regression test (test_ndv)) in
       TestAggregationQueries section to compute NDV() for every supported
       Impala data type over all valid scale values;
    3. Ran "core" tests.
    
    Performance:
    1. Ran estimation error tests against a total of 22 distinct data sets
       loaded into external Impala tables.
    
       The error was computed as
       abs(<true_unique_value> - <estimated_unique_value>) / <true_unique_value>.
    
       Overall, the precision of 18 (or the scale value of 10) gave
       the best result with worst estimation error at 0.42% (for one set
       of 10 million integers), and average error no more than 0.17%,
       at the cost of 256Kb of memory for the internal data structure per
       evaluation of the HLL algorithm.  Other precisions (such as 16 and
       17) were also very reasonable but with slightly larger estimation
       errors.
    
    2. Ran execution time tests against a total of 6 distinct data files
       on a single node EC2 VM in debug mode. These data files were loaded
       in turn into a single column in an external Impala table.  It was
       found that the total execution time was relatively the same across
       different scales for a given table configuration. It remains to be
       seen the execution time for tables involving multiple data files
       across multiple nodes.
    
    3. Ran execution time tests comparing the before- and
       after-enhancement version of NDV().
    
    Change-Id: I48a4517bd0959f7021143073d37505a46c551a58
    Reviewed-on: http://gerrit.cloudera.org:8080/15997
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/logging.h                            |   7 +
 be/src/exec/incr-stats-util-test.cc                |  30 +--
 be/src/exec/incr-stats-util.cc                     |  20 +-
 be/src/exec/incr-stats-util.h                      |   2 +-
 be/src/exprs/aggregate-functions-ir.cc             | 221 +++++++++++++++++----
 be/src/exprs/aggregate-functions.h                 |  36 +++-
 .../apache/impala/analysis/FunctionCallExpr.java   |  39 ++++
 .../java/org/apache/impala/catalog/BuiltinsDb.java | 106 +++++++++-
 tests/query_test/test_aggregation.py               |  43 ++++
 9 files changed, 424 insertions(+), 80 deletions(-)

diff --git a/be/src/common/logging.h b/be/src/common/logging.h
index 643113d..bcb2389 100644
--- a/be/src/common/logging.h
+++ b/be/src/common/logging.h
@@ -62,6 +62,13 @@
 #define VLOG_ROW_IS_ON VLOG_IS_ON(3)
 #define VLOG_PROGRESS_IS_ON VLOG_IS_ON(2)
 
+// Define a range check macro to test x in the inclusive range from low to high.
+#define DCHECK_IN_RANGE(x, low, high) \
+  {                                   \
+    DCHECK_GE(x, low);                \
+    DCHECK_LE(x, high);               \
+  }
+
 /// Define a wrapper around DCHECK for strongly typed enums that print a useful error
 /// message on failure.
 #define DCHECK_ENUM_EQ(a, b)                                               \
diff --git a/be/src/exec/incr-stats-util-test.cc b/be/src/exec/incr-stats-util-test.cc
index a955633..11eb634 100644
--- a/be/src/exec/incr-stats-util-test.cc
+++ b/be/src/exec/incr-stats-util-test.cc
@@ -29,10 +29,10 @@ using namespace impala;
 extern string EncodeNdv(const string& ndv, bool* is_encoded);
 extern string DecodeNdv(const string& ndv, bool is_encoded);
 
-static const int HLL_LEN = pow(2, AggregateFunctions::HLL_PRECISION);
+static const int DEFAULT_HLL_LEN = pow(2, AggregateFunctions::DEFAULT_HLL_PRECISION);
 
 TEST(IncrStatsUtilTest, TestEmptyRle) {
-  string test(HLL_LEN, 0);
+  string test(DEFAULT_HLL_LEN, 0);
 
   bool is_encoded;
   const string& encoded = EncodeNdv(test, &is_encoded);
@@ -40,13 +40,13 @@ TEST(IncrStatsUtilTest, TestEmptyRle) {
   ASSERT_TRUE(is_encoded);
 
   const string& decoded = DecodeNdv(encoded, is_encoded);
-  ASSERT_EQ(HLL_LEN, decoded.size());
+  ASSERT_EQ(DEFAULT_HLL_LEN, decoded.size());
   ASSERT_EQ(test, decoded);
 }
 
 TEST(IncrStatsUtilTest, TestNoEncode) {
   string test;
-  for (int i = 0; i < HLL_LEN; ++i) {
+  for (int i = 0; i < DEFAULT_HLL_LEN; ++i) {
     test += (i % 2 == 0) ? 'A' : 'B';
   }
 
@@ -60,7 +60,7 @@ TEST(IncrStatsUtilTest, TestNoEncode) {
 
 TEST(IncrStatsUtilTest, TestEncode) {
   string test;
-  for (int i = 0; i < HLL_LEN; ++i) {
+  for (int i = 0; i < DEFAULT_HLL_LEN; ++i) {
     test += (i < 512) ? 'A' : 'B';
   }
 
@@ -82,25 +82,25 @@ TEST(IncrStatsUtilTest, TestNumNullAggregation) {
   PerColumnStats* stat = new PerColumnStats();
   ASSERT_EQ(0, stat->ToTColumnStats().num_nulls);
 
-  stat->Update(string(AggregateFunctions::HLL_LEN, 0), 0, 0, 0, 1, 0, 0);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 1, 0, 0);
   ASSERT_EQ(1, stat->ToTColumnStats().num_nulls);
 
-  stat->Update(string(AggregateFunctions::HLL_LEN, 0), 0, 0, 0, 0, 0, 0);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 0, 0, 0);
   ASSERT_EQ(1, stat->ToTColumnStats().num_nulls);
 
-  stat->Update(string(AggregateFunctions::HLL_LEN, 0), 0, 0, 0, 2, 0, 0);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 2, 0, 0);
   ASSERT_EQ(3, stat->ToTColumnStats().num_nulls);
 
-  stat->Update(string(AggregateFunctions::HLL_LEN, 0), 0, 0, 0, -1, 0, 0);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, -1, 0, 0);
   ASSERT_EQ(-1, stat->ToTColumnStats().num_nulls);
 
-  stat->Update(string(AggregateFunctions::HLL_LEN, 0), 0, 0, 0, 0, 0, 0);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 0, 0, 0);
   ASSERT_EQ(-1, stat->ToTColumnStats().num_nulls);
 
-  stat->Update(string(AggregateFunctions::HLL_LEN, 0), 0, 0, 0, 3, 0, 0);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 3, 0, 0);
   ASSERT_EQ(-1, stat->ToTColumnStats().num_nulls);
 
-  stat->Update(string(AggregateFunctions::HLL_LEN, 0), 0, 0, 0, -1, 0, 0);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, -1, 0, 0);
   ASSERT_EQ(-1, stat->ToTColumnStats().num_nulls);
 }
 
@@ -112,15 +112,15 @@ TEST(IncrStatsUtilTest, TestNumNullAggregation) {
 TEST(IncrStatsUtilTest, TestAvgSizehAggregation) {
   PerColumnStats* stat = new PerColumnStats();
 
-  stat->Update(string(AggregateFunctions::HLL_LEN, 0), 1, 4, 0, 0, 0, 0);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 1, 4, 0, 0, 0, 0);
   stat->Finalize();
   ASSERT_EQ(4, stat->ToTColumnStats().avg_size);
 
-  stat->Update(string(AggregateFunctions::HLL_LEN, 0), 2, 7, 0, 0, 0, 0);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 2, 7, 0, 0, 0, 0);
   stat->Finalize();
   ASSERT_EQ(6, stat->ToTColumnStats().avg_size);
 
-  stat->Update(string(AggregateFunctions::HLL_LEN, 0), 0, 0, 0, 0, 0, 0);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 0, 0, 0);
   stat->Finalize();
   ASSERT_EQ(6, stat->ToTColumnStats().avg_size);
 }
diff --git a/be/src/exec/incr-stats-util.cc b/be/src/exec/incr-stats-util.cc
index e867cc0..763c68d 100644
--- a/be/src/exec/incr-stats-util.cc
+++ b/be/src/exec/incr-stats-util.cc
@@ -42,7 +42,7 @@ using namespace strings;
 // of the NDV computation into its output StringVal.
 StringVal IncrementNdvFinalize(FunctionContext* ctx, const StringVal& src) {
   if (UNLIKELY(src.is_null)) return src;
-  DCHECK_EQ(src.len, AggregateFunctions::HLL_LEN);
+  DCHECK_EQ(src.len, AggregateFunctions::DEFAULT_HLL_LEN);
   StringVal result_str(ctx, src.len);
   if (UNLIKELY(result_str.is_null)) return result_str;
   memcpy(result_str.ptr, src.ptr, src.len);
@@ -60,8 +60,8 @@ StringVal IncrementNdvFinalize(FunctionContext* ctx, const StringVal& src) {
 // shorter than the input. Otherwise it is set to false, and the input is returned
 // unencoded.
 string EncodeNdv(const string& ndv, bool* is_encoded) {
-  DCHECK_EQ(ndv.size(), AggregateFunctions::HLL_LEN);
-  string encoded_ndv(AggregateFunctions::HLL_LEN, 0);
+  DCHECK_EQ(ndv.size(), AggregateFunctions::DEFAULT_HLL_LEN);
+  string encoded_ndv(AggregateFunctions::DEFAULT_HLL_LEN, 0);
   int idx = 0;
   char last = ndv[0];
 
@@ -69,9 +69,9 @@ string EncodeNdv(const string& ndv, bool* is_encoded) {
   // a byte 0-255, but the actual count is always one more than the encoded value
   // (i.e. in the range 1-256 inclusive).
   uint8_t count = 0;
-  for (int i = 1; i < AggregateFunctions::HLL_LEN; ++i) {
+  for (int i = 1; i < AggregateFunctions::DEFAULT_HLL_LEN; ++i) {
     if (ndv[i] != last || count == numeric_limits<uint8_t>::max()) {
-      if (idx + 2 > AggregateFunctions::HLL_LEN) break;
+      if (idx + 2 > AggregateFunctions::DEFAULT_HLL_LEN) break;
       // Write a (count, value) pair to two successive bytes
       encoded_ndv[idx++] = count;
       count = 0;
@@ -83,7 +83,7 @@ string EncodeNdv(const string& ndv, bool* is_encoded) {
   }
 
   // +2 for the remaining two bytes written below
-  if (idx + 2 > AggregateFunctions::HLL_LEN) {
+  if (idx + 2 > AggregateFunctions::DEFAULT_HLL_LEN) {
     *is_encoded = false;
     return ndv;
   }
@@ -94,21 +94,21 @@ string EncodeNdv(const string& ndv, bool* is_encoded) {
   *is_encoded = true;
   encoded_ndv.resize(idx);
   DCHECK_GT(encoded_ndv.size(), 0);
-  DCHECK_LE(encoded_ndv.size(), AggregateFunctions::HLL_LEN);
+  DCHECK_LE(encoded_ndv.size(), AggregateFunctions::DEFAULT_HLL_LEN);
   return encoded_ndv;
 }
 
 string DecodeNdv(const string& ndv, bool is_encoded) {
   if (!is_encoded) return ndv;
   DCHECK_EQ(ndv.size() % 2, 0);
-  string decoded_ndv(AggregateFunctions::HLL_LEN, 0);
+  string decoded_ndv(AggregateFunctions::DEFAULT_HLL_LEN, 0);
   int idx = 0;
   for (int i = 0; i < ndv.size(); i += 2) {
     for (int j = 0; j < (static_cast<uint8_t>(ndv[i])) + 1; ++j) {
       decoded_ndv[idx++] = ndv[i+1];
     }
   }
-  DCHECK_EQ(idx, AggregateFunctions::HLL_LEN);
+  DCHECK_EQ(idx, AggregateFunctions::DEFAULT_HLL_LEN);
   return decoded_ndv;
 }
 
@@ -240,7 +240,7 @@ void FinalizePartitionedColumnStats(const TTableSchema& col_stats_schema,
   TIntermediateColumnStats empty_column_stats;
   bool is_encoded;
   empty_column_stats.__set_intermediate_ndv(
-      EncodeNdv(string(AggregateFunctions::HLL_LEN, 0), &is_encoded));
+      EncodeNdv(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), &is_encoded));
   empty_column_stats.__set_is_ndv_encoded(is_encoded);
   empty_column_stats.__set_num_nulls(0);
   empty_column_stats.__set_max_width(0);
diff --git a/be/src/exec/incr-stats-util.h b/be/src/exec/incr-stats-util.h
index f534d49..188b7f9 100644
--- a/be/src/exec/incr-stats-util.h
+++ b/be/src/exec/incr-stats-util.h
@@ -60,7 +60,7 @@ struct PerColumnStats {
   double avg_width;
 
   PerColumnStats()
-    : intermediate_ndv(AggregateFunctions::HLL_LEN, 0),
+    : intermediate_ndv(AggregateFunctions::DEFAULT_HLL_LEN, 0),
       num_nulls(0),
       max_width(0),
       num_rows(0),
diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index 9c48b93..0285f4d 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -94,19 +94,19 @@ static float HllThreshold(int p) {
 
 // Implements k nearest neighbor interpolation for k=6,
 // we choose 6 bassed on the HLL++ paper
-int64_t HllEstimateBias(int64_t estimate) {
+int64_t HllEstimateBias(int64_t estimate, int precision) {
   const size_t K = 6;
 
   // Precision index into data arrays
   // We don't have data for precisions less than 4
-  DCHECK(impala::AggregateFunctions::HLL_PRECISION >= 4);
-  static constexpr size_t idx = impala::AggregateFunctions::HLL_PRECISION - 4;
+  DCHECK_IN_RANGE(precision, impala::AggregateFunctions::MIN_HLL_PRECISION,
+      impala::AggregateFunctions::MAX_HLL_PRECISION);
+  size_t idx = precision - 4;
 
   // Calculate the square of the difference of this estimate to all
   // precalculated estimates for a particular precision
   map<double, size_t> distances;
-  for (size_t i = 0;
-      i < impala::HLL_DATA_SIZES[idx] / sizeof(double); ++i) {
+  for (size_t i = 0; i < impala::HLL_DATA_SIZES[idx] / sizeof(double); ++i) {
     double val = estimate - impala::HLL_RAW_ESTIMATE_DATA[idx][i];
     distances.insert(make_pair(val * val, i));
   }
@@ -180,8 +180,13 @@ StringVal ToStringVal(FunctionContext* context, T val) {
       context, reinterpret_cast<const uint8_t*>(str.c_str()), str.size());
 }
 
-constexpr int AggregateFunctions::HLL_PRECISION;
-constexpr int AggregateFunctions::HLL_LEN;
+constexpr int AggregateFunctions::DEFAULT_HLL_PRECISION;
+constexpr int AggregateFunctions::MIN_HLL_PRECISION;
+constexpr int AggregateFunctions::MAX_HLL_PRECISION;
+
+constexpr int AggregateFunctions::DEFAULT_HLL_LEN;
+constexpr int AggregateFunctions::MIN_HLL_LEN;
+constexpr int AggregateFunctions::MAX_HLL_LEN;
 
 void AggregateFunctions::InitNull(FunctionContext*, AnyVal* dst) {
   dst->is_null = true;
@@ -1433,97 +1438,175 @@ T AggregateFunctions::AppxMedianFinalize(FunctionContext* ctx, const StringVal&
   return result;
 }
 
+// Compute the precision from a scale value.
+static inline int ComputePrecisionFromScale(int scale) {
+  return scale + 8;
+}
+
+// Compute the precision from a scale value. This method must be identical
+// to function ComputeHllLengthFromScale() defined in FunctionCallExpr.java.
+static inline int ComputeHllLengthFromScale(int scale) {
+  return 1 << ComputePrecisionFromScale(scale);
+}
+
+// Compute the precision from a hll length as log2(len) or # of trailing
+// zeros in length. For example, when len is 1024 = 2^10 = 0b10000000000,
+// precision = 10.
+static inline int ComputePrecisionFromHllLength(int hll_len) {
+  return BitUtil::CountTrailingZeros((unsigned int)hll_len, sizeof(hll_len) * CHAR_BIT);
+}
+
+// Verify that the length of the intermediate data type is computable from
+// the precision as represented in the 2nd argument.
+static inline bool CheckHllArgs(FunctionContext* ctx, StringVal* dst) {
+  if (ctx->GetNumArgs() == 2) {
+    IntVal* int_val = reinterpret_cast<IntVal*>(ctx->GetConstantArg(1));
+
+    // In parallel plan, the merge() function takes only one argument which
+    // is the intermediate data. Avoid check in such cases.
+    if (int_val) {
+      return dst->len == ComputeHllLengthFromScale(int_val->val);
+    }
+  }
+  return true;
+}
+
 void AggregateFunctions::HllInit(FunctionContext* ctx, StringVal* dst) {
   // The HLL functions use a preallocated FIXED_UDA_INTERMEDIATE intermediate value.
-  DCHECK_EQ(dst->len, HLL_LEN);
-  memset(dst->ptr, 0, HLL_LEN);
+  DCHECK_IN_RANGE(dst->len, MIN_HLL_LEN, MAX_HLL_LEN);
+  memset(dst->ptr, 0, dst->len);
+  DCHECK(CheckHllArgs(ctx, dst));
 }
 
+// Implementation for update functions. It accepts a precision.
+// Always inline in IR so that constants can be replaced.
 template <typename T>
-void AggregateFunctions::HllUpdate(FunctionContext* ctx, const T& src, StringVal* dst) {
+IR_ALWAYS_INLINE void AggregateFunctions::HllUpdate(
+    FunctionContext* ctx, const T& src, StringVal* dst, int precision) {
   if (src.is_null) return;
   DCHECK(!dst->is_null);
-  DCHECK_EQ(dst->len, HLL_LEN);
+
+  const int& hll_len = dst->len;
+  DCHECK_IN_RANGE(hll_len, MIN_HLL_LEN, MAX_HLL_LEN);
+
   uint64_t hash_value =
       AnyValUtil::Hash64(src, *ctx->GetArgType(0), HashUtil::FNV64_SEED);
   // Use the lower bits to index into the number of streams and then find the first 1 bit
   // after the index bits.
-  int idx = hash_value & (HLL_LEN - 1);
-  const uint8_t first_one_bit =
-      1 + BitUtil::CountTrailingZeros(
-              hash_value >> HLL_PRECISION, sizeof(hash_value) * CHAR_BIT - HLL_PRECISION);
+  int idx = hash_value & (hll_len - 1);
+  const uint8_t first_one_bit = 1
+      + BitUtil::CountTrailingZeros(
+            hash_value >> precision, sizeof(hash_value) * CHAR_BIT - precision);
   dst->ptr[idx] = ::max(dst->ptr[idx], first_one_bit);
 }
 
-// Specialize for DecimalVal to allow substituting decimal size.
-template <>
+// Update function for NDV() that accepts an expression only.
+template <typename T>
+void AggregateFunctions::HllUpdate(FunctionContext* ctx, const T& src, StringVal* dst) {
+  HllUpdate(ctx, src, dst, DEFAULT_HLL_PRECISION);
+}
+
+// Update function for NDV() that accepts an expression and a scale value.
+template <typename T>
 void AggregateFunctions::HllUpdate(
-    FunctionContext* ctx, const DecimalVal& src, StringVal* dst) {
+    FunctionContext* ctx, const T& src1, const IntVal& src2, StringVal* dst) {
+  HllUpdate(ctx, src1, dst, ComputePrecisionFromScale(src2.val));
+}
+
+// Implementation for update functions for DecimalVal to allow substituting decimal
+// size. It accepts a precision. Always inline in IR so that constants can be replaced.
+template <>
+IR_ALWAYS_INLINE void AggregateFunctions::HllUpdate(
+    FunctionContext* ctx, const DecimalVal& src, StringVal* dst, int precision) {
   if (src.is_null) return;
   DCHECK(!dst->is_null);
-  DCHECK_EQ(dst->len, HLL_LEN);
+
+  const int& hll_len = dst->len;
+  DCHECK_IN_RANGE(hll_len, MIN_HLL_LEN, MAX_HLL_LEN);
+
   int byte_size = ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_SIZE, 0);
   uint64_t hash_value = AnyValUtil::HashDecimal64(src, byte_size, HashUtil::FNV64_SEED);
   if (hash_value != 0) {
     // Use the lower bits to index into the number of streams and then
     // find the first 1 bit after the index bits.
-    int idx = hash_value & (HLL_LEN - 1);
-    uint8_t first_one_bit = __builtin_ctzl(hash_value >> HLL_PRECISION) + 1;
+    int idx = hash_value & (hll_len - 1);
+    uint8_t first_one_bit = __builtin_ctzl(hash_value >> precision) + 1;
     dst->ptr[idx] = ::max(dst->ptr[idx], first_one_bit);
   }
 }
 
+// Specialized update function for NDV() that accepts a decimal typed expression.
+template <>
+void AggregateFunctions::HllUpdate(
+    FunctionContext* ctx, const DecimalVal& src, StringVal* dst) {
+  HllUpdate(ctx, src, dst, DEFAULT_HLL_PRECISION);
+}
+
+// Specialized update function for NDV() that accepts a decimal typed expression
+// and a scale value.
+template <>
+void AggregateFunctions::HllUpdate(
+    FunctionContext* ctx, const DecimalVal& src1, const IntVal& src2, StringVal* dst) {
+  HllUpdate(ctx, src1, dst, ComputePrecisionFromScale(src2.val));
+}
+
 void AggregateFunctions::HllMerge(
     FunctionContext* ctx, const StringVal& src, StringVal* dst) {
   DCHECK(!dst->is_null);
   DCHECK(!src.is_null);
-  DCHECK_EQ(dst->len, HLL_LEN);
-  DCHECK_EQ(src.len, HLL_LEN);
+  DCHECK_IN_RANGE(src.len, MIN_HLL_LEN, MAX_HLL_LEN);
+  DCHECK_EQ(src.len, dst->len);
+
   for (int i = 0; i < src.len; ++i) {
     dst->ptr[i] = ::max(dst->ptr[i], src.ptr[i]);
   }
 }
 
-uint64_t AggregateFunctions::HllFinalEstimate(const uint8_t* buckets) {
+uint64_t AggregateFunctions::HllFinalEstimate(const uint8_t* buckets, int hll_len) {
   DCHECK(buckets != NULL);
 
   // Empirical constants for the algorithm.
   double alpha = 0;
-  if (HLL_LEN == 16) {
+  DCHECK_IN_RANGE(hll_len, MIN_HLL_LEN, MAX_HLL_LEN);
+  int precision = ComputePrecisionFromHllLength(hll_len);
+
+  if (hll_len == 16) {
     alpha = 0.673;
-  } else if (HLL_LEN == 32) {
+  } else if (hll_len == 32) {
     alpha = 0.697;
-  } else if (HLL_LEN == 64) {
+  } else if (hll_len == 64) {
     alpha = 0.709;
   } else {
-    alpha = 0.7213 / (1 + 1.079 / HLL_LEN);
+    alpha = 0.7213 / (1 + 1.079 / hll_len);
   }
 
   double harmonic_mean = 0;
   int num_zero_registers = 0;
-  for (int i = 0; i < HLL_LEN; ++i) {
+  for (int i = 0; i < hll_len; ++i) {
     harmonic_mean += ldexp(1.0, -buckets[i]);
     if (buckets[i] == 0) ++num_zero_registers;
   }
   harmonic_mean = 1.0 / harmonic_mean;
-  int64_t estimate = alpha * HLL_LEN * HLL_LEN * harmonic_mean;
+
+  // The actual harmonic mean is hll_len * harmonic_mean.
+  int64_t estimate = alpha * hll_len * hll_len * harmonic_mean;
   // Adjust for Hll bias based on Hll++ algorithm
-  if (estimate <= 5 * HLL_LEN) {
-    estimate -= HllEstimateBias(estimate);
+  if (estimate <= 5 * hll_len) {
+    estimate -= HllEstimateBias(estimate, precision);
   }
 
   if (num_zero_registers == 0) return estimate;
 
   // Estimated cardinality is too low. Hll is too inaccurate here, instead use
   // linear counting.
-  int64_t h = HLL_LEN * log(static_cast<double>(HLL_LEN) / num_zero_registers);
+  int64_t h = hll_len * log(static_cast<double>(hll_len) / num_zero_registers);
 
-  return (h <= HllThreshold(HLL_PRECISION)) ? h : estimate;
+  return (h <= HllThreshold(precision)) ? h : estimate;
 }
 
 BigIntVal AggregateFunctions::HllFinalize(FunctionContext* ctx, const StringVal& src) {
   if (UNLIKELY(src.is_null)) return BigIntVal::null();
-  uint64_t estimate = HllFinalEstimate(src.ptr);
+  uint64_t estimate = HllFinalEstimate(src.ptr, src.len);
   return estimate;
 }
 
@@ -1538,7 +1621,8 @@ class SampledNdvState {
   static const uint32_t NUM_HLL_BUCKETS = 32;
 
   /// A bucket contains an update count and an HLL intermediate state.
-  static constexpr int64_t BUCKET_SIZE = sizeof(int64_t) + AggregateFunctions::HLL_LEN;
+  static constexpr int64_t BUCKET_SIZE =
+      sizeof(int64_t) + AggregateFunctions::DEFAULT_HLL_LEN;
 
   /// Sampling percent which was given as the second argument to SampledNdv().
   /// Stored here to avoid existing issues with passing constant arguments to all
@@ -1552,7 +1636,7 @@ class SampledNdvState {
   /// Array of buckets.
   struct {
     int64_t row_count;
-    uint8_t hll[AggregateFunctions::HLL_LEN];
+    uint8_t hll[AggregateFunctions::DEFAULT_HLL_LEN];
   } buckets[NUM_HLL_BUCKETS];
 };
 
@@ -1578,7 +1662,7 @@ void AggregateFunctions::SampledNdvUpdate(FunctionContext* ctx, const T& src,
     const DoubleVal& sample_perc, StringVal* dst) {
   SampledNdvState* state = reinterpret_cast<SampledNdvState*>(dst->ptr);
   int64_t bucket_idx = state->total_row_count % SampledNdvState::NUM_HLL_BUCKETS;
-  StringVal hll_dst = StringVal(state->buckets[bucket_idx].hll, HLL_LEN);
+  StringVal hll_dst = StringVal(state->buckets[bucket_idx].hll, DEFAULT_HLL_LEN);
   HllUpdate(ctx, src, &hll_dst);
   ++state->buckets[bucket_idx].row_count;
   ++state->total_row_count;
@@ -1589,8 +1673,8 @@ void AggregateFunctions::SampledNdvMerge(FunctionContext* ctx, const StringVal&
   SampledNdvState* src_state = reinterpret_cast<SampledNdvState*>(src.ptr);
   SampledNdvState* dst_state = reinterpret_cast<SampledNdvState*>(dst->ptr);
   for (int i = 0; i < SampledNdvState::NUM_HLL_BUCKETS; ++i) {
-    StringVal src_hll = StringVal(src_state->buckets[i].hll, HLL_LEN);
-    StringVal dst_hll = StringVal(dst_state->buckets[i].hll, HLL_LEN);
+    StringVal src_hll = StringVal(src_state->buckets[i].hll, DEFAULT_HLL_LEN);
+    StringVal dst_hll = StringVal(dst_state->buckets[i].hll, DEFAULT_HLL_LEN);
     HllMerge(ctx, src_hll, &dst_hll);
     dst_state->buckets[i].row_count += src_state->buckets[i].row_count;
   }
@@ -1624,15 +1708,15 @@ BigIntVal AggregateFunctions::SampledNdvFinalize(FunctionContext* ctx,
   // are sufficient for reasonable accuracy.
   int pidx = 0;
   for (int i = 0; i < SampledNdvState::NUM_HLL_BUCKETS; ++i) {
-    uint8_t merged_hll_data[HLL_LEN];
-    memset(merged_hll_data, 0, HLL_LEN);
-    StringVal merged_hll(merged_hll_data, HLL_LEN);
+    uint8_t merged_hll_data[DEFAULT_HLL_LEN];
+    memset(merged_hll_data, 0, DEFAULT_HLL_LEN);
+    StringVal merged_hll(merged_hll_data, DEFAULT_HLL_LEN);
     int64_t merged_count = 0;
     for (int j = 0; j < SampledNdvState::NUM_HLL_BUCKETS; ++j) {
       int bucket_idx = (i + j) % SampledNdvState::NUM_HLL_BUCKETS;
       merged_count += state->buckets[bucket_idx].row_count;
       counts[pidx] = merged_count;
-      StringVal hll = StringVal(state->buckets[bucket_idx].hll, HLL_LEN);
+      StringVal hll = StringVal(state->buckets[bucket_idx].hll, DEFAULT_HLL_LEN);
       HllMerge(ctx, hll, &merged_hll);
       ndvs[pidx] = HllFinalEstimate(merged_hll.ptr);
       ++pidx;
@@ -2393,6 +2477,31 @@ template DecimalVal AggregateFunctions::AppxMedianFinalize<DecimalVal>(
 template DateVal AggregateFunctions::AppxMedianFinalize<DateVal>(
     FunctionContext*, const StringVal&);
 
+// Method instantiation for the implementation of the Update
+// functions.
+template void AggregateFunctions::HllUpdate(
+    FunctionContext*, const BooleanVal&, StringVal*, int precision);
+template void AggregateFunctions::HllUpdate(
+    FunctionContext*, const TinyIntVal&, StringVal*, int precision);
+template void AggregateFunctions::HllUpdate(
+    FunctionContext*, const SmallIntVal&, StringVal*, int precision);
+template void AggregateFunctions::HllUpdate(
+    FunctionContext*, const IntVal&, StringVal*, int precision);
+template void AggregateFunctions::HllUpdate(
+    FunctionContext*, const BigIntVal&, StringVal*, int precision);
+template void AggregateFunctions::HllUpdate(
+    FunctionContext*, const FloatVal&, StringVal*, int precision);
+template void AggregateFunctions::HllUpdate(
+    FunctionContext*, const DoubleVal&, StringVal*, int precision);
+template void AggregateFunctions::HllUpdate(
+    FunctionContext*, const StringVal&, StringVal*, int precision);
+template void AggregateFunctions::HllUpdate(
+    FunctionContext*, const TimestampVal&, StringVal*, int precision);
+template void AggregateFunctions::HllUpdate(
+    FunctionContext*, const DateVal&, StringVal*, int precision);
+
+// Method instantiation for NDV() that accepts a single argument. The
+// NDV() is computed with a precision of 10.
 template void AggregateFunctions::HllUpdate(
     FunctionContext*, const BooleanVal&, StringVal*);
 template void AggregateFunctions::HllUpdate(
@@ -2414,6 +2523,30 @@ template void AggregateFunctions::HllUpdate(
 template void AggregateFunctions::HllUpdate(
     FunctionContext*, const DateVal&, StringVal*);
 
+// Method instantiation for NDV() that accepts two arguments. The
+// NDV() is computed with a precision indirectly specified through
+// the 2nd argument.
+template void AggregateFunctions::HllUpdate(
+    FunctionContext*, const BooleanVal&, const IntVal&, StringVal*);
+template void AggregateFunctions::HllUpdate(
+    FunctionContext*, const TinyIntVal&, const IntVal&, StringVal*);
+template void AggregateFunctions::HllUpdate(
+    FunctionContext*, const SmallIntVal&, const IntVal&, StringVal*);
+template void AggregateFunctions::HllUpdate(
+    FunctionContext*, const IntVal&, const IntVal&, StringVal*);
+template void AggregateFunctions::HllUpdate(
+    FunctionContext*, const BigIntVal&, const IntVal&, StringVal*);
+template void AggregateFunctions::HllUpdate(
+    FunctionContext*, const FloatVal&, const IntVal&, StringVal*);
+template void AggregateFunctions::HllUpdate(
+    FunctionContext*, const DoubleVal&, const IntVal&, StringVal*);
+template void AggregateFunctions::HllUpdate(
+    FunctionContext*, const StringVal&, const IntVal&, StringVal*);
+template void AggregateFunctions::HllUpdate(
+    FunctionContext*, const TimestampVal&, const IntVal&, StringVal*);
+template void AggregateFunctions::HllUpdate(
+    FunctionContext*, const DateVal&, const IntVal&, StringVal*);
+
 template void AggregateFunctions::SampledNdvUpdate(
     FunctionContext*, const BooleanVal&, const DoubleVal&, StringVal*);
 template void AggregateFunctions::SampledNdvUpdate(
diff --git a/be/src/exprs/aggregate-functions.h b/be/src/exprs/aggregate-functions.h
index f86a7f2..b23525b 100644
--- a/be/src/exprs/aggregate-functions.h
+++ b/be/src/exprs/aggregate-functions.h
@@ -193,17 +193,45 @@ class AggregateFunctions {
 
   /// This precision is the default precision from the paper. It doesn't seem to matter
   /// very much when between 6 and 12.
-  static constexpr int HLL_PRECISION = 10;
-  static constexpr int HLL_LEN = 1 << HLL_PRECISION;
+  static constexpr int DEFAULT_HLL_PRECISION = 10;
+
+  // Define the min and the max of a precision value that can be indirectly
+  // specified by the 2nd argument in NDV(<arg1>, <arg2>).
+  static constexpr int MIN_HLL_PRECISION = 9;
+  static constexpr int MAX_HLL_PRECISION = 18;
+
+  // DEFAULT_HLL_LEN is the parameter m defined in the paper.
+  static constexpr int DEFAULT_HLL_LEN = 1 << DEFAULT_HLL_PRECISION;
+
+  // Define the min and the max of parameter m.
+  static constexpr int MIN_HLL_LEN = 1 << MIN_HLL_PRECISION;
+  static constexpr int MAX_HLL_LEN = 1 << MAX_HLL_PRECISION;
+
   static void HllInit(FunctionContext*, StringVal* slot);
+
+  // The implementation for both versions of the update functions below.
+  template <typename T>
+  static void HllUpdate(FunctionContext*, const T& src, StringVal* dst, int precision);
+
+  // Update function for single input argument version of NDV(),
+  // utilizing the default precision for HLL algorithm.
   template <typename T>
   static void HllUpdate(FunctionContext*, const T& src, StringVal* dst);
+
+  // Update function for two input argument version of NDV(),
+  // utilizing the precision value indirectly specified through the
+  // 2nd argument in NDV().
+  template <typename T>
+  static void HllUpdate(
+      FunctionContext*, const T& src1, const IntVal& src2, StringVal* dst);
+
   static void HllMerge(FunctionContext*, const StringVal& src, StringVal* dst);
   static BigIntVal HllFinalize(FunctionContext*, const StringVal& src);
 
   /// Utility method to compute the final result of an HLL estimation.
-  /// Assumes HLL_LEN number of buckets.
-  static uint64_t HllFinalEstimate(const uint8_t* buckets);
+  /// Assumes hll_len number of buckets.
+  static uint64_t HllFinalEstimate(
+      const uint8_t* buckets, int hll_len = AggregateFunctions::DEFAULT_HLL_LEN);
 
   /// Estimates the number of distinct values (NDV) based on a sample of data and the
   /// corresponding sampling rate. The main idea of this function is to collect several
diff --git a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
index 1fea640..aa78a31 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
@@ -471,6 +471,12 @@ public class FunctionCallExpr extends Expr {
     return ScalarType.createClippedDecimalType(digitsBefore + digitsAfter, digitsAfter);
   }
 
+  // First compute the precision as (scale + 8) and then compute
+  // the needed memory for that precision value which is 2^precision.
+  // This method must be identical to function ComputeHllLengthFromScale()
+  // defined in aggregate-functions-ir.cc.
+  private int ComputeHllLengthFromScale(int scale) { return 1 << (scale + 8); }
+
   @Override
   protected void analyzeImpl(Analyzer analyzer) throws AnalysisException {
     fnName_.analyze(analyzer);
@@ -555,6 +561,39 @@ public class FunctionCallExpr extends Expr {
       throw new AnalysisException(getFunctionNotFoundError(argTypes));
     }
 
+    // NDV() can optionally take a second argument which must be an integer literal
+    // in the range from 1 to 10. Perform the analysis here.
+    if (fnName_.getFunction().equalsIgnoreCase("ndv") && children_.size() == 2) {
+      if (!(children_.get(1) instanceof NumericLiteral)) {
+        throw new AnalysisException(
+            "Second parameter of NDV() must be an integer literal: "
+            + children_.get(1).toSql());
+      }
+
+      NumericLiteral scale = (NumericLiteral) children_.get(1);
+
+      if (scale.getValue().scale() != 0
+          || !NumericLiteral.fitsInInt(scale.getValue())
+          || scale.getIntValue() < 1 || scale.getIntValue() > 10) {
+        throw new AnalysisException(
+            "Second parameter of NDV() must be an integer literal in [1,10]: "
+            + scale.toSql());
+      }
+      children_.set(1, scale.uncheckedCastTo(Type.INT));
+
+      // In BuiltinsDb, look for an AggregateFunction template with the correct length for
+      // the intermediate data type and use it.
+      BuiltinsDb builtinDb = (BuiltinsDb) db;
+      int size = ComputeHllLengthFromScale(scale.getIntValue());
+      fn_ = builtinDb.resolveNdvIntermediateType((AggregateFunction) fn_, size);
+
+      if (fn_ == null) {
+        throw new AnalysisException(
+            "A suitable intermediate data type cannot be found for the second parameter "
+            + children_.get(1).toSql() + " in NDV()");
+      }
+    }
+
     if (isAggregateFunction()) {
       // subexprs must not contain aggregates
       if (TreeNode.contains(children_, Expr.IS_AGGREGATE)) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
index 08589f3..9fa41bb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
@@ -17,9 +17,11 @@
 
 package org.apache.impala.catalog;
 
+import java.util.List;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Map;
+import java.util.HashMap;
 
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.impala.analysis.ArithmeticExpr;
@@ -31,9 +33,11 @@ import org.apache.impala.analysis.InPredicate;
 import org.apache.impala.analysis.IsNullPredicate;
 import org.apache.impala.analysis.LikePredicate;
 import org.apache.impala.builtins.ScalarBuiltins;
+import org.apache.impala.catalog.AggregateFunction;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.base.Preconditions;
 
 public class BuiltinsDb extends Db {
   // Size in bytes of AvgState used for integer, floating point, and timestamp avg().
@@ -49,10 +53,20 @@ public class BuiltinsDb extends Db {
   // Must match PC_INTERMEDIATE_BYTES in aggregate-functions-ir.cc.
   private static final int PC_INTERMEDIATE_SIZE = 256;
 
-  // Size in bytes of Hyperloglog intermediate value used for ndv().
-  // Must match HLL_LEN in aggregate-functions-ir.cc.
+  // Size in bytes of the default Hyperloglog intermediate value used for ndv().
+  // Must match DEFAULT_HLL_LEN in aggregate-functions-ir.cc.
   private static final int HLL_INTERMEDIATE_SIZE = 1024;
 
+  // Sizes in bytes of all supported HyperLogLog intermediate value used for NDV(),
+  // corresponding to precision 9, 10, 11, 12, 13, 14, 15, 16, 17 and 18, respectively
+  private static final int[] hll_intermediate_sizes = {
+      512, 1024, 2048, 4096, 8192, 16384, 32768, 65536, 131072, 262144};
+
+  // A map of the AggregateFunction templates with all known/supported
+  // intermediate data types.
+  private final Map<Type, List<AggregateFunction>> builtinNDVs_ =
+      new HashMap<>();
+
   // Size in bytes of RankState used for rank() and dense_rank().
   private static final int RANK_INTERMEDIATE_SIZE = 16;
 
@@ -336,6 +350,33 @@ public class BuiltinsDb extends Db {
             "9HllUpdateIN10impala_udf7DateValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE")
         .build();
 
+  // For ndv() with two input arguments
+  private static final Map<Type, String> HLL_UPDATE_SYMBOL_WITH_PRECISION =
+      ImmutableMap.<Type, String>builder()
+        .put(Type.BOOLEAN,
+            "9HllUpdateIN10impala_udf10BooleanValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+        .put(Type.TINYINT,
+             "9HllUpdateIN10impala_udf10TinyIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+        .put(Type.SMALLINT,
+             "9HllUpdateIN10impala_udf11SmallIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+        .put(Type.INT,
+             "9HllUpdateIN10impala_udf6IntValEEEvPNS2_15FunctionContextERKT_RKS3_PNS2_9StringValE")
+        .put(Type.BIGINT,
+             "9HllUpdateIN10impala_udf9BigIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+        .put(Type.FLOAT,
+             "9HllUpdateIN10impala_udf8FloatValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+        .put(Type.DOUBLE,
+             "9HllUpdateIN10impala_udf9DoubleValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+        .put(Type.STRING,
+             "9HllUpdateIN10impala_udf9StringValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPS3_")
+        .put(Type.TIMESTAMP,
+             "9HllUpdateIN10impala_udf12TimestampValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+        .put(Type.DECIMAL,
+             "9HllUpdateIN10impala_udf10DecimalValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+        .put(Type.DATE,
+             "9HllUpdateIN10impala_udf7DateValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+        .build();
+
   private static final Map<Type, String> SAMPLED_NDV_UPDATE_SYMBOL =
       ImmutableMap.<Type, String>builder()
         .put(Type.BOOLEAN,
@@ -845,6 +886,23 @@ public class BuiltinsDb extends Db {
             "25FirstValIgnoreNullsUpdateIN10impala_udf9StringValEEEvPNS2_15FunctionContextERKT_PS6_")
         .build();
 
+  // A helper function to return a template aggregation function for NDV with 2 arguments,
+  // for data type t. Inputs:
+  //  db: the db;
+  //  prefix: the prefix string for each function call symbol in BE;
+  //  t: the type to build the template for;
+  //  hllIntermediateType: the intermediate data type.
+  private static AggregateFunction createTemplateAggregateFunctionForNDVWith2Args(
+      Db db, String prefix, Type t, Type hllIntermediateType) {
+    return AggregateFunction.createBuiltin(db, "ndv", Lists.newArrayList(t, Type.INT),
+        Type.BIGINT, hllIntermediateType,
+        prefix + "7HllInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+        prefix + HLL_UPDATE_SYMBOL_WITH_PRECISION.get(t),
+        prefix + "8HllMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_", null,
+        prefix + "11HllFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE", true,
+        false, true);
+  }
+
   // Populate all the aggregate builtins in the catalog.
   // null symbols indicate the function does not need that step of the evaluation.
   // An empty symbol indicates a TODO for the BE to implement the function.
@@ -934,10 +992,15 @@ public class BuiltinsDb extends Db {
           false, false, true));
 
       // NDV
-      Type hllIntermediateType =
+      // Setup the intermediate type based on the default precision in the template
+      // function in the db. This type is useful when the default precision is all
+      // needed in the ndv().
+      Type defaultHllIntermediateType =
           ScalarType.createFixedUdaIntermediateType(HLL_INTERMEDIATE_SIZE);
-      db.addBuiltin(AggregateFunction.createBuiltin(db, "ndv",
-          Lists.newArrayList(t), Type.BIGINT, hllIntermediateType,
+
+      // Single input argument version
+      db.addBuiltin(AggregateFunction.createBuiltin(db, "ndv", Lists.newArrayList(t),
+          Type.BIGINT, defaultHllIntermediateType,
           prefix + "7HllInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
           prefix + HLL_UPDATE_SYMBOL.get(t),
           prefix + "8HllMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
@@ -945,8 +1008,26 @@ public class BuiltinsDb extends Db {
           prefix + "11HllFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
           true, false, true));
 
+      // Double input argument version, with the unique HllUpdate function symbols.
+      // Take an intermediate data type with the default length for now. During the
+      // analysis phase, the data type will be resolved to the correct template, based
+      // on the the value in the 2nd argument.
+      db.addBuiltin(createTemplateAggregateFunctionForNDVWith2Args(
+          db, prefix, t, defaultHllIntermediateType));
+
+      // For each type t, populate the hash map of AggregateFunctions with
+      // all known intermediate data types.
+      List<AggregateFunction> ndvList = new ArrayList<AggregateFunction>();
+      for (int size : hll_intermediate_sizes) {
+        Type hllIntermediateType = ScalarType.createFixedUdaIntermediateType(size);
+        ndvList.add(createTemplateAggregateFunctionForNDVWith2Args(
+            db, prefix, t, hllIntermediateType));
+      }
+      builtinNDVs_.put(t, ndvList);
+
+      // Used in stats computation. Will take a single input argument only.
       db.addBuiltin(AggregateFunction.createBuiltin(db, "ndv_no_finalize",
-          Lists.newArrayList(t), Type.STRING, hllIntermediateType,
+          Lists.newArrayList(t), Type.STRING, defaultHllIntermediateType,
           prefix + "7HllInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
           prefix + HLL_UPDATE_SYMBOL.get(t),
           prefix + "8HllMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
@@ -1280,4 +1361,17 @@ public class BuiltinsDb extends Db {
             db, "lead", Lists.newArrayList(t, Type.BIGINT), t, t));
     }
   }
+
+  // Resolve the intermediate data type by searching for one in the
+  // map builtinNDVs_ that has the desired length.
+  public AggregateFunction resolveNdvIntermediateType(
+      AggregateFunction func, int length) {
+    Preconditions.checkState(func.getNumArgs() >= 1);
+    List<AggregateFunction> list = builtinNDVs_.get(func.getArgs()[0]);
+    for (AggregateFunction aggF : list) {
+      ScalarType sType = (ScalarType) aggF.getIntermediateType();
+      if (sType.getLength() == length) return aggF;
+    }
+    return null;
+  }
 }
diff --git a/tests/query_test/test_aggregation.py b/tests/query_test/test_aggregation.py
index 10d63ec..8106145 100644
--- a/tests/query_test/test_aggregation.py
+++ b/tests/query_test/test_aggregation.py
@@ -277,6 +277,49 @@ class TestAggregationQueries(ImpalaTestSuite):
     vector.get_value('exec_option')['batch_size'] = 1
     self.run_test_case('QueryTest/kudu-stats-agg', vector, unique_database)
 
+  def test_ndv(self):
+    """Test the version of NDV() that accepts a scale value argument against
+    different column data types. The scale argument is an integer in range
+    [1, 10]."""
+
+    ndv_results = [
+      [2, 9, 96, 988, 980, 1000, 944, 1030, 1020, 990, 1010, 957, 1030, 1027, 9845, 9898],
+      [2, 9, 97, 957, 1008, 1016, 1005, 963, 994, 993, 1018, 1004, 963, 1014, 10210,
+          10280],
+      [2, 9, 98, 977, 1024, 1020, 975, 977, 1002, 991, 994, 1006, 977, 999, 10118, 9923],
+      [2, 9, 99, 986, 1009, 1011, 994, 980, 997, 994, 1002, 997, 980, 988, 10148, 9987],
+      [2, 9, 99, 995, 996, 1000, 998, 988, 995, 999, 997, 999, 988, 979, 9974, 9960],
+      [2, 9, 99, 998, 1005, 999, 1003, 994, 1000, 993, 999, 998, 994, 992, 9899, 9941],
+      [2, 9, 99, 993, 1001, 1007, 1000, 998, 1002, 997, 999, 998, 998, 999, 9923, 9931],
+      [2, 9, 99, 994, 998, 1002, 1002, 999, 998, 999, 997, 1000, 999, 997, 9937, 9973],
+      [2, 9, 99, 995, 997, 998, 1001, 999, 1001, 996, 997, 1000, 999, 998, 9989, 9981],
+      [2, 9, 99, 998, 998, 997, 999, 998, 1000, 998, 1000, 998, 998, 1000, 10000, 10003]
+    ]
+
+    # For each possible integer value, genereate one query and test it out.
+    for i in xrange(1, 11):
+      ndv_stmt = """
+        select ndv(bool_col, {0}), ndv(tinyint_col, {0}),
+               ndv(smallint_col, {0}), ndv(int_col, {0}),
+               ndv(bigint_col, {0}), ndv(float_col, {0}),
+               ndv(double_col, {0}), ndv(string_col, {0}),
+               ndv(cast(double_col as decimal(5, 0)), {0}),
+               ndv(cast(double_col as decimal(10, 5)), {0}),
+               ndv(cast(double_col as decimal(20, 10)), {0}),
+               ndv(cast(double_col as decimal(38, 33)), {0}),
+               ndv(cast(string_col as varchar(20)), {0}),
+               ndv(cast(string_col as char(10)), {0}),
+               ndv(timestamp_col, {0}), ndv(id, {0})
+        from functional_parquet.alltypesagg""".format(i)
+      ndv_result = self.execute_query(ndv_stmt)
+      ndv_vals = ndv_result.data[0].split('\t')
+
+      # Verify that each ndv() value (one per column for a total of 11) is identical
+      # to the corresponding known value. Since NDV() invokes Hash64() hash function
+      # with a fixed seed value, ndv() result is deterministic.
+      for j in xrange(0, 11):
+        assert(ndv_results[i - 1][j] == int(ndv_vals[j]))
+
   def test_sampled_ndv(self, vector, unique_database):
     """The SAMPLED_NDV() function is inherently non-deterministic and cannot be
     reasonably made deterministic with existing options so we test it separately.