Posted to commits@hive.apache.org by px...@apache.org on 2017/07/15 08:07:00 UTC

[21/24] hive git commit: HIVE-16996: Add HLL as an alternative to FM sketch to compute stats (Pengcheng Xiong, reviewed by Ashutosh Chauhan, Prasanth Jayachandran)

http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/metastore/src/java/org/apache/hadoop/hive/metastore/NumDistinctValueEstimator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/NumDistinctValueEstimator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/NumDistinctValueEstimator.java
deleted file mode 100644
index 92f9a84..0000000
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/NumDistinctValueEstimator.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore;
-import java.util.Random;
-
-import javolution.util.FastBitSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.io.Text;
-
-/*
- * https://en.wikipedia.org/wiki/Flajolet%E2%80%93Martin_algorithm
- * We implement the Flajolet–Martin algorithm in this class.
- * The Flajolet–Martin algorithm approximates the number of distinct elements in a stream
- * with a single pass and with space consumption logarithmic in the maximum number
- * of possible distinct elements in the stream. The algorithm was introduced by Philippe Flajolet
- * and G. Nigel Martin in their 1985 paper "Probabilistic Counting Algorithms for Data Base Applications".
- * It was later refined in the papers "LogLog counting of large cardinalities" by Marianne Durand
- * and Philippe Flajolet, and "HyperLogLog: The analysis of a near-optimal cardinality estimation
- * algorithm" by Philippe Flajolet et al.
- */
-
-/*
- * The algorithm works like this:
- * (1) Set the number of bit vectors, numBitVectors, based on the desired precision.
- * (2) For each bit vector, hash the long value and reduce it modulo 2^bitVectorSize - 1 (addToEstimator).
- * (3) Find the index of the least significant set bit of the hash and set that bit in the vector (addToEstimator).
- * (4) Average the index of the least significant clear bit over all bit vectors to get the estimated NDV (estimateNumDistinctValues).
- */
-public class NumDistinctValueEstimator {
-
-  static final Log LOG = LogFactory.getLog(NumDistinctValueEstimator.class.getName());
-
-  /* We want a, b, x to come from a finite field of size 0 to k, where k is a prime number.
-   * 2^p - 1 is prime for p = 31, hence bitVectorSize has to be 31; pick k to be 2^p - 1.
-   * If a, b, x didn't come from a finite field, a*x1 + b mod k and a*x2 + b mod k would not be
-   * pairwise independent. As a consequence, the hash values would not distribute uniformly
-   * over 0 to 2^p - 1, thus introducing errors in the estimates.
-   */
-  private static final int BIT_VECTOR_SIZE = 31;
-  private final int numBitVectors;
-
-  // Refer to Flajolet-Martin'86 for the value of phi
-  private static final double PHI = 0.77351;
-
-  private final int[] a;
-  private final int[] b;
-  private final FastBitSet[] bitVector;
-
-  private final Random aValue;
-  private final Random bValue;
-
-  /* Create a new distinctValueEstimator
-   */
-  public NumDistinctValueEstimator(int numBitVectors) {
-    this.numBitVectors = numBitVectors;
-    bitVector = new FastBitSet[numBitVectors];
-    for (int i=0; i< numBitVectors; i++) {
-      bitVector[i] = new FastBitSet(BIT_VECTOR_SIZE);
-    }
-
-    a = new int[numBitVectors];
-    b = new int[numBitVectors];
-
-    /* Use a large prime number as a seed to the random number generator.
-     * Java's random number generator uses the Linear Congruential Generator to generate random
-     * numbers using the following recurrence relation,
-     *
-     * X(n+1) = (a X(n) + c ) mod m
-     *
-     *  where X0 is the seed. Java's implementation uses m = 2^48. This is problematic because 2^48
-     *  is not a prime number and hence the set of numbers from 0 to m doesn't form a finite field.
-     *  If these numbers don't come from a finite field, any given X(n) and X(n+1) may not be
-     *  pairwise independent.
-     *
-     *  However, empirically, passing in prime numbers as seeds seems to work better than passing in
-     *  composite numbers. Ideally Java's Random should pick m such that m is prime.
-     *
-     */
-    aValue = new Random(99397);
-    bValue = new Random(9876413);
-
-    for (int i = 0; i < numBitVectors; i++) {
-      int randVal;
-      /* a and b shouldn't be even; if a and b are even, then none of the values
-       * will set bit 0, thus introducing errors in the estimate. Both a and b would be even
-       * 25% of the time, and as a result 25% of the bit vectors could be inaccurate. To avoid
-       * this, always pick odd values for a and b.
-       */
-      do {
-        randVal = aValue.nextInt();
-      } while (randVal % 2 == 0);
-
-      a[i] = randVal;
-
-      do {
-        randVal = bValue.nextInt();
-      } while (randVal % 2 == 0);
-
-      b[i] = randVal;
-
-      if (a[i] < 0) {
-        a[i] = a[i] + (1 << BIT_VECTOR_SIZE - 1);
-      }
-
-      if (b[i] < 0) {
-        b[i] = b[i] + (1 << BIT_VECTOR_SIZE - 1);
-      }
-    }
-  }
-
-  public NumDistinctValueEstimator(String s, int numBitVectors) {
-    this.numBitVectors = numBitVectors;
-    FastBitSet bitVectorDeser[] = deserialize(s, numBitVectors);
-    bitVector = new FastBitSet[numBitVectors];
-    for(int i=0; i <numBitVectors; i++) {
-       bitVector[i] = new FastBitSet(BIT_VECTOR_SIZE);
-       bitVector[i].clear();
-       bitVector[i].or(bitVectorDeser[i]);
-    }
-
-    a = null;
-    b = null;
-
-    aValue = null;
-    bValue = null;
-  }
-
-  /**
-   * Resets a distinctValueEstimator object to its original state.
-   */
-  public void reset() {
-    for (int i=0; i< numBitVectors; i++) {
-      bitVector[i].clear();
-    }
-  }
-
-  public FastBitSet getBitVector(int index) {
-    return bitVector[index];
-  }
-
-  public int getnumBitVectors() {
-    return numBitVectors;
-  }
-
-  public int getBitVectorSize() {
-    return BIT_VECTOR_SIZE;
-  }
-
-  public void printNumDistinctValueEstimator() {
-    String t = new String();
-
-    LOG.debug("NumDistinctValueEstimator");
-    LOG.debug("Number of Vectors:");
-    LOG.debug(numBitVectors);
-    LOG.debug("Vector Size: ");
-    LOG.debug(BIT_VECTOR_SIZE);
-
-    for (int i=0; i < numBitVectors; i++) {
-      t = t + bitVector[i].toString();
-    }
-
-    LOG.debug("Serialized Vectors: ");
-    LOG.debug(t);
-  }
-
-  /* Serializes a distinctValueEstimator object to Text for transport.
-   *
-   */
-  public Text serialize() {
-    String s = new String();
-    for(int i=0; i < numBitVectors; i++) {
-      s = s + (bitVector[i].toString());
-    }
-    return new Text(s);
-  }
-
-  /* Deserializes from string to FastBitSet; Creates a NumDistinctValueEstimator object and
-   * returns it.
-   */
-
-  private FastBitSet[] deserialize(String s, int numBitVectors) {
-    FastBitSet[] b = new FastBitSet[numBitVectors];
-    for (int j=0; j < numBitVectors; j++) {
-      b[j] = new FastBitSet(BIT_VECTOR_SIZE);
-      b[j].clear();
-    }
-
-    int vectorIndex =0;
-
-    /* Parse the input string to obtain the indexes that are set in the bitvector.
-     * When toString() is called on a FastBitSet object to serialize it, the serialization
-     * adds { and } to the beginning and end of the returned String.
-     * Skip "{", "}", ",", " " in the input string.
-     */
-    for(int i=1; i < s.length()-1;) {
-      char c = s.charAt(i);
-      i = i + 1;
-
-      // Move on to the next bit vector
-      if (c == '}') {
-         vectorIndex = vectorIndex + 1;
-      }
-
-      // Encountered a numeric value; Extract out the entire number
-      if (c >= '0' && c <= '9') {
-        String t = new String();
-        t = t + c;
-        c = s.charAt(i);
-        i = i + 1;
-
-        while (c != ',' && c!= '}') {
-          t = t + c;
-          c = s.charAt(i);
-          i = i + 1;
-        }
-
-        int bitIndex = Integer.parseInt(t);
-        assert(bitIndex >= 0);
-        assert(vectorIndex < numBitVectors);
-        b[vectorIndex].set(bitIndex);
-        if (c == '}') {
-          vectorIndex =  vectorIndex + 1;
-        }
-      }
-    }
-    return b;
-  }
-
-  private int generateHash(long v, int hashNum) {
-    int mod = (1<<BIT_VECTOR_SIZE) - 1;
-    long tempHash = a[hashNum] * v  + b[hashNum];
-    tempHash %= mod;
-    int hash = (int) tempHash;
-
-    /* Hash function should map the long value to 0...2^L-1.
-     * Hence hash value has to be non-negative.
-     */
-    if (hash < 0) {
-      hash = hash + mod;
-    }
-    return hash;
-  }
-
-  private int generateHashForPCSA(long v) {
-    return generateHash(v, 0);
-  }
-
-  public void addToEstimator(long v) {
-    /* Update summary bitVector :
-     * Generate hash value of the long value and mod it by 2^bitVectorSize-1.
-     * In this implementation bitVectorSize is 31.
-     */
-
-    for (int i = 0; i<numBitVectors; i++) {
-      int hash = generateHash(v,i);
-      int index;
-
-      // Find the index of the least significant bit that is 1
-      for (index=0; index<BIT_VECTOR_SIZE; index++) {
-        if (hash % 2 != 0) {
-          break;
-        }
-        hash = hash >> 1;
-      }
-
-      // Set bitvector[index] := 1
-      bitVector[i].set(index);
-    }
-  }
-
-  public void addToEstimatorPCSA(long v) {
-    int hash = generateHashForPCSA(v);
-    int rho = hash/numBitVectors;
-    int index;
-
-    // Find the index of the least significant bit that is 1
-    for (index=0; index<BIT_VECTOR_SIZE; index++) {
-      if (rho % 2 != 0) {
-        break;
-      }
-      rho = rho >> 1;
-    }
-
-    // Set bitvector[index] := 1
-    bitVector[hash%numBitVectors].set(index);
-  }
-
-  public void addToEstimator(double d) {
-    int v = new Double(d).hashCode();
-    addToEstimator(v);
-  }
-
-  public void addToEstimatorPCSA(double d) {
-    int v = new Double(d).hashCode();
-    addToEstimatorPCSA(v);
-  }
-
-  public void addToEstimator(HiveDecimal decimal) {
-    int v = decimal.hashCode();
-    addToEstimator(v);
-  }
-
-  public void addToEstimatorPCSA(HiveDecimal decimal) {
-    int v = decimal.hashCode();
-    addToEstimatorPCSA(v);
-  }
-
-  public void mergeEstimators(NumDistinctValueEstimator o) {
-    // Bitwise OR the bitvector with the bitvector in the agg buffer
-    for (int i=0; i<numBitVectors; i++) {
-      bitVector[i].or(o.getBitVector(i));
-    }
-  }
-
-  public long estimateNumDistinctValuesPCSA() {
-    double numDistinctValues = 0.0;
-    long S = 0;
-
-    for (int i=0; i < numBitVectors; i++) {
-      int index = 0;
-      while (bitVector[i].get(index) && index < BIT_VECTOR_SIZE) {
-        index = index + 1;
-      }
-      S = S + index;
-    }
-
-    numDistinctValues = ((numBitVectors/PHI) * Math.pow(2.0, S/numBitVectors));
-    return ((long)numDistinctValues);
-  }
-
-  /* We use the Flajolet-Martin estimator to estimate the number of distinct values. FM uses the
-   * location of the least significant zero bit as an estimate of log2(phi*ndv).
-   */
-  public long estimateNumDistinctValues() {
-    int sumLeastSigZero = 0;
-    double avgLeastSigZero;
-    double numDistinctValues;
-
-    for (int i=0; i< numBitVectors; i++) {
-      int leastSigZero = bitVector[i].nextClearBit(0);
-      sumLeastSigZero += leastSigZero;
-    }
-
-    avgLeastSigZero =
-        (double)(sumLeastSigZero/(numBitVectors * 1.0)) - (Math.log(PHI)/Math.log(2.0));
-    numDistinctValues = Math.pow(2.0, avgLeastSigZero);
-    return ((long)(numDistinctValues));
-  }
-}

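[Editor's reference note] The estimation flow implemented by the removed class can be condensed into a short, self-contained sketch: each value is hashed with a pairwise-independent hash (a*v + b) mod 2^31 - 1, the position of the hash's least significant set bit is recorded in a bit vector, and the estimate is 2^R / PHI, where R is the average position of the least significant clear bit across the vectors. This is illustrative only, not Hive code; all names below are hypothetical.

import java.util.BitSet;
import java.util.Random;

// Minimal FM sketch mirroring the removed NumDistinctValueEstimator; names are illustrative.
public class FmSketchDemo {
  private static final int BIT_VECTOR_SIZE = 31;
  private static final double PHI = 0.77351;          // FM bias-correction constant
  private final BitSet[] vectors;
  private final long[] a;
  private final long[] b;

  FmSketchDemo(int numVectors) {
    vectors = new BitSet[numVectors];
    a = new long[numVectors];
    b = new long[numVectors];
    Random rand = new Random(99397);                  // prime seed, as in the removed class
    for (int i = 0; i < numVectors; i++) {
      vectors[i] = new BitSet(BIT_VECTOR_SIZE);
      a[i] = rand.nextInt() | 1;                      // odd, so bit 0 of the hash can be set
      b[i] = rand.nextInt() | 1;
    }
  }

  void add(long v) {
    long mod = (1L << BIT_VECTOR_SIZE) - 1;           // 2^31 - 1, a Mersenne prime
    for (int i = 0; i < vectors.length; i++) {
      int hash = (int) Math.floorMod(a[i] * v + b[i], mod);
      // index of the least significant set bit (clamped; numberOfTrailingZeros(0) == 32)
      int index = Math.min(Integer.numberOfTrailingZeros(hash), BIT_VECTOR_SIZE - 1);
      vectors[i].set(index);
    }
  }

  long estimate() {
    int sum = 0;
    for (BitSet bv : vectors) {
      sum += bv.nextClearBit(0);                      // least significant zero, R_i
    }
    double avg = sum / (double) vectors.length - Math.log(PHI) / Math.log(2.0);
    return (long) Math.pow(2.0, avg);                 // NDV ~= 2^avg(R) / PHI
  }

  public static void main(String[] args) {
    FmSketchDemo fm = new FmSketchDemo(16);
    for (long v = 0; v < 100_000; v++) {
      fm.add(v % 1000);                               // 1,000 distinct values
    }
    System.out.println(fm.estimate());                // expect a rough estimate near 1000
  }
}
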
http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
index 18f8afc..0e11989 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
@@ -84,7 +84,6 @@ class StatsCache {
         .build(new CacheLoader<StatsCacheKey, AggrStats>() {
           @Override
           public AggrStats load(StatsCacheKey key) throws Exception {
-            int numBitVectors = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf);
             boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION);
             HBaseReadWrite hrw = HBaseReadWrite.getInstance();
             AggrStats aggrStats = hrw.getAggregatedStats(key.hashed);
@@ -101,7 +100,7 @@ class StatsCache {
                 if (aggregator == null) {
                   aggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator(css.iterator()
                       .next().getStatsObj().iterator().next().getStatsData().getSetField(),
-                      numBitVectors, useDensityFunctionForNDVEstimation);
+                      useDensityFunctionForNDVEstimation);
                 }
                 ColumnStatisticsObj statsObj = aggregator
                     .aggregate(key.colName, key.partNames, css);

http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
index 31955b4..29a0539 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 
 public abstract class ColumnStatsAggregator {
-  public int numBitVectors;
   public boolean useDensityFunctionForNDVEstimation;
 
   public abstract ColumnStatisticsObj aggregate(String colName, List<String> partNames,

http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
index daf8569..568bf06 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
@@ -34,7 +34,7 @@ public class ColumnStatsAggregatorFactory {
   private ColumnStatsAggregatorFactory() {
   }
 
-  public static ColumnStatsAggregator getColumnStatsAggregator(_Fields type, int numBitVectors, boolean useDensityFunctionForNDVEstimation) {
+  public static ColumnStatsAggregator getColumnStatsAggregator(_Fields type, boolean useDensityFunctionForNDVEstimation) {
     ColumnStatsAggregator agg;
     switch (type) {
     case BOOLEAN_STATS:
@@ -58,7 +58,6 @@ public class ColumnStatsAggregatorFactory {
     default:
       throw new RuntimeException("Woh, bad.  Unknown stats type " + type.toString());
     }
-    agg.numBitVectors = numBitVectors;
     agg.useDensityFunctionForNDVEstimation = useDensityFunctionForNDVEstimation;
     return agg;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
index 36b2c9c..8eb64e0 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
@@ -26,7 +26,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
 import org.apache.hadoop.hive.metastore.StatObjectConverter;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
@@ -46,7 +47,7 @@ public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implemen
      // check if all the ColumnStatisticsObjs contain stats and all of the NDV
      // estimators are bitvectors
     boolean doAllPartitionContainStats = partNames.size() == css.size();
-    boolean isNDVBitVectorSet = true;
+    NumDistinctValueEstimator ndvEstimator = null;
     String colType = null;
     for (ColumnStatistics cs : css) {
       if (cs.getStatsObjSize() != 1) {
@@ -60,22 +61,36 @@ public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implemen
         statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
             .getStatsData().getSetField());
       }
-      if (numBitVectors <= 0 || !cso.getStatsData().getDecimalStats().isSetBitVectors()
+      if (!cso.getStatsData().getDecimalStats().isSetBitVectors()
           || cso.getStatsData().getDecimalStats().getBitVectors().length() == 0) {
-        isNDVBitVectorSet = false;
+        ndvEstimator = null;
         break;
+      } else {
+        // check if all of the bit vectors can merge
+        NumDistinctValueEstimator estimator = NumDistinctValueEstimatorFactory
+            .getNumDistinctValueEstimator(cso.getStatsData().getDecimalStats().getBitVectors());
+        if (ndvEstimator == null) {
+          ndvEstimator = estimator;
+        } else {
+          if (ndvEstimator.canMerge(estimator)) {
+            continue;
+          } else {
+            ndvEstimator = null;
+            break;
+          }
+        }
       }
     }
+    if (ndvEstimator != null) {
+      ndvEstimator = NumDistinctValueEstimatorFactory
+          .getEmptyNumDistinctValueEstimator(ndvEstimator);
+    }
     ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
     if (doAllPartitionContainStats || css.size() < 2) {
       DecimalColumnStatsData aggregateData = null;
       long lowerBound = 0;
       long higherBound = 0;
       double densityAvgSum = 0.0;
-      NumDistinctValueEstimator ndvEstimator = null;
-      if (isNDVBitVectorSet) {
-        ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
-      }
       for (ColumnStatistics cs : css) {
         ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
         DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats();
@@ -85,9 +100,9 @@ public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implemen
           densityAvgSum += (HBaseUtils.getDoubleValue(newData.getHighValue()) - HBaseUtils
               .getDoubleValue(newData.getLowValue())) / newData.getNumDVs();
         }
-        if (isNDVBitVectorSet) {
-          ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
-              ndvEstimator.getnumBitVectors()));
+        if (ndvEstimator != null) {
+          ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory
+              .getNumDistinctValueEstimator(newData.getBitVectors()));
         }
         if (aggregateData == null) {
           aggregateData = newData.deepCopy();
@@ -108,7 +123,7 @@ public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implemen
           aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
         }
       }
-      if (isNDVBitVectorSet) {
+      if (ndvEstimator != null) {
         // if all the ColumnStatisticsObjs contain bitvectors, we do not need to
         // use the uniform distribution assumption because we can merge bitvectors
         // to get a good estimate.
@@ -145,7 +160,7 @@ public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implemen
       // while we scan the css, we also get the densityAvg, lowerBound and
       // higherBound when useDensityFunctionForNDVEstimation is true.
       double densityAvgSum = 0.0;
-      if (!isNDVBitVectorSet) {
+      if (ndvEstimator == null) {
         // if not every partition uses bitvector for ndv, we just fall back to
         // the traditional extrapolation methods.
         for (ColumnStatistics cs : css) {
@@ -162,7 +177,6 @@ public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implemen
       } else {
         // we first merge all the adjacent bitvectors that we could merge and
         // derive new partition names and index.
-        NumDistinctValueEstimator ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
         StringBuilder pseudoPartName = new StringBuilder();
         double pseudoIndexSum = 0;
         int length = 0;
@@ -191,6 +205,7 @@ public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implemen
               pseudoPartName = new StringBuilder();
               pseudoIndexSum = 0;
               length = 0;
+              ndvEstimator = NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator);
             }
             aggregateData = null;
           }
@@ -216,8 +231,8 @@ public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implemen
             }
             aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
           }
-          ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
-              ndvEstimator.getnumBitVectors()));
+          ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory
+              .getNumDistinctValueEstimator(newData.getBitVectors()));
         }
         if (length > 0) {
           // we have to set ndv

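[Editor's reference note] The change to DecimalColumnStatsAggregator above is applied identically to the Double, Long, and String aggregators below: instead of the numBitVectors/isNDVBitVectorSet bookkeeping, each partition's serialized sketch is deserialized through the factory, and merging only proceeds when every estimator reports canMerge, so FM and HLL sketches (or differently sized sketches) are never combined. A condensed, illustrative sketch of that control flow, with serializedSketches as a hypothetical stand-in for the per-partition bitvector strings:

// Hedged sketch of the new aggregator pattern; 'serializedSketches' is hypothetical.
NumDistinctValueEstimator ndvEstimator = null;
for (String s : serializedSketches) {
  if (s == null || s.length() == 0) {
    ndvEstimator = null;                    // some partition has no sketch: give up on merging
    break;
  }
  NumDistinctValueEstimator est =
      NumDistinctValueEstimatorFactory.getNumDistinctValueEstimator(s);
  if (ndvEstimator == null) {
    ndvEstimator = est;                     // remember the first sketch's kind and size
  } else if (!ndvEstimator.canMerge(est)) {
    ndvEstimator = null;                    // e.g. FM vs. HLL: fall back to extrapolation
    break;
  }
}
if (ndvEstimator != null) {
  // start from an empty estimator of the same kind, then merge every partition into it
  ndvEstimator =
      NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator);
  for (String s : serializedSketches) {
    ndvEstimator.mergeEstimators(
        NumDistinctValueEstimatorFactory.getNumDistinctValueEstimator(s));
  }
  long ndv = ndvEstimator.estimateNumDistinctValues();
}
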
http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
index a88ef84..b6b8612 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
@@ -26,7 +26,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -44,7 +45,7 @@ public class DoubleColumnStatsAggregator extends ColumnStatsAggregator implement
    // check if all the ColumnStatisticsObjs contain stats and all of the NDV
    // estimators are bitvectors
     boolean doAllPartitionContainStats = partNames.size() == css.size();
-    boolean isNDVBitVectorSet = true;
+    NumDistinctValueEstimator ndvEstimator = null;
     String colType = null;
     for (ColumnStatistics cs : css) {
       if (cs.getStatsObjSize() != 1) {
@@ -58,22 +59,36 @@ public class DoubleColumnStatsAggregator extends ColumnStatsAggregator implement
         statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
             .getStatsData().getSetField());
       }
-      if (numBitVectors <= 0 || !cso.getStatsData().getDoubleStats().isSetBitVectors()
+      if (!cso.getStatsData().getDoubleStats().isSetBitVectors()
           || cso.getStatsData().getDoubleStats().getBitVectors().length() == 0) {
-        isNDVBitVectorSet = false;
+        ndvEstimator = null;
         break;
+      } else {
+        // check if all of the bit vectors can merge
+        NumDistinctValueEstimator estimator = NumDistinctValueEstimatorFactory
+            .getNumDistinctValueEstimator(cso.getStatsData().getDoubleStats().getBitVectors());
+        if (ndvEstimator == null) {
+          ndvEstimator = estimator;
+        } else {
+          if (ndvEstimator.canMerge(estimator)) {
+            continue;
+          } else {
+            ndvEstimator = null;
+            break;
+          }
+        }
       }
     }
+    if (ndvEstimator != null) {
+      ndvEstimator = NumDistinctValueEstimatorFactory
+          .getEmptyNumDistinctValueEstimator(ndvEstimator);
+    }
     ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
     if (doAllPartitionContainStats || css.size() < 2) {
       DoubleColumnStatsData aggregateData = null;
       long lowerBound = 0;
       long higherBound = 0;
       double densityAvgSum = 0.0;
-      NumDistinctValueEstimator ndvEstimator = null;
-      if (isNDVBitVectorSet) {
-        ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
-      }
       for (ColumnStatistics cs : css) {
         ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
         DoubleColumnStatsData newData = cso.getStatsData().getDoubleStats();
@@ -82,9 +97,9 @@ public class DoubleColumnStatsAggregator extends ColumnStatsAggregator implement
           higherBound += newData.getNumDVs();
           densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs();
         }
-        if (isNDVBitVectorSet) {
-          ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
-              ndvEstimator.getnumBitVectors()));
+        if (ndvEstimator != null) {
+          ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory
+              .getNumDistinctValueEstimator(newData.getBitVectors()));
         }
         if (aggregateData == null) {
           aggregateData = newData.deepCopy();
@@ -96,7 +111,7 @@ public class DoubleColumnStatsAggregator extends ColumnStatsAggregator implement
           aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
         }
       }
-      if (isNDVBitVectorSet) {
+      if (ndvEstimator != null) {
        // if all the ColumnStatisticsObjs contain bitvectors, we do not need to
        // use the uniform distribution assumption because we can merge bitvectors
        // to get a good estimate.
@@ -132,7 +147,7 @@ public class DoubleColumnStatsAggregator extends ColumnStatsAggregator implement
       // while we scan the css, we also get the densityAvg, lowerBound and
       // higherBound when useDensityFunctionForNDVEstimation is true.
       double densityAvgSum = 0.0;
-      if (!isNDVBitVectorSet) {
+      if (ndvEstimator == null) {
         // if not every partition uses bitvector for ndv, we just fall back to
         // the traditional extrapolation methods.
         for (ColumnStatistics cs : css) {
@@ -148,7 +163,6 @@ public class DoubleColumnStatsAggregator extends ColumnStatsAggregator implement
       } else {
         // we first merge all the adjacent bitvectors that we could merge and
         // derive new partition names and index.
-        NumDistinctValueEstimator ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
         StringBuilder pseudoPartName = new StringBuilder();
         double pseudoIndexSum = 0;
         int length = 0;
@@ -176,6 +190,7 @@ public class DoubleColumnStatsAggregator extends ColumnStatsAggregator implement
               pseudoPartName = new StringBuilder();
               pseudoIndexSum = 0;
               length = 0;
+              ndvEstimator = NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator);
             }
             aggregateData = null;
           }
@@ -192,8 +207,8 @@ public class DoubleColumnStatsAggregator extends ColumnStatsAggregator implement
                 newData.getHighValue()));
             aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
           }
-          ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
-              ndvEstimator.getnumBitVectors()));
+          ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory
+              .getNumDistinctValueEstimator(newData.getBitVectors()));
         }
         if (length > 0) {
           // we have to set ndv

http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
index 8ac6561..2da6f60 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
@@ -26,7 +26,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -44,7 +45,7 @@ public class LongColumnStatsAggregator extends ColumnStatsAggregator implements
    // check if all the ColumnStatisticsObjs contain stats and all of the NDV
    // estimators are bitvectors
     boolean doAllPartitionContainStats = partNames.size() == css.size();
-    boolean isNDVBitVectorSet = true;
+    NumDistinctValueEstimator ndvEstimator = null;
     String colType = null;
     for (ColumnStatistics cs : css) {
       if (cs.getStatsObjSize() != 1) {
@@ -58,22 +59,36 @@ public class LongColumnStatsAggregator extends ColumnStatsAggregator implements
         statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
             .getStatsData().getSetField());
       }
-      if (numBitVectors <= 0 || !cso.getStatsData().getLongStats().isSetBitVectors()
+      if (!cso.getStatsData().getLongStats().isSetBitVectors()
           || cso.getStatsData().getLongStats().getBitVectors().length() == 0) {
-        isNDVBitVectorSet = false;
+        ndvEstimator = null;
         break;
+      } else {
+        // check if all of the bit vectors can merge
+        NumDistinctValueEstimator estimator = NumDistinctValueEstimatorFactory
+            .getNumDistinctValueEstimator(cso.getStatsData().getLongStats().getBitVectors());
+        if (ndvEstimator == null) {
+          ndvEstimator = estimator;
+        } else {
+          if (ndvEstimator.canMerge(estimator)) {
+            continue;
+          } else {
+            ndvEstimator = null;
+            break;
+          }
+        }
       }
     }
+    if (ndvEstimator != null) {
+      ndvEstimator = NumDistinctValueEstimatorFactory
+          .getEmptyNumDistinctValueEstimator(ndvEstimator);
+    }
     ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
     if (doAllPartitionContainStats || css.size() < 2) {
       LongColumnStatsData aggregateData = null;
       long lowerBound = 0;
       long higherBound = 0;
       double densityAvgSum = 0.0;
-      NumDistinctValueEstimator ndvEstimator = null;
-      if (isNDVBitVectorSet) {
-        ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
-      }
       for (ColumnStatistics cs : css) {
         ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
         LongColumnStatsData newData = cso.getStatsData().getLongStats();
@@ -82,9 +97,9 @@ public class LongColumnStatsAggregator extends ColumnStatsAggregator implements
           higherBound += newData.getNumDVs();
           densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs();
         }
-        if (isNDVBitVectorSet) {
-          ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
-              ndvEstimator.getnumBitVectors()));
+        if (ndvEstimator != null) {
+          ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory
+              .getNumDistinctValueEstimator(newData.getBitVectors()));
         }
         if (aggregateData == null) {
           aggregateData = newData.deepCopy();
@@ -96,7 +111,7 @@ public class LongColumnStatsAggregator extends ColumnStatsAggregator implements
           aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
         }
       }
-      if (isNDVBitVectorSet) {
+      if (ndvEstimator != null) {
        // if all the ColumnStatisticsObjs contain bitvectors, we do not need to
        // use the uniform distribution assumption because we can merge bitvectors
        // to get a good estimate.
@@ -132,7 +147,7 @@ public class LongColumnStatsAggregator extends ColumnStatsAggregator implements
       // while we scan the css, we also get the densityAvg, lowerBound and
       // higherBound when useDensityFunctionForNDVEstimation is true.
       double densityAvgSum = 0.0;
-      if (!isNDVBitVectorSet) {
+      if (ndvEstimator == null) {
         // if not every partition uses bitvector for ndv, we just fall back to
         // the traditional extrapolation methods.
         for (ColumnStatistics cs : css) {
@@ -148,7 +163,6 @@ public class LongColumnStatsAggregator extends ColumnStatsAggregator implements
       } else {
         // we first merge all the adjacent bitvectors that we could merge and
         // derive new partition names and index.
-        NumDistinctValueEstimator ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
         StringBuilder pseudoPartName = new StringBuilder();
         double pseudoIndexSum = 0;
         int length = 0;
@@ -176,6 +190,7 @@ public class LongColumnStatsAggregator extends ColumnStatsAggregator implements
               pseudoPartName = new StringBuilder();
               pseudoIndexSum = 0;
               length = 0;
+              ndvEstimator = NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator);
             }
             aggregateData = null;
           }
@@ -192,8 +207,8 @@ public class LongColumnStatsAggregator extends ColumnStatsAggregator implements
                 newData.getHighValue()));
             aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
           }
-          ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
-              ndvEstimator.getnumBitVectors()));
+          ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory
+              .getNumDistinctValueEstimator(newData.getBitVectors()));
         }
         if (length > 0) {
           // we have to set ndv

http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
index 2aa4046..83c6c54 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
@@ -21,7 +21,8 @@ package org.apache.hadoop.hive.metastore.hbase.stats;
 
 import java.util.List;
 
-import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -39,7 +40,7 @@ public class StringColumnStatsAggregator extends ColumnStatsAggregator {
      // bitvectors. Only when both conditions are true do we merge bit
      // vectors. Otherwise, just use the maximum function.
     boolean doAllPartitionContainStats = partNames.size() == css.size();
-    boolean isNDVBitVectorSet = true;
+    NumDistinctValueEstimator ndvEstimator = null;
     String colType = null;
     for (ColumnStatistics cs : css) {
       if (cs.getStatsObjSize() != 1) {
@@ -53,21 +54,37 @@ public class StringColumnStatsAggregator extends ColumnStatsAggregator {
         statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
             .getStatsData().getSetField());
       }
-      if (numBitVectors <= 0 || !cso.getStatsData().getStringStats().isSetBitVectors()
+      if (!cso.getStatsData().getStringStats().isSetBitVectors()
           || cso.getStatsData().getStringStats().getBitVectors().length() == 0) {
-        isNDVBitVectorSet = false;
+        ndvEstimator = null;
         break;
+      } else {
+        // check if all of the bit vectors can merge
+        NumDistinctValueEstimator estimator = NumDistinctValueEstimatorFactory
+            .getNumDistinctValueEstimator(cso.getStatsData().getStringStats().getBitVectors());
+        if (ndvEstimator == null) {
+          ndvEstimator = estimator;
+        } else {
+          if (ndvEstimator.canMerge(estimator)) {
+            continue;
+          } else {
+            ndvEstimator = null;
+            break;
+          }
+        }
       }
     }
+    if (ndvEstimator != null) {
+      ndvEstimator = NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator);
+    }
     ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
-    if (doAllPartitionContainStats && isNDVBitVectorSet) {
+    if (doAllPartitionContainStats && ndvEstimator!=null) {
       StringColumnStatsData aggregateData = null;
-      NumDistinctValueEstimator ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
       for (ColumnStatistics cs : css) {
         ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
         StringColumnStatsData newData = cso.getStatsData().getStringStats();
-        ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
-            ndvEstimator.getnumBitVectors()));
+        ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory
+            .getNumDistinctValueEstimator(newData.getBitVectors()));
         if (aggregateData == null) {
           aggregateData = newData.deepCopy();
         } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMerger.java
index 33c7e3e..d3051a2 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMerger.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMerger.java
@@ -19,7 +19,6 @@
 
 package org.apache.hadoop.hive.metastore.hbase.stats.merge;
 
-import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,8 +26,6 @@ import org.slf4j.LoggerFactory;
 public abstract class ColumnStatsMerger {
   protected final Logger LOG = LoggerFactory.getLogger(ColumnStatsMerger.class.getName());
 
-  NumDistinctValueEstimator ndvEstimator = null;
-
   public abstract void merge(ColumnStatisticsObj aggregateColStats,
       ColumnStatisticsObj newColStats);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java
index fe890e4..c013ba5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java
@@ -20,7 +20,8 @@
 package org.apache.hadoop.hive.metastore.hbase.stats.merge;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
+import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog;
 import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
@@ -37,15 +38,6 @@ public class ColumnStatsMergerFactory {
   private ColumnStatsMergerFactory() {
   }
 
-  // we depend on the toString() method for javolution.util.FastCollection.
-  private static int countNumBitVectors(String s) {
-    if (s != null) {
-      return StringUtils.countMatches(s, "{");
-    } else {
-      return 0;
-    }
-  }
-
   public static ColumnStatsMerger getColumnStatsMerger(ColumnStatisticsObj statsObjNew,
       ColumnStatisticsObj statsObjOld) {
     ColumnStatsMerger agg;
@@ -53,30 +45,20 @@ public class ColumnStatsMergerFactory {
     _Fields typeOld = statsObjOld.getStatsData().getSetField();
     // make sure that they have the same type
     typeNew = typeNew == typeOld ? typeNew : null;
-    int numBitVectors = 0;
     switch (typeNew) {
     case BOOLEAN_STATS:
       agg = new BooleanColumnStatsMerger();
       break;
     case LONG_STATS: {
       agg = new LongColumnStatsMerger();
-      int nbvNew = countNumBitVectors(statsObjNew.getStatsData().getLongStats().getBitVectors());
-      int nbvOld = countNumBitVectors(statsObjOld.getStatsData().getLongStats().getBitVectors());
-      numBitVectors = nbvNew == nbvOld ? nbvNew : 0;
       break;
     }
     case DOUBLE_STATS: {
       agg = new DoubleColumnStatsMerger();
-      int nbvNew = countNumBitVectors(statsObjNew.getStatsData().getDoubleStats().getBitVectors());
-      int nbvOld = countNumBitVectors(statsObjOld.getStatsData().getDoubleStats().getBitVectors());
-      numBitVectors = nbvNew == nbvOld ? nbvNew : 0;
       break;
     }
     case STRING_STATS: {
       agg = new StringColumnStatsMerger();
-      int nbvNew = countNumBitVectors(statsObjNew.getStatsData().getStringStats().getBitVectors());
-      int nbvOld = countNumBitVectors(statsObjOld.getStatsData().getStringStats().getBitVectors());
-      numBitVectors = nbvNew == nbvOld ? nbvNew : 0;
       break;
     }
     case BINARY_STATS:
@@ -84,24 +66,15 @@ public class ColumnStatsMergerFactory {
       break;
     case DECIMAL_STATS: {
       agg = new DecimalColumnStatsMerger();
-      int nbvNew = countNumBitVectors(statsObjNew.getStatsData().getDecimalStats().getBitVectors());
-      int nbvOld = countNumBitVectors(statsObjOld.getStatsData().getDecimalStats().getBitVectors());
-      numBitVectors = nbvNew == nbvOld ? nbvNew : 0;
       break;
     }
     case DATE_STATS: {
       agg = new DateColumnStatsMerger();
-      int nbvNew = countNumBitVectors(statsObjNew.getStatsData().getDateStats().getBitVectors());
-      int nbvOld = countNumBitVectors(statsObjOld.getStatsData().getDateStats().getBitVectors());
-      numBitVectors = nbvNew == nbvOld ? nbvNew : 0;
       break;
     }
     default:
       throw new IllegalArgumentException("Unknown stats type " + typeNew.toString());
     }
-    if (numBitVectors > 0) {
-      agg.ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
-    }
     return agg;
   }
 

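[Editor's reference note] Because the factory now reconstructs an estimator of the right kind and size directly from the serialized string, the merger factory no longer pre-sizes anything. The removed countNumBitVectors helper depended on the FM toString() form, in which each FastBitSet is rendered as "{i, j, ...}", so counting '{' characters recovered the vector count. For illustration (the input string here is hypothetical):

// The removed trick, for reference: infer the FM vector count from its serialization
// using org.apache.commons.lang.StringUtils, as imported by the old factory.
int numBitVectors = StringUtils.countMatches("{0, 5, 9}{1, 3}{2}", "{");  // -> 3
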
http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java
index 3179b23..e899bfe 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java
@@ -19,7 +19,8 @@
 
 package org.apache.hadoop.hive.metastore.hbase.stats.merge;
 
-import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Date;
 import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
@@ -29,27 +30,32 @@ public class DateColumnStatsMerger extends ColumnStatsMerger {
   public void merge(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
     DateColumnStatsData aggregateData = aggregateColStats.getStatsData().getDateStats();
     DateColumnStatsData newData = newColStats.getStatsData().getDateStats();
-    Date lowValue =
-        aggregateData.getLowValue().compareTo(newData.getLowValue()) < 0 ? aggregateData
-            .getLowValue() : newData.getLowValue();
+    Date lowValue = aggregateData.getLowValue().compareTo(newData.getLowValue()) < 0 ? aggregateData
+        .getLowValue() : newData.getLowValue();
     aggregateData.setLowValue(lowValue);
-    Date highValue =
-        aggregateData.getHighValue().compareTo(newData.getHighValue()) >= 0 ? aggregateData
-            .getHighValue() : newData.getHighValue();
+    Date highValue = aggregateData.getHighValue().compareTo(newData.getHighValue()) >= 0 ? aggregateData
+        .getHighValue() : newData.getHighValue();
     aggregateData.setHighValue(highValue);
     aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-    if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
+    if (!aggregateData.isSetBitVectors() || aggregateData.getBitVectors().length() == 0
+        || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
       aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
     } else {
-      ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(aggregateData.getBitVectors(),
-          ndvEstimator.getnumBitVectors()));
-      ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
-          ndvEstimator.getnumBitVectors()));
-      long ndv = ndvEstimator.estimateNumDistinctValues();
+      NumDistinctValueEstimator oldEst = NumDistinctValueEstimatorFactory
+          .getNumDistinctValueEstimator(aggregateData.getBitVectors());
+      NumDistinctValueEstimator newEst = NumDistinctValueEstimatorFactory
+          .getNumDistinctValueEstimator(newData.getBitVectors());
+      long ndv = -1;
+      if (oldEst.canMerge(newEst)) {
+        oldEst.mergeEstimators(newEst);
+        ndv = oldEst.estimateNumDistinctValues();
+        aggregateData.setBitVectors(oldEst.serialize());
+      } else {
+        ndv = Math.max(aggregateData.getNumDVs(), newData.getNumDVs());
+      }
       LOG.debug("Use bitvector to merge column " + aggregateColStats.getColName() + "'s ndvs of "
           + aggregateData.getNumDVs() + " and " + newData.getNumDVs() + " to be " + ndv);
       aggregateData.setNumDVs(ndv);
-      aggregateData.setBitVectors(ndvEstimator.serialize().toString());
     }
   }
 }

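[Editor's reference note] The DateColumnStatsMerger change above is repeated in the Decimal, Double, and Long mergers below: both sides are deserialized on each merge and combined only when canMerge holds (a bitwise OR of the bit vectors for FM, a register-wise max for HLL); otherwise the larger NDV is kept. A condensed sketch, with aggregateData and newData as in the diff:

// Hedged sketch of the shared merge-or-fall-back step in the typed mergers.
boolean bothHaveSketches =
    aggregateData.isSetBitVectors() && aggregateData.getBitVectors().length() > 0
    && newData.isSetBitVectors() && newData.getBitVectors().length() > 0;
if (bothHaveSketches) {
  NumDistinctValueEstimator oldEst = NumDistinctValueEstimatorFactory
      .getNumDistinctValueEstimator(aggregateData.getBitVectors());
  NumDistinctValueEstimator newEst = NumDistinctValueEstimatorFactory
      .getNumDistinctValueEstimator(newData.getBitVectors());
  if (oldEst.canMerge(newEst)) {
    oldEst.mergeEstimators(newEst);         // OR the FM bit vectors / max the HLL registers
    aggregateData.setNumDVs(oldEst.estimateNumDistinctValues());
    aggregateData.setBitVectors(oldEst.serialize());
  } else {
    aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
  }
} else {
  // no sketch on one side: the best we can do is keep the larger NDV
  aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
}
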
http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DecimalColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DecimalColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DecimalColumnStatsMerger.java
index c13add9..4099ffc 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DecimalColumnStatsMerger.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DecimalColumnStatsMerger.java
@@ -19,7 +19,8 @@
 
 package org.apache.hadoop.hive.metastore.hbase.stats.merge;
 
-import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Decimal;
 import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
@@ -38,18 +39,25 @@ public class DecimalColumnStatsMerger extends ColumnStatsMerger {
         .getHighValue() : newData.getHighValue();
     aggregateData.setHighValue(highValue);
     aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-    if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
+    if (!aggregateData.isSetBitVectors() || aggregateData.getBitVectors().length() == 0
+        || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
       aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
     } else {
-      ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(aggregateData.getBitVectors(),
-          ndvEstimator.getnumBitVectors()));
-      ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
-          ndvEstimator.getnumBitVectors()));
-      long ndv = ndvEstimator.estimateNumDistinctValues();
+      NumDistinctValueEstimator oldEst = NumDistinctValueEstimatorFactory
+          .getNumDistinctValueEstimator(aggregateData.getBitVectors());
+      NumDistinctValueEstimator newEst = NumDistinctValueEstimatorFactory
+          .getNumDistinctValueEstimator(newData.getBitVectors());
+      long ndv = -1;
+      if (oldEst.canMerge(newEst)) {
+        oldEst.mergeEstimators(newEst);
+        ndv = oldEst.estimateNumDistinctValues();
+        aggregateData.setBitVectors(oldEst.serialize());
+      } else {
+        ndv = Math.max(aggregateData.getNumDVs(), newData.getNumDVs());
+      }
       LOG.debug("Use bitvector to merge column " + aggregateColStats.getColName() + "'s ndvs of "
           + aggregateData.getNumDVs() + " and " + newData.getNumDVs() + " to be " + ndv);
       aggregateData.setNumDVs(ndv);
-      aggregateData.setBitVectors(ndvEstimator.serialize().toString());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DoubleColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DoubleColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DoubleColumnStatsMerger.java
index fbdba24..1691fc9 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DoubleColumnStatsMerger.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DoubleColumnStatsMerger.java
@@ -19,7 +19,8 @@
 
 package org.apache.hadoop.hive.metastore.hbase.stats.merge;
 
-import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
 
@@ -31,18 +32,25 @@ public class DoubleColumnStatsMerger extends ColumnStatsMerger {
     aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
     aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue()));
     aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-    if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
+    if (!aggregateData.isSetBitVectors() || aggregateData.getBitVectors().length() == 0
+        || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
       aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
     } else {
-      ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(aggregateData.getBitVectors(),
-          ndvEstimator.getnumBitVectors()));
-      ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
-          ndvEstimator.getnumBitVectors()));
-      long ndv = ndvEstimator.estimateNumDistinctValues();
+      NumDistinctValueEstimator oldEst = NumDistinctValueEstimatorFactory
+          .getNumDistinctValueEstimator(aggregateData.getBitVectors());
+      NumDistinctValueEstimator newEst = NumDistinctValueEstimatorFactory
+          .getNumDistinctValueEstimator(newData.getBitVectors());
+      long ndv = -1;
+      if (oldEst.canMerge(newEst)) {
+        oldEst.mergeEstimators(newEst);
+        ndv = oldEst.estimateNumDistinctValues();
+        aggregateData.setBitVectors(oldEst.serialize());
+      } else {
+        ndv = Math.max(aggregateData.getNumDVs(), newData.getNumDVs());
+      }
       LOG.debug("Use bitvector to merge column " + aggregateColStats.getColName() + "'s ndvs of "
           + aggregateData.getNumDVs() + " and " + newData.getNumDVs() + " to be " + ndv);
       aggregateData.setNumDVs(ndv);
-      aggregateData.setBitVectors(ndvEstimator.serialize().toString());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/LongColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/LongColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/LongColumnStatsMerger.java
index ac65590..361af35 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/LongColumnStatsMerger.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/LongColumnStatsMerger.java
@@ -19,7 +19,8 @@
 
 package org.apache.hadoop.hive.metastore.hbase.stats.merge;
 
-import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
 
@@ -31,18 +32,25 @@ public class LongColumnStatsMerger extends ColumnStatsMerger {
     aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
     aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue()));
     aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-    if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
+    if (!aggregateData.isSetBitVectors() || aggregateData.getBitVectors().length() == 0
+        || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
       aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
     } else {
-      ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(aggregateData.getBitVectors(),
-          ndvEstimator.getnumBitVectors()));
-      ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
-          ndvEstimator.getnumBitVectors()));
-      long ndv = ndvEstimator.estimateNumDistinctValues();
+      NumDistinctValueEstimator oldEst = NumDistinctValueEstimatorFactory
+          .getNumDistinctValueEstimator(aggregateData.getBitVectors());
+      NumDistinctValueEstimator newEst = NumDistinctValueEstimatorFactory
+          .getNumDistinctValueEstimator(newData.getBitVectors());
+      long ndv = -1;
+      if (oldEst.canMerge(newEst)) {
+        oldEst.mergeEstimators(newEst);
+        ndv = oldEst.estimateNumDistinctValues();
+        aggregateData.setBitVectors(oldEst.serialize());
+      } else {
+        ndv = Math.max(aggregateData.getNumDVs(), newData.getNumDVs());
+      }
       LOG.debug("Use bitvector to merge column " + aggregateColStats.getColName() + "'s ndvs of "
           + aggregateData.getNumDVs() + " and " + newData.getNumDVs() + " to be " + ndv);
       aggregateData.setNumDVs(ndv);
-      aggregateData.setBitVectors(ndvEstimator.serialize().toString());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/StringColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/StringColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/StringColumnStatsMerger.java
index 4158747..8e28f90 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/StringColumnStatsMerger.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/StringColumnStatsMerger.java
@@ -19,10 +19,10 @@
 
 package org.apache.hadoop.hive.metastore.hbase.stats.merge;
 
-import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
-import org.apache.parquet.Log;
 
 public class StringColumnStatsMerger extends ColumnStatsMerger {
   @Override
@@ -32,18 +32,25 @@ public class StringColumnStatsMerger extends ColumnStatsMerger {
     aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
     aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
     aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-    if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
+    if (!aggregateData.isSetBitVectors() || aggregateData.getBitVectors().length() == 0
+        || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
       aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
     } else {
-      ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(aggregateData.getBitVectors(),
-          ndvEstimator.getnumBitVectors()));
-      ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
-          ndvEstimator.getnumBitVectors()));
-      long ndv = ndvEstimator.estimateNumDistinctValues();
+      NumDistinctValueEstimator oldEst = NumDistinctValueEstimatorFactory
+          .getNumDistinctValueEstimator(aggregateData.getBitVectors());
+      NumDistinctValueEstimator newEst = NumDistinctValueEstimatorFactory
+          .getNumDistinctValueEstimator(newData.getBitVectors());
+      long ndv = -1;
+      if (oldEst.canMerge(newEst)) {
+        oldEst.mergeEstimators(newEst);
+        ndv = oldEst.estimateNumDistinctValues();
+        aggregateData.setBitVectors(oldEst.serialize());
+      } else {
+        ndv = Math.max(aggregateData.getNumDVs(), newData.getNumDVs());
+      }
       LOG.debug("Use bitvector to merge column " + aggregateColStats.getColName() + "'s ndvs of "
           + aggregateData.getNumDVs() + " and " + newData.getNumDVs() + " to be " + ndv);
       aggregateData.setNumDVs(ndv);
-      aggregateData.setBitVectors(ndvEstimator.serialize().toString());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsNDVUniformDist.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsNDVUniformDist.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsNDVUniformDist.java
index 87b1ac8..74e1669 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsNDVUniformDist.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsNDVUniformDist.java
@@ -60,7 +60,7 @@ public class TestHBaseAggregateStatsNDVUniformDist {
   private HBaseStore store;
   SortedMap<String, Cell> rows = new TreeMap<>();
 
-  // NDV will be 3 for bitVectors[0] and 12 for bitVectors[1] 
+  // NDV will be 3 for bitVectors[0] and 1 for bitVectors[1]
   String bitVectors[] = {
       "{0, 4, 5, 7}{0, 1}{0, 1, 2}{0, 1, 4}{0}{0, 2}{0, 3}{0, 2, 3, 4}{0, 1, 4}{0, 1}{0}{0, 1, 3, 8}{0, 2}{0, 2}{0, 9}{0, 1, 4}",
       "{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}" };
@@ -278,7 +278,7 @@ public class TestHBaseAggregateStatsNDVUniformDist {
         Assert.assertEquals(1010, lcsd.getHighValue(), 0.01);
         Assert.assertEquals(-1010, lcsd.getLowValue(), 0.01);
         Assert.assertEquals(45, lcsd.getNumNulls());
-        Assert.assertEquals(12, lcsd.getNumDVs());
+        Assert.assertEquals(3, lcsd.getNumDVs());
       }
     };
     List<String> partNames = new ArrayList<>();
@@ -422,7 +422,7 @@ public class TestHBaseAggregateStatsNDVUniformDist {
         Assert.assertEquals(1010, lcsd.getHighValue(), 0.01);
         Assert.assertEquals(-1010, lcsd.getLowValue(), 0.01);
         Assert.assertEquals(40, lcsd.getNumNulls());
-        Assert.assertEquals(12, lcsd.getNumDVs());
+        Assert.assertEquals(3, lcsd.getNumDVs());
       }
     };
     List<String> partNames = new ArrayList<>();
@@ -494,7 +494,7 @@ public class TestHBaseAggregateStatsNDVUniformDist {
         Assert.assertEquals(1010, HBaseUtils.getDoubleValue(lcsd.getHighValue()), 0.01);
         Assert.assertEquals(-1010, HBaseUtils.getDoubleValue(lcsd.getLowValue()), 0.01);
         Assert.assertEquals(40, lcsd.getNumNulls());
-        Assert.assertEquals(12, lcsd.getNumDVs());
+        Assert.assertEquals(3, lcsd.getNumDVs());
       }
     };
     List<String> partNames = new ArrayList<>();
@@ -566,7 +566,7 @@ public class TestHBaseAggregateStatsNDVUniformDist {
         Assert.assertEquals(1010, lcsd.getHighValue(), 0.01);
         Assert.assertEquals(-1010, lcsd.getLowValue(), 0.01);
         Assert.assertEquals(40, lcsd.getNumNulls());
-        Assert.assertEquals(12, lcsd.getNumDVs());
+        Assert.assertEquals(3, lcsd.getNumDVs());
       }
     };
     List<String> partNames = new ArrayList<>();
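
The expectation changes in this test track the corrected comment on bitVectors above: the second sketch encodes about one distinct value rather than twelve, so the aggregated estimate for these partitions comes out at 3 (dominated by bitVectors[0]) instead of 12.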

http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index 5732965..e17fe50 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -364,6 +364,11 @@
       <version>${datanucleus-core.version}</version>
     </dependency>
     <dependency>
+      <groupId>javolution</groupId>
+      <artifactId>javolution</artifactId>
+      <version>${javolution.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.calcite</groupId>
       <artifactId>calcite-core</artifactId>
       <version>${calcite.version}</version>
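
The new javolution dependency in ql is presumably pulled in by the relocated NDV classes: the FM sketch keeps its bit vectors in javolution's FastBitSet, so a module that compiles against the estimator implementations needs the library on its classpath.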

http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
index 0a5cf00..1923a9b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.conf.HiveVariableSource;
 import org.apache.hadoop.hive.conf.VariableSubstitution;
@@ -37,8 +38,6 @@ import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.session.OperationLog;
-import org.apache.hadoop.hive.ql.session.OperationLog.LoggingLevel;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -246,7 +245,7 @@ public class ColumnStatsSemanticAnalyzer extends SemanticAnalyzer {
     return colName.replaceAll("`", "``");
   }
 
-  private String genRewrittenQuery(List<String> colNames, int numBitVectors, Map<String,String> partSpec,
+  private String genRewrittenQuery(List<String> colNames, HiveConf conf, Map<String,String> partSpec,
     boolean isPartitionStats) throws SemanticException{
     StringBuilder rewrittenQueryBuilder = new StringBuilder("select ");
     String rewrittenQuery;
@@ -255,11 +254,20 @@ public class ColumnStatsSemanticAnalyzer extends SemanticAnalyzer {
       if (i > 0) {
         rewrittenQueryBuilder.append(" , ");
       }
+      String func = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_STATS_NDV_ALGO).toLowerCase();
       rewrittenQueryBuilder.append("compute_stats(`");
       rewrittenQueryBuilder.append(escapeBackTicks(colNames.get(i)));
-      rewrittenQueryBuilder.append("` , ");
-      rewrittenQueryBuilder.append(numBitVectors);
-      rewrittenQueryBuilder.append(" )");
+      rewrittenQueryBuilder.append("`, '" + func + "'");
+      if (func.equals("fm")) {
+        int numBitVectors = 0;
+        try {
+          numBitVectors = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf);
+        } catch (Exception e) {
+          throw new SemanticException(e.getMessage());
+        }
+        rewrittenQueryBuilder.append(", " + numBitVectors);
+      }
+      rewrittenQueryBuilder.append(")");
     }
 
     if (isPartitionStats) {
@@ -377,13 +385,7 @@ public class ColumnStatsSemanticAnalyzer extends SemanticAnalyzer {
         isTableLevel = true;
       }
       colType = getColumnTypes(colNames);
-      int numBitVectors;
-      try {
-        numBitVectors = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf);
-      } catch (Exception e) {
-        throw new SemanticException(e.getMessage());
-      }
-      rewrittenQuery = genRewrittenQuery(colNames, numBitVectors, partSpec, isPartitionStats);
+      rewrittenQuery = genRewrittenQuery(colNames, conf, partSpec, isPartitionStats);
       rewrittenTree = genRewrittenTree(rewrittenQuery);
     } else {
       // Not an analyze table column compute statistics statement - don't do any rewrites
@@ -447,13 +449,7 @@ public class ColumnStatsSemanticAnalyzer extends SemanticAnalyzer {
       isTableLevel = true;
     }
     colType = getColumnTypes(colNames);
-    int numBitVectors = 0;
-    try {
-      numBitVectors = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf);
-    } catch (Exception e) {
-      throw new SemanticException(e.getMessage());
-    }
-    rewrittenQuery = genRewrittenQuery(colNames, numBitVectors, partSpec, isPartitionStats);
+    rewrittenQuery = genRewrittenQuery(colNames, conf, partSpec, isPartitionStats);
     rewrittenTree = genRewrittenTree(rewrittenQuery);
 
     context.analyzeRewrite = new AnalyzeRewriteContext();
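
After this change the configured algorithm is baked into the rewritten query itself, and only the FM path still needs a bit-vector count. For a single hypothetical column c1, the rewritten query would now contain call expressions along these lines (512 corresponds to an error target of at most 4.8% in the mapping removed from StatsUtils below):

    compute_stats(`c1`, 'hll')        -- hive.stats.ndv.algo=hll
    compute_stats(`c1`, 'fm', 512)    -- hive.stats.ndv.algo=fm, hive.stats.ndv.error=4.8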

http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
index 76f7dae..3b9ab41 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
@@ -1640,60 +1640,6 @@ public class StatsUtils {
     }
   }
 
-  public static int getNumBitVectorsForNDVEstimation(HiveConf conf) throws SemanticException {
-    int numBitVectors;
-    float percentageError = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_STATS_NDV_ERROR);
-
-    if (percentageError < 0.0) {
-      throw new SemanticException("hive.stats.ndv.error can't be negative");
-    } else if (percentageError <= 2.4) {
-      numBitVectors = 1024;
-      LOG.info("Lowest error achievable is 2.4% but error requested is " + percentageError + "%");
-      LOG.info("Choosing 1024 bit vectors..");
-    } else if (percentageError <= 3.4 ) {
-      numBitVectors = 1024;
-      LOG.info("Error requested is " + percentageError + "%");
-      LOG.info("Choosing 1024 bit vectors..");
-    } else if (percentageError <= 4.8) {
-      numBitVectors = 512;
-      LOG.info("Error requested is " + percentageError + "%");
-      LOG.info("Choosing 512 bit vectors..");
-     } else if (percentageError <= 6.8) {
-      numBitVectors = 256;
-      LOG.info("Error requested is " + percentageError + "%");
-      LOG.info("Choosing 256 bit vectors..");
-    } else if (percentageError <= 9.7) {
-      numBitVectors = 128;
-      LOG.info("Error requested is " + percentageError + "%");
-      LOG.info("Choosing 128 bit vectors..");
-    } else if (percentageError <= 13.8) {
-      numBitVectors = 64;
-      LOG.info("Error requested is " + percentageError + "%");
-      LOG.info("Choosing 64 bit vectors..");
-    } else if (percentageError <= 19.6) {
-      numBitVectors = 32;
-      LOG.info("Error requested is " + percentageError + "%");
-      LOG.info("Choosing 32 bit vectors..");
-    } else if (percentageError <= 28.2) {
-      numBitVectors = 16;
-      LOG.info("Error requested is " + percentageError + "%");
-      LOG.info("Choosing 16 bit vectors..");
-    } else if (percentageError <= 40.9) {
-      numBitVectors = 8;
-      LOG.info("Error requested is " + percentageError + "%");
-      LOG.info("Choosing 8 bit vectors..");
-    } else if (percentageError <= 61.0) {
-      numBitVectors = 4;
-      LOG.info("Error requested is " + percentageError + "%");
-      LOG.info("Choosing 4 bit vectors..");
-    } else {
-      numBitVectors = 2;
-      LOG.info("Error requested is " + percentageError + "%");
-      LOG.info("Choosing 2 bit vectors..");
-    }
-    return numBitVectors;
-  }
-
   public static boolean hasDiscreteRange(ColStatistics colStat) {
     if (colStat.getRange() != null) {
       TypeInfo colType = TypeInfoUtils.getTypeInfoFromTypeString(colStat.getColumnType());
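
The mapping deleted above is removed from ql's StatsUtils rather than from the project: the ColumnStatsSemanticAnalyzer hunk earlier in this patch now calls HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf), so the error-to-vector ladder (<= 2.4% -> 1024 vectors, down through <= 61.0% -> 4, else 2) presumably lives on in hive common where both ql and the metastore can reach it.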

http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/DecimalNumDistinctValueEstimator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/DecimalNumDistinctValueEstimator.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/DecimalNumDistinctValueEstimator.java
deleted file mode 100644
index a05906e..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/DecimalNumDistinctValueEstimator.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.udf.generic;
-
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-
-public class DecimalNumDistinctValueEstimator extends NumDistinctValueEstimator {
-
-  public DecimalNumDistinctValueEstimator(int numBitVectors) {
-    super(numBitVectors);
-  }
-
-  public DecimalNumDistinctValueEstimator(String s, int numBitVectors) {
-    super(s, numBitVectors);
-  }
-
-  public void addToEstimator(HiveDecimal decimal) {
-    int v = decimal.hashCode();
-    super.addToEstimator(v);
-  }
-
-  public void addToEstimatorPCSA(HiveDecimal decimal) {
-    int v = decimal.hashCode();
-    super.addToEstimatorPCSA(v);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/b883d313/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/DoubleNumDistinctValueEstimator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/DoubleNumDistinctValueEstimator.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/DoubleNumDistinctValueEstimator.java
deleted file mode 100644
index e76fc74..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/DoubleNumDistinctValueEstimator.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.udf.generic;
-
-public class DoubleNumDistinctValueEstimator extends NumDistinctValueEstimator {
-
-  public DoubleNumDistinctValueEstimator(int numBitVectors) {
-    super(numBitVectors);
-  }
-
-  public DoubleNumDistinctValueEstimator(String s, int numVectors) {
-    super(s, numVectors);
-  }
-
-   public void addToEstimator(double d) {
-     int v = new Double(d).hashCode();
-     super.addToEstimator(v);
-  }
-
-  public void addToEstimatorPCSA(double d) {
-    int v = new Double(d).hashCode();
-    super.addToEstimatorPCSA(v);
-  }
-}
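
DoubleNumDistinctValueEstimator and the DecimalNumDistinctValueEstimator deleted just above it were thin adapters that hashed a typed value to an int and delegated to the old base class. With the abstraction relocated to org.apache.hadoop.hive.common.ndv and instances obtained through the factory, there is no subclass constructor left to extend. A hedged sketch of the replacement usage, relying only on the factory method visible in this patch and assuming the relocated interface accepts typed values directly (serialized is a hypothetical placeholder for a persisted bit-vector string):

    import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
    import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
    import org.apache.hadoop.hive.common.type.HiveDecimal;

    // Rehydrate a persisted sketch, feed it typed values, and re-estimate;
    // hashing is assumed to happen inside the estimator implementation.
    NumDistinctValueEstimator est =
        NumDistinctValueEstimatorFactory.getNumDistinctValueEstimator(serialized);
    est.addToEstimator(HiveDecimal.create("3.14"));
    est.addToEstimator(2.718d);
    long ndv = est.estimateNumDistinctValues();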