Posted to commits@hive.apache.org by px...@apache.org on 2017/07/25 22:42:13 UTC

[10/11] hive git commit: HIVE-16997: Extend object store to store and use bit vectors (Pengcheng Xiong, reviewed by Ashutosh Chauhan)
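
This patch extends the column-stats aggregators so that, when the per-partition
NDV bit vectors cannot all be merged, the aggregate NDV is interpolated between
two bounds: the maximum per-partition NDV (partitions fully overlap) and the
sum of the per-partition NDVs (partitions fully disjoint), controlled by the
new ndvTuner knob. A minimal standalone sketch of that interpolation (the class
name is illustrative, not part of the patch):

// Sketch of the ndvTuner interpolation applied in the aggregators below:
// ndvTuner = 0 keeps the old behavior (max of the per-partition NDVs),
// ndvTuner = 1 assumes fully disjoint partitions (sum of the NDVs).
public class NdvTunerSketch {
  static long tunedNdv(long[] partitionNdvs, double ndvTuner) {
    long lowerBound = 0;   // max of the per-partition NDVs
    long higherBound = 0;  // sum of the per-partition NDVs
    for (long ndv : partitionNdvs) {
      lowerBound = Math.max(lowerBound, ndv);
      higherBound += ndv;
    }
    return (long) (lowerBound + (higherBound - lowerBound) * ndvTuner);
  }

  public static void main(String[] args) {
    long[] ndvs = {100, 120, 90};
    System.out.println(tunedNdv(ndvs, 0.0)); // 120: old max-based behavior
    System.out.println(tunedNdv(ndvs, 1.0)); // 310: sum, disjoint partitions
    System.out.println(tunedNdv(ndvs, 0.5)); // 215: midpoint
  }
}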

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java
new file mode 100644
index 0000000..6fae3e5
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Date;
+import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DateColumnStatsAggregator extends ColumnStatsAggregator implements
+    IExtrapolatePartStatus {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DateColumnStatsAggregator.class);
+
+  @Override
+  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+      List<ColumnStatistics> css) throws MetaException {
+    ColumnStatisticsObj statsObj = null;
+
+    // check if all the ColumnStatisticsObjs contain stats and all the NDV
+    // estimates are bit vectors
+    boolean doAllPartitionContainStats = partNames.size() == css.size();
+    LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats);
+    NumDistinctValueEstimator ndvEstimator = null;
+    String colType = null;
+    for (ColumnStatistics cs : css) {
+      if (cs.getStatsObjSize() != 1) {
+        throw new MetaException(
+            "The number of columns should be exactly one in aggrStats, but found "
+                + cs.getStatsObjSize());
+      }
+      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+      if (statsObj == null) {
+        colType = cso.getColType();
+        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
+            .getStatsData().getSetField());
+      }
+      if (!cso.getStatsData().getDateStats().isSetBitVectors()
+          || cso.getStatsData().getDateStats().getBitVectors().length() == 0) {
+        ndvEstimator = null;
+        break;
+      } else {
+        // check if all of the bit vectors can merge
+        NumDistinctValueEstimator estimator = NumDistinctValueEstimatorFactory
+            .getNumDistinctValueEstimator(cso.getStatsData().getDateStats().getBitVectors());
+        if (ndvEstimator == null) {
+          ndvEstimator = estimator;
+        } else {
+          if (ndvEstimator.canMerge(estimator)) {
+            continue;
+          } else {
+            ndvEstimator = null;
+            break;
+          }
+        }
+      }
+    }
+    if (ndvEstimator != null) {
+      ndvEstimator = NumDistinctValueEstimatorFactory
+          .getEmptyNumDistinctValueEstimator(ndvEstimator);
+    }
+    LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null));
+    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
+    if (doAllPartitionContainStats || css.size() < 2) {
+      DateColumnStatsData aggregateData = null;
+      long lowerBound = 0;
+      long higherBound = 0;
+      double densityAvgSum = 0.0;
+      for (ColumnStatistics cs : css) {
+        ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+        DateColumnStatsData newData = cso.getStatsData().getDateStats();
+        lowerBound = Math.max(lowerBound, newData.getNumDVs());
+        higherBound += newData.getNumDVs();
+        densityAvgSum += (diff(newData.getHighValue(), newData.getLowValue()))
+            / newData.getNumDVs();
+        if (ndvEstimator != null) {
+          ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory
+              .getNumDistinctValueEstimator(newData.getBitVectors()));
+        }
+        if (aggregateData == null) {
+          aggregateData = newData.deepCopy();
+        } else {
+          aggregateData.setLowValue(min(aggregateData.getLowValue(), newData.getLowValue()));
+          aggregateData
+              .setHighValue(max(aggregateData.getHighValue(), newData.getHighValue()));
+          aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+          aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+        }
+      }
+      if (ndvEstimator != null) {
+        // if all the ColumnStatisticsObjs contain bit vectors, we do not need
+        // the uniform distribution assumption because we can merge the bit
+        // vectors to get a good estimation.
+        aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+      } else {
+        long estimation;
+        if (useDensityFunctionForNDVEstimation) {
+          // We have estimation, lowerbound and higherbound. We use estimation
+          // if it is between lowerbound and higherbound.
+          double densityAvg = densityAvgSum / partNames.size();
+          estimation = (long) (diff(aggregateData.getHighValue(), aggregateData.getLowValue()) / densityAvg);
+          if (estimation < lowerBound) {
+            estimation = lowerBound;
+          } else if (estimation > higherBound) {
+            estimation = higherBound;
+          }
+        } else {
+          estimation = (long) (lowerBound + (higherBound - lowerBound) * ndvTuner);
+        }
+        aggregateData.setNumDVs(estimation);
+      }
+      columnStatisticsData.setDateStats(aggregateData);
+    } else {
+      // we need extrapolation
+      LOG.debug("start extrapolation for " + colName);
+
+      Map<String, Integer> indexMap = new HashMap<String, Integer>();
+      for (int index = 0; index < partNames.size(); index++) {
+        indexMap.put(partNames.get(index), index);
+      }
+      Map<String, Double> adjustedIndexMap = new HashMap<String, Double>();
+      Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<String, ColumnStatisticsData>();
+      // while we scan the css, we also compute densityAvg, lowerBound and
+      // higherBound when useDensityFunctionForNDVEstimation is true.
+      double densityAvgSum = 0.0;
+      if (ndvEstimator == null) {
+        // if not every partition uses a bit vector for NDV, we just fall back
+        // to the traditional extrapolation methods.
+        for (ColumnStatistics cs : css) {
+          String partName = cs.getStatsDesc().getPartName();
+          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+          DateColumnStatsData newData = cso.getStatsData().getDateStats();
+          if (useDensityFunctionForNDVEstimation) {
+            densityAvgSum += diff(newData.getHighValue(), newData.getLowValue()) / newData.getNumDVs();
+          }
+          adjustedIndexMap.put(partName, (double) indexMap.get(partName));
+          adjustedStatsMap.put(partName, cso.getStatsData());
+        }
+      } else {
+        // we first merge all adjacent bit vectors that can be merged, and
+        // derive new pseudo-partition names and indexes.
+        StringBuilder pseudoPartName = new StringBuilder();
+        double pseudoIndexSum = 0;
+        int length = 0;
+        int curIndex = -1;
+        DateColumnStatsData aggregateData = null;
+        for (ColumnStatistics cs : css) {
+          String partName = cs.getStatsDesc().getPartName();
+          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+          DateColumnStatsData newData = cso.getStatsData().getDateStats();
+          // newData.isSetBitVectors() must be true here because we already
+          // checked it above.
+          if (indexMap.get(partName) != curIndex) {
+            // There is a bit vector, but it is not adjacent to the previous ones.
+            if (length > 0) {
+              // we have to set ndv
+              adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
+              aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+              ColumnStatisticsData csd = new ColumnStatisticsData();
+              csd.setDateStats(aggregateData);
+              adjustedStatsMap.put(pseudoPartName.toString(), csd);
+              if (useDensityFunctionForNDVEstimation) {
+                densityAvgSum += diff(aggregateData.getHighValue(), aggregateData.getLowValue())
+                    / aggregateData.getNumDVs();
+              }
+              // reset everything
+              pseudoPartName = new StringBuilder();
+              pseudoIndexSum = 0;
+              length = 0;
+              ndvEstimator = NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator);
+            }
+            aggregateData = null;
+          }
+          curIndex = indexMap.get(partName);
+          pseudoPartName.append(partName);
+          pseudoIndexSum += curIndex;
+          length++;
+          curIndex++;
+          if (aggregateData == null) {
+            aggregateData = newData.deepCopy();
+          } else {
+            aggregateData.setLowValue(min(aggregateData.getLowValue(), newData.getLowValue()));
+            aggregateData.setHighValue(max(aggregateData.getHighValue(), newData.getHighValue()));
+            aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+          }
+          ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory
+              .getNumDistinctValueEstimator(newData.getBitVectors()));
+        }
+        if (length > 0) {
+          // we have to set ndv
+          adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
+          aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+          ColumnStatisticsData csd = new ColumnStatisticsData();
+          csd.setDateStats(aggregateData);
+          adjustedStatsMap.put(pseudoPartName.toString(), csd);
+          if (useDensityFunctionForNDVEstimation) {
+            densityAvgSum += diff(aggregateData.getHighValue(), aggregateData.getLowValue())
+                / aggregateData.getNumDVs();
+          }
+        }
+      }
+      extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap,
+          adjustedStatsMap, densityAvgSum / adjustedStatsMap.size());
+    }
+    statsObj.setStatsData(columnStatisticsData);
+    LOG.debug("Ndv estimatation for " + colName + " is "
+        + columnStatisticsData.getDateStats().getNumDVs());
+    return statsObj;
+  }
+
+  private long diff(Date d1, Date d2) {
+    return d1.getDaysSinceEpoch() - d2.getDaysSinceEpoch();
+  }
+
+  private Date min(Date d1, Date d2) {
+    return d1.compareTo(d2) < 0 ? d1 : d2;
+  }
+
+  private Date max(Date d1, Date d2) {
+    return d1.compareTo(d2) < 0 ? d2 : d1;
+  }
+
+  @Override
+  public void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
+      int numPartsWithStats, Map<String, Double> adjustedIndexMap,
+      Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) {
+    int rightBorderInd = numParts;
+    DateColumnStatsData extrapolateDateData = new DateColumnStatsData();
+    Map<String, DateColumnStatsData> extractedAdjustedStatsMap = new HashMap<>();
+    for (Map.Entry<String, ColumnStatisticsData> entry : adjustedStatsMap.entrySet()) {
+      extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getDateStats());
+    }
+    List<Map.Entry<String, DateColumnStatsData>> list = new LinkedList<Map.Entry<String, DateColumnStatsData>>(
+        extractedAdjustedStatsMap.entrySet());
+    // get the lowValue
+    Collections.sort(list, new Comparator<Map.Entry<String, DateColumnStatsData>>() {
+      public int compare(Map.Entry<String, DateColumnStatsData> o1,
+          Map.Entry<String, DateColumnStatsData> o2) {
+        return diff(o1.getValue().getLowValue(), o2.getValue().getLowValue()) < 0 ? -1 : 1;
+      }
+    });
+    double minInd = adjustedIndexMap.get(list.get(0).getKey());
+    double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+    long lowValue = 0;
+    long min = list.get(0).getValue().getLowValue().getDaysSinceEpoch();
+    long max = list.get(list.size() - 1).getValue().getLowValue().getDaysSinceEpoch();
+    if (minInd == maxInd) {
+      lowValue = min;
+    } else if (minInd < maxInd) {
+      // left border is the min
+      lowValue = (long) (max - (max - min) * maxInd / (maxInd - minInd));
+    } else {
+      // right border is the min
+      lowValue = (long) (max - (max - min) * (rightBorderInd - maxInd) / (minInd - maxInd));
+    }
+
+    // get the highValue
+    Collections.sort(list, new Comparator<Map.Entry<String, DateColumnStatsData>>() {
+      public int compare(Map.Entry<String, DateColumnStatsData> o1,
+          Map.Entry<String, DateColumnStatsData> o2) {
+        return diff(o1.getValue().getHighValue(), o2.getValue().getHighValue()) < 0 ? -1 : 1;
+      }
+    });
+    minInd = adjustedIndexMap.get(list.get(0).getKey());
+    maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+    long highValue = 0;
+    min = list.get(0).getValue().getHighValue().getDaysSinceEpoch();
+    max = list.get(list.size() - 1).getValue().getHighValue().getDaysSinceEpoch();
+    if (minInd == maxInd) {
+      highValue = min;
+    } else if (minInd < maxInd) {
+      // right border is the max
+      highValue = (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
+    } else {
+      // left border is the max
+      highValue = (long) (min + (max - min) * minInd / (minInd - maxInd));
+    }
+
+    // get the #nulls
+    long numNulls = 0;
+    for (Map.Entry<String, DateColumnStatsData> entry : extractedAdjustedStatsMap.entrySet()) {
+      numNulls += entry.getValue().getNumNulls();
+    }
+    // we scale up numNulls based on the number of partitions
+    numNulls = numNulls * numParts / numPartsWithStats;
+
+    // get the ndv
+    long ndv = 0;
+    Collections.sort(list, new Comparator<Map.Entry<String, DateColumnStatsData>>() {
+      public int compare(Map.Entry<String, DateColumnStatsData> o1,
+          Map.Entry<String, DateColumnStatsData> o2) {
+        return o1.getValue().getNumDVs() < o2.getValue().getNumDVs() ? -1 : 1;
+      }
+    });
+    long lowerBound = list.get(list.size() - 1).getValue().getNumDVs();
+    long higherBound = 0;
+    for (Map.Entry<String, DateColumnStatsData> entry : list) {
+      higherBound += entry.getValue().getNumDVs();
+    }
+    if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) {
+      ndv = (long) ((highValue - lowValue) / densityAvg);
+      if (ndv < lowerBound) {
+        ndv = lowerBound;
+      } else if (ndv > higherBound) {
+        ndv = higherBound;
+      }
+    } else {
+      minInd = adjustedIndexMap.get(list.get(0).getKey());
+      maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+      min = list.get(0).getValue().getNumDVs();
+      max = list.get(list.size() - 1).getValue().getNumDVs();
+      if (minInd == maxInd) {
+        ndv = min;
+      } else if (minInd < maxInd) {
+        // right border is the max
+        ndv = (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
+      } else {
+        // left border is the max
+        ndv = (long) (min + (max - min) * minInd / (minInd - maxInd));
+      }
+    }
+    extrapolateDateData.setLowValue(new Date(lowValue));
+    extrapolateDateData.setHighValue(new Date(highValue));
+    extrapolateDateData.setNumNulls(numNulls);
+    extrapolateDateData.setNumDVs(ndv);
+    extrapolateData.setDateStats(extrapolateDateData);
+  }
+}
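
The extrapolate() method above projects each statistic to the table borders by
assuming the statistic varies linearly with the (adjusted) partition index. A
standalone sketch of the projection for the low value, using the same formulas
as the patch (class and method names are illustrative):

// Sketch of the linear projection extrapolate() uses for the low value:
// given the extreme values observed at adjusted indexes minInd and maxInd,
// project the value to the relevant table border.
public class BorderProjectionSketch {
  static long projectLowValue(long min, long max, double minInd,
      double maxInd, int rightBorderInd) {
    if (minInd == maxInd) {
      return min;                      // only one distinct index: no slope
    } else if (minInd < maxInd) {
      // the minimum sits left of the maximum: project back to index 0
      return (long) (max - (max - min) * maxInd / (maxInd - minInd));
    } else {
      // the minimum sits right of the maximum: project to the right border
      return (long) (max - (max - min) * (rightBorderInd - maxInd)
          / (minInd - maxInd));
    }
  }

  public static void main(String[] args) {
    // low values 20 and 80 observed at indexes 2 and 8 over 10 partitions
    // project to 0 at the left border (index 0)
    System.out.println(projectLowValue(20, 80, 2, 8, 10)); // 0
  }
}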

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java
new file mode 100644
index 0000000..2ea2fcc
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StringColumnStatsAggregator extends ColumnStatsAggregator implements
+    IExtrapolatePartStatus {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StringColumnStatsAggregator.class);
+
+  @Override
+  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+      List<ColumnStatistics> css) throws MetaException {
+    ColumnStatisticsObj statsObj = null;
+
+    // check if all the ColumnStatisticsObjs contain stats and all the NDV
+    // estimates are bit vectors. Only when both conditions hold do we merge
+    // the bit vectors; otherwise we just use the maximum function.
+    boolean doAllPartitionContainStats = partNames.size() == css.size();
+    LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats);
+    NumDistinctValueEstimator ndvEstimator = null;
+    String colType = null;
+    for (ColumnStatistics cs : css) {
+      if (cs.getStatsObjSize() != 1) {
+        throw new MetaException(
+            "The number of columns should be exactly one in aggrStats, but found "
+                + cs.getStatsObjSize());
+      }
+      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+      if (statsObj == null) {
+        colType = cso.getColType();
+        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
+            .getStatsData().getSetField());
+      }
+      if (!cso.getStatsData().getStringStats().isSetBitVectors()
+          || cso.getStatsData().getStringStats().getBitVectors().length() == 0) {
+        ndvEstimator = null;
+        break;
+      } else {
+        // check if all of the bit vectors can merge
+        NumDistinctValueEstimator estimator = NumDistinctValueEstimatorFactory
+            .getNumDistinctValueEstimator(cso.getStatsData().getStringStats().getBitVectors());
+        if (ndvEstimator == null) {
+          ndvEstimator = estimator;
+        } else {
+          if (ndvEstimator.canMerge(estimator)) {
+            continue;
+          } else {
+            ndvEstimator = null;
+            break;
+          }
+        }
+      }
+    }
+    if (ndvEstimator != null) {
+      ndvEstimator = NumDistinctValueEstimatorFactory
+          .getEmptyNumDistinctValueEstimator(ndvEstimator);
+    }
+    LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null));
+    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
+    if (doAllPartitionContainStats || css.size() < 2) {
+      StringColumnStatsData aggregateData = null;
+      for (ColumnStatistics cs : css) {
+        ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+        StringColumnStatsData newData = cso.getStatsData().getStringStats();
+        if (ndvEstimator != null) {
+          ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory
+              .getNumDistinctValueEstimator(newData.getBitVectors()));
+        }
+        if (aggregateData == null) {
+          aggregateData = newData.deepCopy();
+        } else {
+          aggregateData
+              .setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
+          aggregateData
+              .setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
+          aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+          aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+        }
+      }
+      if (ndvEstimator != null) {
+        // if all the ColumnStatisticsObjs contain bit vectors, we do not need
+        // the uniform distribution assumption because we can merge the bit
+        // vectors to get a good estimation.
+        aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+      } else {
+        // aggregateData already holds the max NDV across all partitions
+      }
+      columnStatisticsData.setStringStats(aggregateData);
+    } else {
+      // we need extrapolation
+      LOG.debug("start extrapolation for " + colName);
+
+      Map<String, Integer> indexMap = new HashMap<String, Integer>();
+      for (int index = 0; index < partNames.size(); index++) {
+        indexMap.put(partNames.get(index), index);
+      }
+      Map<String, Double> adjustedIndexMap = new HashMap<String, Double>();
+      Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<String, ColumnStatisticsData>();
+      if (ndvEstimator == null) {
+        // if not every partition uses a bit vector for NDV, we just fall back
+        // to the traditional extrapolation methods.
+        for (ColumnStatistics cs : css) {
+          String partName = cs.getStatsDesc().getPartName();
+          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+          adjustedIndexMap.put(partName, (double) indexMap.get(partName));
+          adjustedStatsMap.put(partName, cso.getStatsData());
+        }
+      } else {
+        // we first merge all adjacent bit vectors that can be merged, and
+        // derive new pseudo-partition names and indexes.
+        StringBuilder pseudoPartName = new StringBuilder();
+        double pseudoIndexSum = 0;
+        int length = 0;
+        int curIndex = -1;
+        StringColumnStatsData aggregateData = null;
+        for (ColumnStatistics cs : css) {
+          String partName = cs.getStatsDesc().getPartName();
+          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+          StringColumnStatsData newData = cso.getStatsData().getStringStats();
+          // newData.isSetBitVectors() must be true here because we already
+          // checked it above.
+          if (indexMap.get(partName) != curIndex) {
+            // There is a bit vector, but it is not adjacent to the previous ones.
+            if (length > 0) {
+              // we have to set ndv
+              adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
+              aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+              ColumnStatisticsData csd = new ColumnStatisticsData();
+              csd.setStringStats(aggregateData);
+              adjustedStatsMap.put(pseudoPartName.toString(), csd);
+              // reset everything
+              pseudoPartName = new StringBuilder();
+              pseudoIndexSum = 0;
+              length = 0;
+              ndvEstimator = NumDistinctValueEstimatorFactory
+                  .getEmptyNumDistinctValueEstimator(ndvEstimator);
+            }
+            aggregateData = null;
+          }
+          curIndex = indexMap.get(partName);
+          pseudoPartName.append(partName);
+          pseudoIndexSum += curIndex;
+          length++;
+          curIndex++;
+          if (aggregateData == null) {
+            aggregateData = newData.deepCopy();
+          } else {
+            aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(),
+                newData.getAvgColLen()));
+            aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(),
+                newData.getMaxColLen()));
+            aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+          }
+          ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory
+              .getNumDistinctValueEstimator(newData.getBitVectors()));
+        }
+        if (length > 0) {
+          // we have to set ndv
+          adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
+          aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+          ColumnStatisticsData csd = new ColumnStatisticsData();
+          csd.setStringStats(aggregateData);
+          adjustedStatsMap.put(pseudoPartName.toString(), csd);
+        }
+      }
+      extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap,
+          adjustedStatsMap, -1);
+    }
+    LOG.debug("Ndv estimatation for " + colName + " is "
+        + columnStatisticsData.getStringStats().getNumDVs());
+    statsObj.setStatsData(columnStatisticsData);
+    return statsObj;
+  }
+
+  @Override
+  public void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
+      int numPartsWithStats, Map<String, Double> adjustedIndexMap,
+      Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) {
+    int rightBorderInd = numParts;
+    StringColumnStatsData extrapolateStringData = new StringColumnStatsData();
+    Map<String, StringColumnStatsData> extractedAdjustedStatsMap = new HashMap<>();
+    for (Map.Entry<String, ColumnStatisticsData> entry : adjustedStatsMap.entrySet()) {
+      extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getStringStats());
+    }
+    List<Map.Entry<String, StringColumnStatsData>> list = new LinkedList<Map.Entry<String, StringColumnStatsData>>(
+        extractedAdjustedStatsMap.entrySet());
+    // get the avgLen
+    Collections.sort(list, new Comparator<Map.Entry<String, StringColumnStatsData>>() {
+      public int compare(Map.Entry<String, StringColumnStatsData> o1,
+          Map.Entry<String, StringColumnStatsData> o2) {
+        return o1.getValue().getAvgColLen() < o2.getValue().getAvgColLen() ? -1 : 1;
+      }
+    });
+    double minInd = adjustedIndexMap.get(list.get(0).getKey());
+    double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+    double avgColLen = 0;
+    double min = list.get(0).getValue().getAvgColLen();
+    double max = list.get(list.size() - 1).getValue().getAvgColLen();
+    if (minInd == maxInd) {
+      avgColLen = min;
+    } else if (minInd < maxInd) {
+      // right border is the max
+      avgColLen = (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
+    } else {
+      // left border is the max
+      avgColLen = (min + (max - min) * minInd / (minInd - maxInd));
+    }
+
+    // get the maxLen
+    Collections.sort(list, new Comparator<Map.Entry<String, StringColumnStatsData>>() {
+      public int compare(Map.Entry<String, StringColumnStatsData> o1,
+          Map.Entry<String, StringColumnStatsData> o2) {
+        return o1.getValue().getMaxColLen() < o2.getValue().getMaxColLen() ? -1 : 1;
+      }
+    });
+    minInd = adjustedIndexMap.get(list.get(0).getKey());
+    maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+    double maxColLen = 0;
+    min = list.get(0).getValue().getMaxColLen();
+    max = list.get(list.size() - 1).getValue().getMaxColLen();
+    if (minInd == maxInd) {
+      maxColLen = min;
+    } else if (minInd < maxInd) {
+      // right border is the max
+      maxColLen = (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
+    } else {
+      // left border is the max
+      maxColLen = (min + (max - min) * minInd / (minInd - maxInd));
+    }
+
+    // get the #nulls
+    long numNulls = 0;
+    for (Map.Entry<String, StringColumnStatsData> entry : extractedAdjustedStatsMap.entrySet()) {
+      numNulls += entry.getValue().getNumNulls();
+    }
+    // we scale up numNulls based on the number of partitions
+    numNulls = numNulls * numParts / numPartsWithStats;
+
+    // get the ndv
+    long ndv = 0;
+    Collections.sort(list, new Comparator<Map.Entry<String, StringColumnStatsData>>() {
+      public int compare(Map.Entry<String, StringColumnStatsData> o1,
+          Map.Entry<String, StringColumnStatsData> o2) {
+        return o1.getValue().getNumDVs() < o2.getValue().getNumDVs() ? -1 : 1;
+      }
+    });
+    minInd = adjustedIndexMap.get(list.get(0).getKey());
+    maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+    min = list.get(0).getValue().getNumDVs();
+    max = list.get(list.size() - 1).getValue().getNumDVs();
+    if (minInd == maxInd) {
+      ndv = (long) min;
+    } else if (minInd < maxInd) {
+      // right border is the max
+      ndv = (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
+    } else {
+      // left border is the max
+      ndv = (long) (min + (max - min) * minInd / (minInd - maxInd));
+    }
+    extrapolateStringData.setAvgColLen(avgColLen);
+    extrapolateStringData.setMaxColLen((long) maxColLen);
+    extrapolateStringData.setNumNulls(numNulls);
+    extrapolateStringData.setNumDVs(ndv);
+    extrapolateData.setStringStats(extrapolateStringData);
+  }
+
+}
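
Both new aggregators share the pseudo-partition trick above: runs of adjacent
partitions whose bit vectors merge are collapsed into a single pseudo-partition
whose adjusted index is the mean of its members' indexes. A simplified sketch
of just that grouping step (stat merging elided; the class name is
illustrative):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of the adjacent-partition grouping used above:
// consecutive partition indexes accumulate into one pseudo-partition;
// a gap in the indexes flushes the run with its averaged index.
public class AdjacentGroupingSketch {
  static Map<String, Double> groupAdjacent(List<String> partNames,
      Map<String, Integer> indexMap) {
    Map<String, Double> adjustedIndexMap = new HashMap<>();
    StringBuilder pseudoPartName = new StringBuilder();
    double pseudoIndexSum = 0;
    int length = 0;
    int curIndex = -1;
    for (String partName : partNames) {
      if (indexMap.get(partName) != curIndex && length > 0) {
        // the run is broken: flush the accumulated pseudo-partition
        adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
        pseudoPartName = new StringBuilder();
        pseudoIndexSum = 0;
        length = 0;
      }
      curIndex = indexMap.get(partName);
      pseudoPartName.append(partName);
      pseudoIndexSum += curIndex;
      length++;
      curIndex++; // the next partition is adjacent only at this index
    }
    if (length > 0) {
      adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
    }
    return adjustedIndexMap;
  }
}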

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
index 0e11989..78a962a 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
@@ -32,8 +32,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.hbase.stats.ColumnStatsAggregator;
-import org.apache.hadoop.hive.metastore.hbase.stats.ColumnStatsAggregatorFactory;
+import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregator;
+import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregatorFactory;
 
 import java.io.IOException;
 import java.security.MessageDigest;
@@ -84,7 +84,10 @@ class StatsCache {
         .build(new CacheLoader<StatsCacheKey, AggrStats>() {
           @Override
           public AggrStats load(StatsCacheKey key) throws Exception {
-            boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION);
+            boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(conf,
+                HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION);
+            double ndvTuner = HiveConf.getFloatVar(conf,
+                HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_TUNER);
             HBaseReadWrite hrw = HBaseReadWrite.getInstance();
             AggrStats aggrStats = hrw.getAggregatedStats(key.hashed);
             if (aggrStats == null) {
@@ -100,7 +103,7 @@ class StatsCache {
                 if (aggregator == null) {
                   aggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator(css.iterator()
                       .next().getStatsObj().iterator().next().getStatsData().getSetField(),
-                      useDensityFunctionForNDVEstimation);
+                      useDensityFunctionForNDVEstimation, ndvTuner);
                 }
                 ColumnStatisticsObj statsObj = aggregator
                     .aggregate(key.colName, key.partNames, css);
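
For context, a hedged sketch of how a caller wires the two knobs into an
aggregator, mirroring the cache loader above (conf, colName, partNames, and
css are assumed to be in scope, as they are in the loader):

// Read both knobs from HiveConf and hand them to the aggregator factory.
boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(conf,
    HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION);
double ndvTuner = HiveConf.getFloatVar(conf,
    HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_TUNER);
ColumnStatsAggregator aggregator = ColumnStatsAggregatorFactory
    .getColumnStatsAggregator(css.iterator().next().getStatsObj().iterator()
        .next().getStatsData().getSetField(),
        useDensityFunctionForNDVEstimation, ndvTuner);
ColumnStatisticsObj merged = aggregator.aggregate(colName, partNames, css);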

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java
index d81d612..e6c836b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hadoop.hive.metastore.hbase.stats;
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java
index e796df2..a34bc9f 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hadoop.hive.metastore.hbase.stats;
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
index 29a0539..a52e5e5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hadoop.hive.metastore.hbase.stats;
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
 
 import java.util.List;
 
@@ -27,7 +27,7 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 
 public abstract class ColumnStatsAggregator {
   public boolean useDensityFunctionForNDVEstimation;
-
+  public double ndvTuner;
   public abstract ColumnStatisticsObj aggregate(String colName, List<String> partNames,
       List<ColumnStatistics> css) throws MetaException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
index 568bf06..173e06f 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
@@ -17,13 +17,14 @@
  * under the License.
  */
 
-package org.apache.hadoop.hive.metastore.hbase.stats;
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
 
 import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
@@ -34,7 +35,8 @@ public class ColumnStatsAggregatorFactory {
   private ColumnStatsAggregatorFactory() {
   }
 
-  public static ColumnStatsAggregator getColumnStatsAggregator(_Fields type, boolean useDensityFunctionForNDVEstimation) {
+  public static ColumnStatsAggregator getColumnStatsAggregator(_Fields type,
+      boolean useDensityFunctionForNDVEstimation, double ndvTuner) {
     ColumnStatsAggregator agg;
     switch (type) {
     case BOOLEAN_STATS:
@@ -43,6 +45,9 @@ public class ColumnStatsAggregatorFactory {
     case LONG_STATS:
       agg = new LongColumnStatsAggregator();
       break;
+    case DATE_STATS:
+      agg = new DateColumnStatsAggregator();
+      break;
     case DOUBLE_STATS:
       agg = new DoubleColumnStatsAggregator();
       break;
@@ -59,6 +64,7 @@ public class ColumnStatsAggregatorFactory {
       throw new RuntimeException("Woh, bad.  Unknown stats type " + type.toString());
     }
     agg.useDensityFunctionForNDVEstimation = useDensityFunctionForNDVEstimation;
+    agg.ndvTuner = ndvTuner;
     return agg;
   }
 
@@ -76,6 +82,10 @@ public class ColumnStatsAggregatorFactory {
       csd.setLongStats(new LongColumnStatsData());
       break;
 
+    case DATE_STATS:
+      csd.setDateStats(new DateColumnStatsData());
+      break;
+
     case DOUBLE_STATS:
       csd.setDoubleStats(new DoubleColumnStatsData());
       break;

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
index 8eb64e0..5924c3e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hadoop.hive.metastore.hbase.stats;
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
 
 import java.util.Collections;
 import java.util.Comparator;
@@ -35,9 +35,13 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.hbase.HBaseUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implements
     IExtrapolatePartStatus {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DecimalColumnStatsAggregator.class);
 
   @Override
   public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
@@ -47,6 +51,7 @@ public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implemen
     // check if all the ColumnStatisticsObjs contain stats and all the ndv are
     // bitvectors
     boolean doAllPartitionContainStats = partNames.size() == css.size();
+    LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats);
     NumDistinctValueEstimator ndvEstimator = null;
     String colType = null;
     for (ColumnStatistics cs : css) {
@@ -85,6 +90,7 @@ public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implemen
       ndvEstimator = NumDistinctValueEstimatorFactory
           .getEmptyNumDistinctValueEstimator(ndvEstimator);
     }
+    LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null));
     ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
     if (doAllPartitionContainStats || css.size() < 2) {
       DecimalColumnStatsData aggregateData = null;
@@ -94,12 +100,10 @@ public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implemen
       for (ColumnStatistics cs : css) {
         ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
         DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats();
-        if (useDensityFunctionForNDVEstimation) {
-          lowerBound = Math.max(lowerBound, newData.getNumDVs());
-          higherBound += newData.getNumDVs();
-          densityAvgSum += (HBaseUtils.getDoubleValue(newData.getHighValue()) - HBaseUtils
-              .getDoubleValue(newData.getLowValue())) / newData.getNumDVs();
-        }
+        lowerBound = Math.max(lowerBound, newData.getNumDVs());
+        higherBound += newData.getNumDVs();
+        densityAvgSum += (HBaseUtils.getDoubleValue(newData.getHighValue()) - HBaseUtils
+            .getDoubleValue(newData.getLowValue())) / newData.getNumDVs();
         if (ndvEstimator != null) {
           ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory
               .getNumDistinctValueEstimator(newData.getBitVectors()));
@@ -129,28 +133,27 @@ public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implemen
         // to get a good estimation.
         aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
       } else {
+        long estimation;
         if (useDensityFunctionForNDVEstimation) {
           // We have estimation, lowerbound and higherbound. We use estimation
           // if it is between lowerbound and higherbound.
           double densityAvg = densityAvgSum / partNames.size();
-          long estimation = (long) ((HBaseUtils.getDoubleValue(aggregateData.getHighValue()) - HBaseUtils
+          estimation = (long) ((HBaseUtils.getDoubleValue(aggregateData.getHighValue()) - HBaseUtils
               .getDoubleValue(aggregateData.getLowValue())) / densityAvg);
           if (estimation < lowerBound) {
-            aggregateData.setNumDVs(lowerBound);
+            estimation = lowerBound;
           } else if (estimation > higherBound) {
-            aggregateData.setNumDVs(higherBound);
-          } else {
-            aggregateData.setNumDVs(estimation);
+            estimation = higherBound;
           }
         } else {
-          // Without useDensityFunctionForNDVEstimation, we just use the
-          // default one, which is the max of all the partitions and it is
-          // already done.
+          estimation = (long) (lowerBound + (higherBound - lowerBound) * ndvTuner);
         }
+        aggregateData.setNumDVs(estimation);
       }
       columnStatisticsData.setDecimalStats(aggregateData);
     } else {
       // we need extrapolation
+      LOG.debug("start extrapolation for " + colName);
       Map<String, Integer> indexMap = new HashMap<String, Integer>();
       for (int index = 0; index < partNames.size(); index++) {
         indexMap.put(partNames.get(index), index);
@@ -251,6 +254,8 @@ public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implemen
           adjustedStatsMap, densityAvgSum / adjustedStatsMap.size());
     }
     statsObj.setStatsData(columnStatisticsData);
+    LOG.debug("Ndv estimatation for " + colName + " is "
+        + columnStatisticsData.getDecimalStats().getNumDVs());
     return statsObj;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
index b6b8612..e55c412 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hadoop.hive.metastore.hbase.stats;
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
 
 import java.util.Collections;
 import java.util.Comparator;
@@ -33,10 +33,14 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class DoubleColumnStatsAggregator extends ColumnStatsAggregator implements
     IExtrapolatePartStatus {
 
+  private static final Logger LOG = LoggerFactory.getLogger(DoubleColumnStatsAggregator.class);
+
   @Override
   public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
       List<ColumnStatistics> css) throws MetaException {
@@ -45,6 +49,7 @@ public class DoubleColumnStatsAggregator extends ColumnStatsAggregator implement
     // check if all the ColumnStatisticsObjs contain stats and all the ndv are
     // bitvectors
     boolean doAllPartitionContainStats = partNames.size() == css.size();
+    LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats);
     NumDistinctValueEstimator ndvEstimator = null;
     String colType = null;
     for (ColumnStatistics cs : css) {
@@ -83,6 +88,7 @@ public class DoubleColumnStatsAggregator extends ColumnStatsAggregator implement
       ndvEstimator = NumDistinctValueEstimatorFactory
           .getEmptyNumDistinctValueEstimator(ndvEstimator);
     }
+    LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null));
     ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
     if (doAllPartitionContainStats || css.size() < 2) {
       DoubleColumnStatsData aggregateData = null;
@@ -92,11 +98,9 @@ public class DoubleColumnStatsAggregator extends ColumnStatsAggregator implement
       for (ColumnStatistics cs : css) {
         ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
         DoubleColumnStatsData newData = cso.getStatsData().getDoubleStats();
-        if (useDensityFunctionForNDVEstimation) {
-          lowerBound = Math.max(lowerBound, newData.getNumDVs());
-          higherBound += newData.getNumDVs();
-          densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs();
-        }
+        lowerBound = Math.max(lowerBound, newData.getNumDVs());
+        higherBound += newData.getNumDVs();
+        densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs();
         if (ndvEstimator != null) {
           ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory
               .getNumDistinctValueEstimator(newData.getBitVectors()));
@@ -117,27 +121,26 @@ public class DoubleColumnStatsAggregator extends ColumnStatsAggregator implement
         // to get a good estimation.
         aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
       } else {
+        long estimation;
         if (useDensityFunctionForNDVEstimation) {
           // We have estimation, lowerbound and higherbound. We use estimation
           // if it is between lowerbound and higherbound.
           double densityAvg = densityAvgSum / partNames.size();
-          long estimation = (long) ((aggregateData.getHighValue() - aggregateData.getLowValue()) / densityAvg);
+          estimation = (long) ((aggregateData.getHighValue() - aggregateData.getLowValue()) / densityAvg);
           if (estimation < lowerBound) {
-            aggregateData.setNumDVs(lowerBound);
+            estimation = lowerBound;
           } else if (estimation > higherBound) {
-            aggregateData.setNumDVs(higherBound);
-          } else {
-            aggregateData.setNumDVs(estimation);
+            estimation = higherBound;
           }
         } else {
-          // Without useDensityFunctionForNDVEstimation, we just use the
-          // default one, which is the max of all the partitions and it is
-          // already done.
+          estimation = (long) (lowerBound + (higherBound - lowerBound) * ndvTuner);
         }
+        aggregateData.setNumDVs(estimation);
       }
       columnStatisticsData.setDoubleStats(aggregateData);
     } else {
       // we need extrapolation
+      LOG.debug("start extrapolation for " + colName);
       Map<String, Integer> indexMap = new HashMap<String, Integer>();
       for (int index = 0; index < partNames.size(); index++) {
         indexMap.put(partNames.get(index), index);
@@ -225,6 +228,8 @@ public class DoubleColumnStatsAggregator extends ColumnStatsAggregator implement
       extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap,
           adjustedStatsMap, densityAvgSum / adjustedStatsMap.size());
     }
+    LOG.debug("Ndv estimatation for " + colName + " is "
+        + columnStatisticsData.getDoubleStats().getNumDVs());
     statsObj.setStatsData(columnStatisticsData);
     return statsObj;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/IExtrapolatePartStatus.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/IExtrapolatePartStatus.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/IExtrapolatePartStatus.java
index af75bce..acf679e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/IExtrapolatePartStatus.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/IExtrapolatePartStatus.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.metastore.hbase.stats;
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
index 2da6f60..2ee09f3 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hadoop.hive.metastore.hbase.stats;
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
 
 import java.util.Collections;
 import java.util.Comparator;
@@ -33,10 +33,14 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class LongColumnStatsAggregator extends ColumnStatsAggregator implements
     IExtrapolatePartStatus {
 
+  private static final Logger LOG = LoggerFactory.getLogger(LongColumnStatsAggregator.class);
+
   @Override
   public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
       List<ColumnStatistics> css) throws MetaException {
@@ -45,6 +49,7 @@ public class LongColumnStatsAggregator extends ColumnStatsAggregator implements
     // check if all the ColumnStatisticsObjs contain stats and all the ndv are
     // bitvectors
     boolean doAllPartitionContainStats = partNames.size() == css.size();
+    LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats);
     NumDistinctValueEstimator ndvEstimator = null;
     String colType = null;
     for (ColumnStatistics cs : css) {
@@ -83,6 +88,7 @@ public class LongColumnStatsAggregator extends ColumnStatsAggregator implements
       ndvEstimator = NumDistinctValueEstimatorFactory
           .getEmptyNumDistinctValueEstimator(ndvEstimator);
     }
+    LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null));
     ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
     if (doAllPartitionContainStats || css.size() < 2) {
       LongColumnStatsData aggregateData = null;
@@ -92,11 +98,9 @@ public class LongColumnStatsAggregator extends ColumnStatsAggregator implements
       for (ColumnStatistics cs : css) {
         ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
         LongColumnStatsData newData = cso.getStatsData().getLongStats();
-        if (useDensityFunctionForNDVEstimation) {
-          lowerBound = Math.max(lowerBound, newData.getNumDVs());
-          higherBound += newData.getNumDVs();
-          densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs();
-        }
+        lowerBound = Math.max(lowerBound, newData.getNumDVs());
+        higherBound += newData.getNumDVs();
+        densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs();
         if (ndvEstimator != null) {
           ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory
               .getNumDistinctValueEstimator(newData.getBitVectors()));
@@ -117,27 +121,27 @@ public class LongColumnStatsAggregator extends ColumnStatsAggregator implements
         // to get a good estimation.
         aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
       } else {
+        long estimation;
         if (useDensityFunctionForNDVEstimation) {
           // We have estimation, lowerbound and higherbound. We use estimation
           // if it is between lowerbound and higherbound.
           double densityAvg = densityAvgSum / partNames.size();
-          long estimation = (long) ((aggregateData.getHighValue() - aggregateData.getLowValue()) / densityAvg);
+          estimation = (long) ((aggregateData.getHighValue() - aggregateData.getLowValue()) / densityAvg);
           if (estimation < lowerBound) {
-            aggregateData.setNumDVs(lowerBound);
+            estimation = lowerBound;
           } else if (estimation > higherBound) {
-            aggregateData.setNumDVs(higherBound);
-          } else {
-            aggregateData.setNumDVs(estimation);
+            estimation = higherBound;
           }
         } else {
-          // Without useDensityFunctionForNDVEstimation, we just use the
-          // default one, which is the max of all the partitions and it is
-          // already done.
+          estimation = (long) (lowerBound + (higherBound - lowerBound) * ndvTuner);
         }
+        aggregateData.setNumDVs(estimation);
       }
       columnStatisticsData.setLongStats(aggregateData);
     } else {
       // we need extrapolation
+      LOG.debug("start extrapolation for " + colName);
+
       Map<String, Integer> indexMap = new HashMap<String, Integer>();
       for (int index = 0; index < partNames.size(); index++) {
         indexMap.put(partNames.get(index), index);
@@ -226,6 +230,8 @@ public class LongColumnStatsAggregator extends ColumnStatsAggregator implements
           adjustedStatsMap, densityAvgSum / adjustedStatsMap.size());
     }
     statsObj.setStatsData(columnStatisticsData);
+    LOG.debug("Ndv estimatation for " + colName + " is "
+        + columnStatisticsData.getLongStats().getNumDVs());
     return statsObj;
   }
 

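[Editor's sketch] The hunk above changes how NDV is chosen when bit vectors cannot be merged: the estimate is now always kept between lowerBound (the max NDV over partitions) and higherBound (the sum of NDVs over partitions), with ndvTuner interpolating between the two when the density function is off. A minimal standalone sketch of just that selection step; the wrapper class and method are illustrative, only the formula and clamp come from the patch:

    // Sketch of the NDV selection introduced in LongColumnStatsAggregator.
    public final class NdvSelectionSketch {
      static long selectNdv(boolean useDensity, double densityAvg,
          long range, long lowerBound, long higherBound, double ndvTuner) {
        long estimation;
        if (useDensity) {
          // Density-based estimate, clamped to [lowerBound, higherBound].
          estimation = (long) (range / densityAvg);
          if (estimation < lowerBound) {
            estimation = lowerBound;
          } else if (estimation > higherBound) {
            estimation = higherBound;
          }
        } else {
          // Linear interpolation: ndvTuner = 0 reproduces the old max-based
          // default; ndvTuner = 1 assumes fully disjoint partitions.
          estimation = (long) (lowerBound + (higherBound - lowerBound) * ndvTuner);
        }
        return estimation;
      }

      public static void main(String[] args) {
        // 4 partitions with 100 NDVs each: the answer lies between 100 and 400.
        System.out.println(selectNdv(false, 0.0, 0L, 100L, 400L, 0.5)); // 250
      }
    }
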
http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
deleted file mode 100644
index 83c6c54..0000000
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hadoop.hive.metastore.hbase.stats;
-
-import java.util.List;
-
-import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
-import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
-
-public class StringColumnStatsAggregator extends ColumnStatsAggregator {
-
-  @Override
-  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
-      List<ColumnStatistics> css) throws MetaException {
-    ColumnStatisticsObj statsObj = null;
-
-    // check if all the ColumnStatisticsObjs contain stats and all the ndv are
-    // bitvectors. Only when both of the conditions are true, we merge bit
-    // vectors. Otherwise, just use the maximum function.
-    boolean doAllPartitionContainStats = partNames.size() == css.size();
-    NumDistinctValueEstimator ndvEstimator = null;
-    String colType = null;
-    for (ColumnStatistics cs : css) {
-      if (cs.getStatsObjSize() != 1) {
-        throw new MetaException(
-            "The number of columns should be exactly one in aggrStats, but found "
-                + cs.getStatsObjSize());
-      }
-      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
-      if (statsObj == null) {
-        colType = cso.getColType();
-        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
-            .getStatsData().getSetField());
-      }
-      if (!cso.getStatsData().getStringStats().isSetBitVectors()
-          || cso.getStatsData().getStringStats().getBitVectors().length() == 0) {
-        ndvEstimator = null;
-        break;
-      } else {
-        // check if all of the bit vectors can merge
-        NumDistinctValueEstimator estimator = NumDistinctValueEstimatorFactory
-            .getNumDistinctValueEstimator(cso.getStatsData().getStringStats().getBitVectors());
-        if (ndvEstimator == null) {
-          ndvEstimator = estimator;
-        } else {
-          if (ndvEstimator.canMerge(estimator)) {
-            continue;
-          } else {
-            ndvEstimator = null;
-            break;
-          }
-        }
-      }
-    }
-    if (ndvEstimator != null) {
-      ndvEstimator = NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator);
-    }
-    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
-    if (doAllPartitionContainStats && ndvEstimator!=null) {
-      StringColumnStatsData aggregateData = null;
-      for (ColumnStatistics cs : css) {
-        ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
-        StringColumnStatsData newData = cso.getStatsData().getStringStats();
-        ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory
-            .getNumDistinctValueEstimator(newData.getBitVectors()));
-        if (aggregateData == null) {
-          aggregateData = newData.deepCopy();
-        } else {
-          aggregateData
-              .setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
-          aggregateData
-              .setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
-          aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-        }
-      }
-      aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
-      columnStatisticsData.setStringStats(aggregateData);
-    } else {
-      StringColumnStatsData aggregateData = null;
-      for (ColumnStatistics cs : css) {
-        ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
-        StringColumnStatsData newData = cso.getStatsData().getStringStats();
-        if (aggregateData == null) {
-          aggregateData = newData.deepCopy();
-        } else {
-          aggregateData
-              .setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
-          aggregateData
-              .setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
-          aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-          aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
-        }
-      }
-      columnStatisticsData.setStringStats(aggregateData);
-    }
-    statsObj.setStatsData(columnStatisticsData);
-    return statsObj;
-  }
-
-}
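
[Editor's sketch] The deleted aggregator above (its logic moves into the new columnstats.aggr package) picks between two NDV strategies: merge the per-partition bit-vector sketches when every partition has stats and all sketches are compatible, otherwise fall back to the per-partition maximum. A condensed sketch of that decision; Estimator is a hypothetical stand-in for NumDistinctValueEstimator, not the real interface:

    interface Estimator {
      boolean canMerge(Estimator other);
      void mergeEstimators(Estimator other);
      long estimateNumDistinctValues();
    }

    final class NdvAggregationSketch {
      // Merging the sketches counts values shared across partitions once;
      // taking the max is the safe fallback when sketches are missing or
      // were built with incompatible parameters.
      static long aggregateNdv(java.util.List<Estimator> sketches,
          java.util.List<Long> perPartitionNdvs, boolean allPartitionsHaveStats) {
        boolean mergeable = allPartitionsHaveStats && !sketches.isEmpty();
        for (int i = 1; mergeable && i < sketches.size(); i++) {
          mergeable = sketches.get(0).canMerge(sketches.get(i));
        }
        if (mergeable) {
          Estimator merged = sketches.get(0);
          for (int i = 1; i < sketches.size(); i++) {
            merged.mergeEstimators(sketches.get(i));
          }
          return merged.estimateNumDistinctValues();
        }
        long max = 0L;
        for (long ndv : perPartitionNdvs) {
          max = Math.max(max, ndv);
        }
        return max;
      }
    }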

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/BinaryColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/BinaryColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/BinaryColumnStatsMerger.java
index af0669e..4c2d1bc 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/BinaryColumnStatsMerger.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/BinaryColumnStatsMerger.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hadoop.hive.metastore.hbase.stats.merge;
+package org.apache.hadoop.hive.metastore.columnstats.merge;
 
 import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/BooleanColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/BooleanColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/BooleanColumnStatsMerger.java
index 33ff6a1..8e50153 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/BooleanColumnStatsMerger.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/BooleanColumnStatsMerger.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hadoop.hive.metastore.hbase.stats.merge;
+package org.apache.hadoop.hive.metastore.columnstats.merge;
 
 import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMerger.java
index d3051a2..474d4dd 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMerger.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMerger.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hadoop.hive.metastore.hbase.stats.merge;
+package org.apache.hadoop.hive.metastore.columnstats.merge;
 
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java
index c013ba5..0ce1847 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hadoop.hive.metastore.hbase.stats.merge;
+package org.apache.hadoop.hive.metastore.columnstats.merge;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java
index e899bfe..2542a00 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hadoop.hive.metastore.hbase.stats.merge;
+package org.apache.hadoop.hive.metastore.columnstats.merge;
 
 import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
 import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DecimalColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DecimalColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DecimalColumnStatsMerger.java
index 4099ffc..4e8e129 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DecimalColumnStatsMerger.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DecimalColumnStatsMerger.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hadoop.hive.metastore.hbase.stats.merge;
+package org.apache.hadoop.hive.metastore.columnstats.merge;
 
 import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
 import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DoubleColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DoubleColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DoubleColumnStatsMerger.java
index 1691fc9..4ef5c39 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DoubleColumnStatsMerger.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DoubleColumnStatsMerger.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hadoop.hive.metastore.hbase.stats.merge;
+package org.apache.hadoop.hive.metastore.columnstats.merge;
 
 import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
 import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/LongColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/LongColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/LongColumnStatsMerger.java
index 361af35..acf7f03 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/LongColumnStatsMerger.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/LongColumnStatsMerger.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hadoop.hive.metastore.hbase.stats.merge;
+package org.apache.hadoop.hive.metastore.columnstats.merge;
 
 import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
 import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/StringColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/StringColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/StringColumnStatsMerger.java
index 8e28f90..b3cd33c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/StringColumnStatsMerger.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/StringColumnStatsMerger.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hadoop.hive.metastore.hbase.stats.merge;
+package org.apache.hadoop.hive.metastore.columnstats.merge;
 
 import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
 import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/model/org/apache/hadoop/hive/metastore/model/MPartitionColumnStatistics.java
----------------------------------------------------------------------
diff --git a/metastore/src/model/org/apache/hadoop/hive/metastore/model/MPartitionColumnStatistics.java b/metastore/src/model/org/apache/hadoop/hive/metastore/model/MPartitionColumnStatistics.java
index 2967a60..20129bb 100644
--- a/metastore/src/model/org/apache/hadoop/hive/metastore/model/MPartitionColumnStatistics.java
+++ b/metastore/src/model/org/apache/hadoop/hive/metastore/model/MPartitionColumnStatistics.java
@@ -48,6 +48,7 @@ public class MPartitionColumnStatistics {
   private String decimalHighValue;
   private Long numNulls;
   private Long numDVs;
+  private byte[] bitVector;
   private Double avgColLen;
   private Long maxColLen;
   private Long numTrues;
@@ -166,31 +167,35 @@ public class MPartitionColumnStatistics {
     this.numNulls = numNulls;
   }
 
-  public void setLongStats(Long numNulls, Long numNDVs, Long lowValue, Long highValue) {
+  public void setLongStats(Long numNulls, Long numNDVs, byte[] bitVector, Long lowValue, Long highValue) {
     this.numNulls = numNulls;
     this.numDVs = numNDVs;
+    this.bitVector = bitVector;
     this.longLowValue = lowValue;
     this.longHighValue = highValue;
   }
 
-  public void setDoubleStats(Long numNulls, Long numNDVs, Double lowValue, Double highValue) {
+  public void setDoubleStats(Long numNulls, Long numNDVs, byte[] bitVector, Double lowValue, Double highValue) {
     this.numNulls = numNulls;
     this.numDVs = numNDVs;
+    this.bitVector = bitVector;
     this.doubleLowValue = lowValue;
     this.doubleHighValue = highValue;
   }
 
   public void setDecimalStats(
-      Long numNulls, Long numNDVs, String lowValue, String highValue) {
+      Long numNulls, Long numNDVs, byte[] bitVector, String lowValue, String highValue) {
     this.numNulls = numNulls;
     this.numDVs = numNDVs;
+    this.bitVector = bitVector;
     this.decimalLowValue = lowValue;
     this.decimalHighValue = highValue;
   }
 
-  public void setStringStats(Long numNulls, Long numNDVs, Long maxColLen, Double avgColLen) {
+  public void setStringStats(Long numNulls, Long numNDVs, byte[] bitVector, Long maxColLen, Double avgColLen) {
     this.numNulls = numNulls;
     this.numDVs = numNDVs;
+    this.bitVector = bitVector;
     this.maxColLen = maxColLen;
     this.avgColLen = avgColLen;
   }
@@ -201,9 +206,10 @@ public class MPartitionColumnStatistics {
     this.avgColLen = avgColLen;
   }
 
-  public void setDateStats(Long numNulls, Long numNDVs, Long lowValue, Long highValue) {
+  public void setDateStats(Long numNulls, Long numNDVs, byte[] bitVector, Long lowValue, Long highValue) {
     this.numNulls = numNulls;
     this.numDVs = numNDVs;
+    this.bitVector = bitVector;
     this.longLowValue = lowValue;
     this.longHighValue = highValue;
   }
@@ -255,4 +261,12 @@ public class MPartitionColumnStatistics {
   public void setDecimalHighValue(String decimalHighValue) {
     this.decimalHighValue = decimalHighValue;
   }
+
+  public byte[] getBitVector() {
+    return bitVector;
+  }
+
+  public void setBitVector(byte[] bitVector) {
+    this.bitVector = bitVector;
+  }
 }
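
[Editor's sketch] The model class gains a byte[] bitVector field and every type-specific setter grows a matching parameter (MTableColumnStatistics below receives the identical change). A minimal usage sketch, assuming the modified class above; the literal values are placeholders and buildBitVector(...) is a hypothetical helper for serializing an NDV estimator, not part of the real API:

    static MPartitionColumnStatistics longStatsWithSketch(byte[] bitVector) {
      MPartitionColumnStatistics mStats = new MPartitionColumnStatistics();
      // numNulls, numNDVs, bitVector (new), lowValue, highValue
      mStats.setLongStats(0L, 1024L, bitVector, -5L, 9000L);
      return mStats;
    }

Since package.jdo (further below) maps the field to a BLOB column with allows-null="true", callers that have no serialized sketch can pass null.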

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableColumnStatistics.java
----------------------------------------------------------------------
diff --git a/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableColumnStatistics.java b/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableColumnStatistics.java
index 132f7a1..6cfaca3 100644
--- a/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableColumnStatistics.java
+++ b/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableColumnStatistics.java
@@ -46,6 +46,7 @@ public class MTableColumnStatistics {
   private String decimalHighValue;
   private Long numNulls;
   private Long numDVs;
+  private byte[] bitVector;
   private Double avgColLen;
   private Long maxColLen;
   private Long numTrues;
@@ -156,31 +157,35 @@ public class MTableColumnStatistics {
     this.numNulls = numNulls;
   }
 
-  public void setLongStats(Long numNulls, Long numNDVs, Long lowValue, Long highValue) {
+  public void setLongStats(Long numNulls, Long numNDVs, byte[] bitVector, Long lowValue, Long highValue) {
     this.numNulls = numNulls;
     this.numDVs = numNDVs;
+    this.bitVector = bitVector;
     this.longLowValue = lowValue;
     this.longHighValue = highValue;
   }
 
-  public void setDoubleStats(Long numNulls, Long numNDVs, Double lowValue, Double highValue) {
+  public void setDoubleStats(Long numNulls, Long numNDVs, byte[] bitVector, Double lowValue, Double highValue) {
     this.numNulls = numNulls;
     this.numDVs = numNDVs;
+    this.bitVector = bitVector;
     this.doubleLowValue = lowValue;
     this.doubleHighValue = highValue;
   }
 
   public void setDecimalStats(
-      Long numNulls, Long numNDVs, String lowValue, String highValue) {
+      Long numNulls, Long numNDVs, byte[] bitVector, String lowValue, String highValue) {
     this.numNulls = numNulls;
     this.numDVs = numNDVs;
+    this.bitVector = bitVector;
     this.decimalLowValue = lowValue;
     this.decimalHighValue = highValue;
   }
 
-  public void setStringStats(Long numNulls, Long numNDVs, Long maxColLen, Double avgColLen) {
+  public void setStringStats(Long numNulls, Long numNDVs, byte[] bitVector, Long maxColLen, Double avgColLen) {
     this.numNulls = numNulls;
     this.numDVs = numNDVs;
+    this.bitVector = bitVector;
     this.maxColLen = maxColLen;
     this.avgColLen = avgColLen;
   }
@@ -191,9 +196,10 @@ public class MTableColumnStatistics {
     this.avgColLen = avgColLen;
   }
 
-  public void setDateStats(Long numNulls, Long numNDVs, Long lowValue, Long highValue) {
+  public void setDateStats(Long numNulls, Long numNDVs, byte[] bitVector, Long lowValue, Long highValue) {
     this.numNulls = numNulls;
     this.numDVs = numNDVs;
+    this.bitVector = bitVector;
     this.longLowValue = lowValue;
     this.longHighValue = highValue;
   }
@@ -246,4 +252,12 @@ public class MTableColumnStatistics {
   public void setDecimalHighValue(String decimalHighValue) {
     this.decimalHighValue = decimalHighValue;
   }
+
+  public byte[] getBitVector() {
+    return bitVector;
+  }
+
+  public void setBitVector(byte[] bitVector) {
+    this.bitVector = bitVector;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f8b79fe6/metastore/src/model/package.jdo
----------------------------------------------------------------------
diff --git a/metastore/src/model/package.jdo b/metastore/src/model/package.jdo
index 9c4bc21..570fd44 100644
--- a/metastore/src/model/package.jdo
+++ b/metastore/src/model/package.jdo
@@ -879,6 +879,9 @@
       <field name="numDVs">
         <column name="NUM_DISTINCTS" jdbc-type="BIGINT" allows-null="true"/>
       </field>
+      <field name="bitVector">
+        <column name="BIT_VECTOR" jdbc-type="BLOB" allows-null="true"/>
+      </field>
       <field name="avgColLen">
         <column name="AVG_COL_LEN" jdbc-type="DOUBLE" allows-null="true"/>
       </field>
@@ -943,6 +946,9 @@
       <field name="numDVs">
         <column name="NUM_DISTINCTS" jdbc-type="BIGINT" allows-null="true"/>
       </field>
+      <field name="bitVector">
+        <column name="BIT_VECTOR" jdbc-type="BLOB" allows-null="true"/>
+      </field>
       <field name="avgColLen">
         <column name="AVG_COL_LEN" jdbc-type="DOUBLE" allows-null="true"/>
       </field>