Posted to commits@hive.apache.org by jd...@apache.org on 2016/04/04 22:36:54 UTC

[19/50] [abbrv] hive git commit: HIVE-12960: Migrate Column Stats Extrapolation and UniformDistribution to HBaseStore (Pengcheng Xiong, reviewed by Ashutosh Chauhan)

HIVE-12960: Migrate Column Stats Extrapolation and UniformDistribution to HBaseStore (Pengcheng Xiong, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/96862093
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/96862093
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/96862093

Branch: refs/heads/llap
Commit: 968620932301dc64cd435292726943a6c0a42551
Parents: 3038b05
Author: Pengcheng Xiong <px...@apache.org>
Authored: Sun Mar 27 11:46:17 2016 -0700
Committer: Pengcheng Xiong <px...@apache.org>
Committed: Sun Mar 27 12:11:39 2016 -0700

----------------------------------------------------------------------
 .../hive/metastore/StatObjectConverter.java     |   2 +-
 .../hadoop/hive/metastore/hbase/HBaseUtils.java |   8 +-
 .../hadoop/hive/metastore/hbase/StatsCache.java |  20 +-
 .../stats/BinaryColumnStatsAggregator.java      |  43 +-
 .../stats/BooleanColumnStatsAggregator.java     |  42 +-
 .../hbase/stats/ColumnStatsAggregator.java      |  12 +-
 .../stats/ColumnStatsAggregatorFactory.java     |   8 +-
 .../stats/DecimalColumnStatsAggregator.java     | 340 ++++++++-
 .../stats/DoubleColumnStatsAggregator.java      | 307 +++++++-
 .../hbase/stats/IExtrapolatePartStatus.java     |  30 +
 .../hbase/stats/LongColumnStatsAggregator.java  | 305 +++++++-
 .../stats/StringColumnStatsAggregator.java      |  85 ++-
 ...stHBaseAggregateStatsCacheWithBitVector.java |   6 +-
 .../TestHBaseAggregateStatsExtrapolation.java   | 717 +++++++++++++++++++
 .../TestHBaseAggregateStatsNDVUniformDist.java  | 581 +++++++++++++++
 .../clientpositive/tez/explainuser_1.q.out      |  92 +--
 16 files changed, 2454 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
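Note: the aggregators below implement the "uniform distribution" / density estimate referenced in the summary: average the per-partition density ((high - low) / ndv), estimate the aggregate NDV as the aggregate value range divided by that average density, and clamp the result between the largest single-partition NDV (lower bound) and the sum of the per-partition NDVs (higher bound). A minimal, self-contained sketch of that calculation (hypothetical helper, not part of this patch):

    import java.util.List;

    public class NdvDensityEstimateSketch {
      // Hypothetical per-partition summary: value range and observed NDV.
      public static class PartStats {
        final double low, high;
        final long ndv;
        PartStats(double low, double high, long ndv) {
          this.low = low; this.high = high; this.ndv = ndv;
        }
      }

      // Mirrors the useDensityFunctionForNDVEstimation branch of the aggregators.
      // Assumes every partition has ndv > 0.
      public static long estimateNdv(List<PartStats> parts) {
        double densityAvgSum = 0.0;
        long lowerBound = 0;   // max of the per-partition NDVs
        long higherBound = 0;  // sum of the per-partition NDVs
        double aggLow = Double.MAX_VALUE, aggHigh = -Double.MAX_VALUE;
        for (PartStats p : parts) {
          lowerBound = Math.max(lowerBound, p.ndv);
          higherBound += p.ndv;
          densityAvgSum += (p.high - p.low) / p.ndv;
          aggLow = Math.min(aggLow, p.low);
          aggHigh = Math.max(aggHigh, p.high);
        }
        double densityAvg = densityAvgSum / parts.size();
        if (densityAvg == 0.0) {
          return lowerBound;
        }
        long estimation = (long) ((aggHigh - aggLow) / densityAvg);
        // Use the estimate only when it falls between the two bounds.
        return Math.min(Math.max(estimation, lowerBound), higherBound);
      }
    }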


http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
index b3ceff1..e119dd8 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
@@ -650,7 +650,7 @@ public class StatObjectConverter {
     }
   }
 
-  private static Decimal createThriftDecimal(String s) {
+  public static Decimal createThriftDecimal(String s) {
     BigDecimal d = new BigDecimal(s);
     return new Decimal(ByteBuffer.wrap(d.unscaledValue().toByteArray()), (short)d.scale());
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
index 9ec7cd5..e0b449b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.metastore.hbase;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
@@ -88,7 +90,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
 /**
  * Utility functions
  */
-class HBaseUtils {
+public class HBaseUtils {
 
   final static Charset ENCODING = StandardCharsets.UTF_8;
   final static char KEY_SEPARATOR = '\u0001';
@@ -1421,4 +1423,8 @@ class HBaseUtils {
     b[7] = (byte)(v >>>  0);
     return b;
   }
+
+  public static double getDoubleValue(Decimal decimal) {
+    return new BigDecimal(new BigInteger(decimal.getUnscaled()), decimal.getScale()).doubleValue();
+  }
 }
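Note: createThriftDecimal() is widened to public and HBaseUtils gains getDoubleValue() so the decimal aggregator can compare and interpolate low/high values as doubles and then write extrapolated results back as Thrift Decimals. A rough round-trip sketch using the two methods from this diff (the main() scaffolding is illustrative only):

    import java.math.BigDecimal;
    import java.math.BigInteger;

    import org.apache.hadoop.hive.metastore.StatObjectConverter;
    import org.apache.hadoop.hive.metastore.api.Decimal;
    import org.apache.hadoop.hive.metastore.hbase.HBaseUtils;

    public class DecimalRoundTripSketch {
      public static void main(String[] args) {
        // String -> Thrift Decimal (unscaled bytes + scale), as the metastore stores it.
        Decimal d = StatObjectConverter.createThriftDecimal("123.45");
        // Thrift Decimal -> double, as the aggregators read it for interpolation.
        double v = HBaseUtils.getDoubleValue(d);  // 123.45
        // Equivalent manual conversion, for reference:
        double same = new BigDecimal(new BigInteger(d.getUnscaled()), d.getScale()).doubleValue();
        System.out.println(v + " == " + same);
      }
    }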

http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
index f1d2e50..18f8afc 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
@@ -85,12 +85,12 @@ class StatsCache {
           @Override
           public AggrStats load(StatsCacheKey key) throws Exception {
             int numBitVectors = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf);
+            boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION);
             HBaseReadWrite hrw = HBaseReadWrite.getInstance();
             AggrStats aggrStats = hrw.getAggregatedStats(key.hashed);
             if (aggrStats == null) {
               misses.incr();
               ColumnStatsAggregator aggregator = null;
-              ColumnStatisticsObj statsObj = null;
               aggrStats = new AggrStats();
               LOG.debug("Unable to find aggregated stats for " + key.colName + ", aggregating");
               List<ColumnStatistics> css = hrw.getPartitionStatistics(key.dbName, key.tableName,
@@ -98,19 +98,13 @@ class StatsCache {
                   Collections.singletonList(key.colName));
               if (css != null && css.size() > 0) {
                 aggrStats.setPartsFound(css.size());
-                for (ColumnStatistics cs : css) {
-                  for (ColumnStatisticsObj cso : cs.getStatsObj()) {
-                    if (statsObj == null) {
-                      statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(key.colName,
-                          cso.getColType(), cso.getStatsData().getSetField());
-                    }
-                    if (aggregator == null) {
-                      aggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator(
-                          cso.getStatsData().getSetField(), numBitVectors);
-                    }
-                    aggregator.aggregate(statsObj, cso);
-                  }
+                if (aggregator == null) {
+                  aggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator(css.iterator()
+                      .next().getStatsObj().iterator().next().getStatsData().getSetField(),
+                      numBitVectors, useDensityFunctionForNDVEstimation);
                 }
+                ColumnStatisticsObj statsObj = aggregator
+                    .aggregate(key.colName, key.partNames, css);
                 aggrStats.addToColStats(statsObj);
                 me.put(key, aggrStats);
               }
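Note: with this change the cache no longer folds partition stats into one ColumnStatisticsObj pair by pair; it hands the full per-partition list (plus the complete partition name list, which is what enables extrapolation when some partitions lack stats) to a single aggregator obtained from the factory. A condensed sketch of the new call shape, assuming the stats have already been fetched (names are illustrative):

    import java.util.List;

    import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
    import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.hbase.stats.ColumnStatsAggregator;
    import org.apache.hadoop.hive.metastore.hbase.stats.ColumnStatsAggregatorFactory;

    public class AggregateFlowSketch {
      // partNames is the full partition list; css holds stats only for the
      // partitions that have them, so css.size() <= partNames.size().
      static ColumnStatisticsObj aggregateColumn(String colName, List<String> partNames,
          List<ColumnStatistics> css, int numBitVectors,
          boolean useDensityFunctionForNDVEstimation) throws MetaException {
        ColumnStatsAggregator aggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator(
            css.iterator().next().getStatsObj().iterator().next().getStatsData().getSetField(),
            numBitVectors, useDensityFunctionForNDVEstimation);
        // One call aggregates all partitions; internally the aggregator picks between
        // bitvector merging, the density estimate, and extrapolation.
        return aggregator.aggregate(colName, partNames, css);
      }
    }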

http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java
index 40340dd..d81d612 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java
@@ -19,17 +19,46 @@
 
 package org.apache.hadoop.hive.metastore.hbase.stats;
 
+import java.util.List;
+
 import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 
-public class BinaryColumnStatsAggregator extends ColumnStatsAggregator{
+public class BinaryColumnStatsAggregator extends ColumnStatsAggregator {
 
   @Override
-  public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
-    BinaryColumnStatsData aggregateData = aggregateColStats.getStatsData().getBinaryStats();
-    BinaryColumnStatsData newData = newColStats.getStatsData().getBinaryStats();
-    aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
-    aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
-    aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+      List<ColumnStatistics> css) throws MetaException {
+    ColumnStatisticsObj statsObj = null;
+    BinaryColumnStatsData aggregateData = null;
+    String colType = null;
+    for (ColumnStatistics cs : css) {
+      if (cs.getStatsObjSize() != 1) {
+        throw new MetaException(
+            "The number of columns should be exactly one in aggrStats, but found "
+                + cs.getStatsObjSize());
+      }
+      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+      if (statsObj == null) {
+        colType = cso.getColType();
+        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
+            .getStatsData().getSetField());
+      }
+      BinaryColumnStatsData newData = cso.getStatsData().getBinaryStats();
+      if (aggregateData == null) {
+        aggregateData = newData.deepCopy();
+      } else {
+        aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
+        aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
+        aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+      }
+    }
+    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
+    columnStatisticsData.setBinaryStats(aggregateData);
+    statsObj.setStatsData(columnStatisticsData);
+    return statsObj;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java
index 735d965..e796df2 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java
@@ -19,17 +19,47 @@
 
 package org.apache.hadoop.hive.metastore.hbase.stats;
 
+import java.util.List;
+
 import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 
 public class BooleanColumnStatsAggregator extends ColumnStatsAggregator {
 
   @Override
-  public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
-    BooleanColumnStatsData aggregateData = aggregateColStats.getStatsData().getBooleanStats();
-    BooleanColumnStatsData newData = newColStats.getStatsData().getBooleanStats();
-    aggregateData.setNumTrues(aggregateData.getNumTrues() + newData.getNumTrues());
-    aggregateData.setNumFalses(aggregateData.getNumFalses() + newData.getNumFalses());
-    aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+      List<ColumnStatistics> css) throws MetaException {
+    ColumnStatisticsObj statsObj = null;
+    BooleanColumnStatsData aggregateData = null;
+    String colType = null;
+    for (ColumnStatistics cs : css) {
+      if (cs.getStatsObjSize() != 1) {
+        throw new MetaException(
+            "The number of columns should be exactly one in aggrStats, but found "
+                + cs.getStatsObjSize());
+      }
+      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+      if (statsObj == null) {
+        colType = cso.getColType();
+        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
+            .getStatsData().getSetField());
+      }
+      BooleanColumnStatsData newData = cso.getStatsData().getBooleanStats();
+      if (aggregateData == null) {
+        aggregateData = newData.deepCopy();
+      } else {
+        aggregateData.setNumTrues(aggregateData.getNumTrues() + newData.getNumTrues());
+        aggregateData.setNumFalses(aggregateData.getNumFalses() + newData.getNumFalses());
+        aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+      }
+    }
+    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
+    columnStatisticsData.setBooleanStats(aggregateData);
+    statsObj.setStatsData(columnStatisticsData);
+    return statsObj;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
index 694e53b..31955b4 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
@@ -19,10 +19,16 @@
 
 package org.apache.hadoop.hive.metastore.hbase.stats;
 
-import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 
 public abstract class ColumnStatsAggregator {
-  NumDistinctValueEstimator ndvEstimator = null;
-  public abstract void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats);
+  public int numBitVectors;
+  public boolean useDensityFunctionForNDVEstimation;
+
+  public abstract ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+      List<ColumnStatistics> css) throws MetaException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
index 8eb127b..daf8569 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
@@ -19,7 +19,6 @@
 
 package org.apache.hadoop.hive.metastore.hbase.stats;
 
-import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
 import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
@@ -35,7 +34,7 @@ public class ColumnStatsAggregatorFactory {
   private ColumnStatsAggregatorFactory() {
   }
 
-  public static ColumnStatsAggregator getColumnStatsAggregator(_Fields type, int numBitVectors) {
+  public static ColumnStatsAggregator getColumnStatsAggregator(_Fields type, int numBitVectors, boolean useDensityFunctionForNDVEstimation) {
     ColumnStatsAggregator agg;
     switch (type) {
     case BOOLEAN_STATS:
@@ -59,9 +58,8 @@ public class ColumnStatsAggregatorFactory {
     default:
       throw new RuntimeException("Woh, bad.  Unknown stats type " + type.toString());
     }
-    if (numBitVectors > 0) {
-      agg.ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
-    }
+    agg.numBitVectors = numBitVectors;
+    agg.useDensityFunctionForNDVEstimation = useDensityFunctionForNDVEstimation;
     return agg;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
index 50f4325..36b2c9c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
@@ -19,33 +19,333 @@
 
 package org.apache.hadoop.hive.metastore.hbase.stats;
 
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.metastore.StatObjectConverter;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.Decimal;
 import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.hbase.HBaseUtils;
 
-public class DecimalColumnStatsAggregator extends ColumnStatsAggregator {
+public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implements
+    IExtrapolatePartStatus {
 
   @Override
-  public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
-    DecimalColumnStatsData aggregateData = aggregateColStats.getStatsData().getDecimalStats();
-    DecimalColumnStatsData newData = newColStats.getStatsData().getDecimalStats();
-    Decimal lowValue = aggregateData.getLowValue() != null
-        && (aggregateData.getLowValue().compareTo(newData.getLowValue()) > 0) ? aggregateData
-        .getLowValue() : newData.getLowValue();
-    aggregateData.setLowValue(lowValue);
-    Decimal highValue = aggregateData.getHighValue() != null
-        && (aggregateData.getHighValue().compareTo(newData.getHighValue()) > 0) ? aggregateData
-        .getHighValue() : newData.getHighValue();
-    aggregateData.setHighValue(highValue);
-    aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-    if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
-      aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+      List<ColumnStatistics> css) throws MetaException {
+    ColumnStatisticsObj statsObj = null;
+
+    // check if all the ColumnStatisticsObjs contain stats and all the ndv are
+    // bitvectors
+    boolean doAllPartitionContainStats = partNames.size() == css.size();
+    boolean isNDVBitVectorSet = true;
+    String colType = null;
+    for (ColumnStatistics cs : css) {
+      if (cs.getStatsObjSize() != 1) {
+        throw new MetaException(
+            "The number of columns should be exactly one in aggrStats, but found "
+                + cs.getStatsObjSize());
+      }
+      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+      if (statsObj == null) {
+        colType = cso.getColType();
+        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
+            .getStatsData().getSetField());
+      }
+      if (numBitVectors <= 0 || !cso.getStatsData().getDecimalStats().isSetBitVectors()
+          || cso.getStatsData().getDecimalStats().getBitVectors().length() == 0) {
+        isNDVBitVectorSet = false;
+        break;
+      }
+    }
+    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
+    if (doAllPartitionContainStats || css.size() < 2) {
+      DecimalColumnStatsData aggregateData = null;
+      long lowerBound = 0;
+      long higherBound = 0;
+      double densityAvgSum = 0.0;
+      NumDistinctValueEstimator ndvEstimator = null;
+      if (isNDVBitVectorSet) {
+        ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
+      }
+      for (ColumnStatistics cs : css) {
+        ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+        DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats();
+        if (useDensityFunctionForNDVEstimation) {
+          lowerBound = Math.max(lowerBound, newData.getNumDVs());
+          higherBound += newData.getNumDVs();
+          densityAvgSum += (HBaseUtils.getDoubleValue(newData.getHighValue()) - HBaseUtils
+              .getDoubleValue(newData.getLowValue())) / newData.getNumDVs();
+        }
+        if (isNDVBitVectorSet) {
+          ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
+              ndvEstimator.getnumBitVectors()));
+        }
+        if (aggregateData == null) {
+          aggregateData = newData.deepCopy();
+        } else {
+          if (HBaseUtils.getDoubleValue(aggregateData.getLowValue()) < HBaseUtils
+              .getDoubleValue(newData.getLowValue())) {
+            aggregateData.setLowValue(aggregateData.getLowValue());
+          } else {
+            aggregateData.setLowValue(newData.getLowValue());
+          }
+          if (HBaseUtils.getDoubleValue(aggregateData.getHighValue()) > HBaseUtils
+              .getDoubleValue(newData.getHighValue())) {
+            aggregateData.setHighValue(aggregateData.getHighValue());
+          } else {
+            aggregateData.setHighValue(newData.getHighValue());
+          }
+          aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+          aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+        }
+      }
+      if (isNDVBitVectorSet) {
+        // if all the ColumnStatisticsObjs contain bitvectors, we do not need to
+        // use uniform distribution assumption because we can merge bitvectors
+        // to get a good estimation.
+        aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+      } else {
+        if (useDensityFunctionForNDVEstimation) {
+          // We have estimation, lowerbound and higherbound. We use estimation
+          // if it is between lowerbound and higherbound.
+          double densityAvg = densityAvgSum / partNames.size();
+          long estimation = (long) ((HBaseUtils.getDoubleValue(aggregateData.getHighValue()) - HBaseUtils
+              .getDoubleValue(aggregateData.getLowValue())) / densityAvg);
+          if (estimation < lowerBound) {
+            aggregateData.setNumDVs(lowerBound);
+          } else if (estimation > higherBound) {
+            aggregateData.setNumDVs(higherBound);
+          } else {
+            aggregateData.setNumDVs(estimation);
+          }
+        } else {
+          // Without useDensityFunctionForNDVEstimation, we just use the
+          // default one, which is the max of all the partitions and it is
+          // already done.
+        }
+      }
+      columnStatisticsData.setDecimalStats(aggregateData);
+    } else {
+      // we need extrapolation
+      Map<String, Integer> indexMap = new HashMap<String, Integer>();
+      for (int index = 0; index < partNames.size(); index++) {
+        indexMap.put(partNames.get(index), index);
+      }
+      Map<String, Double> adjustedIndexMap = new HashMap<String, Double>();
+      Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<String, ColumnStatisticsData>();
+      // while we scan the css, we also get the densityAvg, lowerbound and
+      // higherbound when useDensityFunctionForNDVEstimation is true.
+      double densityAvgSum = 0.0;
+      if (!isNDVBitVectorSet) {
+        // if not every partition uses bitvector for ndv, we just fall back to
+        // the traditional extrapolation methods.
+        for (ColumnStatistics cs : css) {
+          String partName = cs.getStatsDesc().getPartName();
+          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+          DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats();
+          if (useDensityFunctionForNDVEstimation) {
+            densityAvgSum += (HBaseUtils.getDoubleValue(newData.getHighValue()) - HBaseUtils
+                .getDoubleValue(newData.getLowValue())) / newData.getNumDVs();
+          }
+          adjustedIndexMap.put(partName, (double) indexMap.get(partName));
+          adjustedStatsMap.put(partName, cso.getStatsData());
+        }
+      } else {
+        // we first merge all the adjacent bitvectors that we could merge and
+        // derive new partition names and index.
+        NumDistinctValueEstimator ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
+        StringBuilder pseudoPartName = new StringBuilder();
+        double pseudoIndexSum = 0;
+        int length = 0;
+        int curIndex = -1;
+        DecimalColumnStatsData aggregateData = null;
+        for (ColumnStatistics cs : css) {
+          String partName = cs.getStatsDesc().getPartName();
+          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+          DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats();
+          // newData.isSetBitVectors() should be true for sure because we
+          // already checked it before.
+          if (indexMap.get(partName) != curIndex) {
+            // There is bitvector, but it is not adjacent to the previous ones.
+            if (length > 0) {
+              // we have to set ndv
+              adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
+              aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+              ColumnStatisticsData csd = new ColumnStatisticsData();
+              csd.setDecimalStats(aggregateData);
+              adjustedStatsMap.put(pseudoPartName.toString(), csd);
+              if (useDensityFunctionForNDVEstimation) {
+                densityAvgSum += (HBaseUtils.getDoubleValue(aggregateData.getHighValue()) - HBaseUtils
+                    .getDoubleValue(aggregateData.getLowValue())) / aggregateData.getNumDVs();
+              }
+              // reset everything
+              pseudoPartName = new StringBuilder();
+              pseudoIndexSum = 0;
+              length = 0;
+            }
+            aggregateData = null;
+          }
+          curIndex = indexMap.get(partName);
+          pseudoPartName.append(partName);
+          pseudoIndexSum += curIndex;
+          length++;
+          curIndex++;
+          if (aggregateData == null) {
+            aggregateData = newData.deepCopy();
+          } else {
+            if (HBaseUtils.getDoubleValue(aggregateData.getLowValue()) < HBaseUtils
+                .getDoubleValue(newData.getLowValue())) {
+              aggregateData.setLowValue(aggregateData.getLowValue());
+            } else {
+              aggregateData.setLowValue(newData.getLowValue());
+            }
+            if (HBaseUtils.getDoubleValue(aggregateData.getHighValue()) > HBaseUtils
+                .getDoubleValue(newData.getHighValue())) {
+              aggregateData.setHighValue(aggregateData.getHighValue());
+            } else {
+              aggregateData.setHighValue(newData.getHighValue());
+            }
+            aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+          }
+          ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
+              ndvEstimator.getnumBitVectors()));
+        }
+        if (length > 0) {
+          // we have to set ndv
+          adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
+          aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+          ColumnStatisticsData csd = new ColumnStatisticsData();
+          csd.setDecimalStats(aggregateData);
+          adjustedStatsMap.put(pseudoPartName.toString(), csd);
+          if (useDensityFunctionForNDVEstimation) {
+            densityAvgSum += (HBaseUtils.getDoubleValue(aggregateData.getHighValue()) - HBaseUtils
+                .getDoubleValue(aggregateData.getLowValue())) / aggregateData.getNumDVs();
+          }
+        }
+      }
+      extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap,
+          adjustedStatsMap, densityAvgSum / adjustedStatsMap.size());
+    }
+    statsObj.setStatsData(columnStatisticsData);
+    return statsObj;
+  }
+
+  @Override
+  public void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
+      int numPartsWithStats, Map<String, Double> adjustedIndexMap,
+      Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) {
+    int rightBorderInd = numParts;
+    DecimalColumnStatsData extrapolateDecimalData = new DecimalColumnStatsData();
+    Map<String, DecimalColumnStatsData> extractedAdjustedStatsMap = new HashMap<>();
+    for (Map.Entry<String, ColumnStatisticsData> entry : adjustedStatsMap.entrySet()) {
+      extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getDecimalStats());
+    }
+    List<Map.Entry<String, DecimalColumnStatsData>> list = new LinkedList<Map.Entry<String, DecimalColumnStatsData>>(
+        extractedAdjustedStatsMap.entrySet());
+    // get the lowValue
+    Collections.sort(list, new Comparator<Map.Entry<String, DecimalColumnStatsData>>() {
+      public int compare(Map.Entry<String, DecimalColumnStatsData> o1,
+          Map.Entry<String, DecimalColumnStatsData> o2) {
+        return o1.getValue().getLowValue().compareTo(o2.getValue().getLowValue());
+      }
+    });
+    double minInd = adjustedIndexMap.get(list.get(0).getKey());
+    double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+    double lowValue = 0;
+    double min = HBaseUtils.getDoubleValue(list.get(0).getValue().getLowValue());
+    double max = HBaseUtils.getDoubleValue(list.get(list.size() - 1).getValue().getLowValue());
+    if (minInd == maxInd) {
+      lowValue = min;
+    } else if (minInd < maxInd) {
+      // left border is the min
+      lowValue = (max - (max - min) * maxInd / (maxInd - minInd));
+    } else {
+      // right border is the min
+      lowValue = (max - (max - min) * (rightBorderInd - maxInd) / (minInd - maxInd));
+    }
+
+    // get the highValue
+    Collections.sort(list, new Comparator<Map.Entry<String, DecimalColumnStatsData>>() {
+      public int compare(Map.Entry<String, DecimalColumnStatsData> o1,
+          Map.Entry<String, DecimalColumnStatsData> o2) {
+        return o1.getValue().getHighValue().compareTo(o2.getValue().getHighValue());
+      }
+    });
+    minInd = adjustedIndexMap.get(list.get(0).getKey());
+    maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+    double highValue = 0;
+    min = HBaseUtils.getDoubleValue(list.get(0).getValue().getHighValue());
+    max = HBaseUtils.getDoubleValue(list.get(list.size() - 1).getValue().getHighValue());
+    if (minInd == maxInd) {
+      highValue = min;
+    } else if (minInd < maxInd) {
+      // right border is the max
+      highValue = (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
+    } else {
+      // left border is the max
+      highValue = (min + (max - min) * minInd / (minInd - maxInd));
+    }
+
+    // get the #nulls
+    long numNulls = 0;
+    for (Map.Entry<String, DecimalColumnStatsData> entry : extractedAdjustedStatsMap.entrySet()) {
+      numNulls += entry.getValue().getNumNulls();
+    }
+    // we scale up sumNulls based on the number of partitions
+    numNulls = numNulls * numParts / numPartsWithStats;
+
+    // get the ndv
+    long ndv = 0;
+    long ndvMin = 0;
+    long ndvMax = 0;
+    Collections.sort(list, new Comparator<Map.Entry<String, DecimalColumnStatsData>>() {
+      public int compare(Map.Entry<String, DecimalColumnStatsData> o1,
+          Map.Entry<String, DecimalColumnStatsData> o2) {
+        return o1.getValue().getNumDVs() < o2.getValue().getNumDVs() ? -1 : 1;
+      }
+    });
+    long lowerBound = list.get(list.size() - 1).getValue().getNumDVs();
+    long higherBound = 0;
+    for (Map.Entry<String, DecimalColumnStatsData> entry : list) {
+      higherBound += entry.getValue().getNumDVs();
+    }
+    if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) {
+      ndv = (long) ((highValue - lowValue) / densityAvg);
+      if (ndv < lowerBound) {
+        ndv = lowerBound;
+      } else if (ndv > higherBound) {
+        ndv = higherBound;
+      }
     } else {
-      ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
-          ndvEstimator.getnumBitVectors()));
-      aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
-      aggregateData.setBitVectors(ndvEstimator.serialize().toString());
+      minInd = adjustedIndexMap.get(list.get(0).getKey());
+      maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+      ndvMin = list.get(0).getValue().getNumDVs();
+      ndvMax = list.get(list.size() - 1).getValue().getNumDVs();
+      if (minInd == maxInd) {
+        ndv = ndvMin;
+      } else if (minInd < maxInd) {
+        // right border is the max
+        ndv = (long) (ndvMin + (ndvMax - ndvMin) * (rightBorderInd - minInd) / (maxInd - minInd));
+      } else {
+        // left border is the max
+        ndv = (long) (ndvMin + (ndvMax - ndvMin) * minInd / (minInd - maxInd));
+      }
     }
+    extrapolateDecimalData.setLowValue(StatObjectConverter.createThriftDecimal(String
+        .valueOf(lowValue)));
+    extrapolateDecimalData.setHighValue(StatObjectConverter.createThriftDecimal(String
+        .valueOf(highValue)));
+    extrapolateDecimalData.setNumNulls(numNulls);
+    extrapolateDecimalData.setNumDVs(ndv);
+    extrapolateData.setDecimalStats(extrapolateDecimalData);
   }
 }
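Note: all of the extrapolate() implementations use the same linear rule: sort the partitions that have stats by the quantity of interest, take the values and adjusted indexes at the two ends, and project that line out to the borders of the full partition range (index 0 on the left, numParts on the right); numNulls is simply scaled by numParts / numPartsWithStats. A small numeric sketch of the highValue branch, assuming four partitions of which only the first two carry stats (values are made up):

    public class ExtrapolationSketch {
      public static void main(String[] args) {
        int numParts = 4;               // total number of partitions
        int rightBorderInd = numParts;  // right border used by the formula above
        // Partitions with adjusted indexes 0 and 1 have highValues 10.0 and 14.0.
        double minInd = 0, maxInd = 1;
        double min = 10.0, max = 14.0;
        double highValue;
        if (minInd == maxInd) {
          highValue = min;
        } else if (minInd < maxInd) {
          // the larger value sits toward the right border: extend the line to numParts
          highValue = min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd);
        } else {
          // the larger value sits toward the left border: extend the line to index 0
          highValue = min + (max - min) * minInd / (minInd - maxInd);
        }
        System.out.println(highValue);  // 10 + 4 * (4 - 0) / (1 - 0) = 26.0
      }
    }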

http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
index d945ec2..a88ef84 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
@@ -19,26 +19,307 @@
 
 package org.apache.hadoop.hive.metastore.hbase.stats;
 
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 
-public class DoubleColumnStatsAggregator extends ColumnStatsAggregator {
+public class DoubleColumnStatsAggregator extends ColumnStatsAggregator implements
+    IExtrapolatePartStatus {
 
   @Override
-  public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
-    DoubleColumnStatsData aggregateData = aggregateColStats.getStatsData().getDoubleStats();
-    DoubleColumnStatsData newData = newColStats.getStatsData().getDoubleStats();
-    aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
-    aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue()));
-    aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-    if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
-      aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+      List<ColumnStatistics> css) throws MetaException {
+    ColumnStatisticsObj statsObj = null;
+
+    // check if all the ColumnStatisticsObjs contain stats and all the ndv are
+    // bitvectors
+    boolean doAllPartitionContainStats = partNames.size() == css.size();
+    boolean isNDVBitVectorSet = true;
+    String colType = null;
+    for (ColumnStatistics cs : css) {
+      if (cs.getStatsObjSize() != 1) {
+        throw new MetaException(
+            "The number of columns should be exactly one in aggrStats, but found "
+                + cs.getStatsObjSize());
+      }
+      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+      if (statsObj == null) {
+        colType = cso.getColType();
+        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
+            .getStatsData().getSetField());
+      }
+      if (numBitVectors <= 0 || !cso.getStatsData().getDoubleStats().isSetBitVectors()
+          || cso.getStatsData().getDoubleStats().getBitVectors().length() == 0) {
+        isNDVBitVectorSet = false;
+        break;
+      }
+    }
+    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
+    if (doAllPartitionContainStats || css.size() < 2) {
+      DoubleColumnStatsData aggregateData = null;
+      long lowerBound = 0;
+      long higherBound = 0;
+      double densityAvgSum = 0.0;
+      NumDistinctValueEstimator ndvEstimator = null;
+      if (isNDVBitVectorSet) {
+        ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
+      }
+      for (ColumnStatistics cs : css) {
+        ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+        DoubleColumnStatsData newData = cso.getStatsData().getDoubleStats();
+        if (useDensityFunctionForNDVEstimation) {
+          lowerBound = Math.max(lowerBound, newData.getNumDVs());
+          higherBound += newData.getNumDVs();
+          densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs();
+        }
+        if (isNDVBitVectorSet) {
+          ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
+              ndvEstimator.getnumBitVectors()));
+        }
+        if (aggregateData == null) {
+          aggregateData = newData.deepCopy();
+        } else {
+          aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
+          aggregateData
+              .setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue()));
+          aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+          aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+        }
+      }
+      if (isNDVBitVectorSet) {
+        // if all the ColumnStatisticsObjs contain bitvectors, we do not need to
+        // use uniform distribution assumption because we can merge bitvectors
+        // to get a good estimation.
+        aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+      } else {
+        if (useDensityFunctionForNDVEstimation) {
+          // We have estimation, lowerbound and higherbound. We use estimation
+          // if it is between lowerbound and higherbound.
+          double densityAvg = densityAvgSum / partNames.size();
+          long estimation = (long) ((aggregateData.getHighValue() - aggregateData.getLowValue()) / densityAvg);
+          if (estimation < lowerBound) {
+            aggregateData.setNumDVs(lowerBound);
+          } else if (estimation > higherBound) {
+            aggregateData.setNumDVs(higherBound);
+          } else {
+            aggregateData.setNumDVs(estimation);
+          }
+        } else {
+          // Without useDensityFunctionForNDVEstimation, we just use the
+          // default one, which is the max of all the partitions and it is
+          // already done.
+        }
+      }
+      columnStatisticsData.setDoubleStats(aggregateData);
+    } else {
+      // we need extrapolation
+      Map<String, Integer> indexMap = new HashMap<String, Integer>();
+      for (int index = 0; index < partNames.size(); index++) {
+        indexMap.put(partNames.get(index), index);
+      }
+      Map<String, Double> adjustedIndexMap = new HashMap<String, Double>();
+      Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<String, ColumnStatisticsData>();
+      // while we scan the css, we also get the densityAvg, lowerbound and
+      // higherbound when useDensityFunctionForNDVEstimation is true.
+      double densityAvgSum = 0.0;
+      if (!isNDVBitVectorSet) {
+        // if not every partition uses bitvector for ndv, we just fall back to
+        // the traditional extrapolation methods.
+        for (ColumnStatistics cs : css) {
+          String partName = cs.getStatsDesc().getPartName();
+          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+          DoubleColumnStatsData newData = cso.getStatsData().getDoubleStats();
+          if (useDensityFunctionForNDVEstimation) {
+            densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs();
+          }
+          adjustedIndexMap.put(partName, (double) indexMap.get(partName));
+          adjustedStatsMap.put(partName, cso.getStatsData());
+        }
+      } else {
+        // we first merge all the adjacent bitvectors that we could merge and
+        // derive new partition names and index.
+        NumDistinctValueEstimator ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
+        StringBuilder pseudoPartName = new StringBuilder();
+        double pseudoIndexSum = 0;
+        int length = 0;
+        int curIndex = -1;
+        DoubleColumnStatsData aggregateData = null;
+        for (ColumnStatistics cs : css) {
+          String partName = cs.getStatsDesc().getPartName();
+          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+          DoubleColumnStatsData newData = cso.getStatsData().getDoubleStats();
+          // newData.isSetBitVectors() should be true for sure because we
+          // already checked it before.
+          if (indexMap.get(partName) != curIndex) {
+            // There is bitvector, but it is not adjacent to the previous ones.
+            if (length > 0) {
+              // we have to set ndv
+              adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
+              aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+              ColumnStatisticsData csd = new ColumnStatisticsData();
+              csd.setDoubleStats(aggregateData);
+              adjustedStatsMap.put(pseudoPartName.toString(), csd);
+              if (useDensityFunctionForNDVEstimation) {
+                densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs();
+              }
+              // reset everything
+              pseudoPartName = new StringBuilder();
+              pseudoIndexSum = 0;
+              length = 0;
+            }
+            aggregateData = null;
+          }
+          curIndex = indexMap.get(partName);
+          pseudoPartName.append(partName);
+          pseudoIndexSum += curIndex;
+          length++;
+          curIndex++;
+          if (aggregateData == null) {
+            aggregateData = newData.deepCopy();
+          } else {
+            aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
+            aggregateData.setHighValue(Math.max(aggregateData.getHighValue(),
+                newData.getHighValue()));
+            aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+          }
+          ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
+              ndvEstimator.getnumBitVectors()));
+        }
+        if (length > 0) {
+          // we have to set ndv
+          adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
+          aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+          ColumnStatisticsData csd = new ColumnStatisticsData();
+          csd.setDoubleStats(aggregateData);
+          adjustedStatsMap.put(pseudoPartName.toString(), csd);
+          if (useDensityFunctionForNDVEstimation) {
+            densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs();
+          }
+        }
+      }
+      extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap,
+          adjustedStatsMap, densityAvgSum / adjustedStatsMap.size());
+    }
+    statsObj.setStatsData(columnStatisticsData);
+    return statsObj;
+  }
+
+  @Override
+  public void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
+      int numPartsWithStats, Map<String, Double> adjustedIndexMap,
+      Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) {
+    int rightBorderInd = numParts;
+    DoubleColumnStatsData extrapolateDoubleData = new DoubleColumnStatsData();
+    Map<String, DoubleColumnStatsData> extractedAdjustedStatsMap = new HashMap<>();
+    for (Map.Entry<String, ColumnStatisticsData> entry : adjustedStatsMap.entrySet()) {
+      extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getDoubleStats());
+    }
+    List<Map.Entry<String, DoubleColumnStatsData>> list = new LinkedList<Map.Entry<String, DoubleColumnStatsData>>(
+        extractedAdjustedStatsMap.entrySet());
+    // get the lowValue
+    Collections.sort(list, new Comparator<Map.Entry<String, DoubleColumnStatsData>>() {
+      public int compare(Map.Entry<String, DoubleColumnStatsData> o1,
+          Map.Entry<String, DoubleColumnStatsData> o2) {
+        return o1.getValue().getLowValue() < o2.getValue().getLowValue() ? -1 : 1;
+      }
+    });
+    double minInd = adjustedIndexMap.get(list.get(0).getKey());
+    double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+    double lowValue = 0;
+    double min = list.get(0).getValue().getLowValue();
+    double max = list.get(list.size() - 1).getValue().getLowValue();
+    if (minInd == maxInd) {
+      lowValue = min;
+    } else if (minInd < maxInd) {
+      // left border is the min
+      lowValue = (max - (max - min) * maxInd / (maxInd - minInd));
+    } else {
+      // right border is the min
+      lowValue = (max - (max - min) * (rightBorderInd - maxInd) / (minInd - maxInd));
+    }
+
+    // get the highValue
+    Collections.sort(list, new Comparator<Map.Entry<String, DoubleColumnStatsData>>() {
+      public int compare(Map.Entry<String, DoubleColumnStatsData> o1,
+          Map.Entry<String, DoubleColumnStatsData> o2) {
+        return o1.getValue().getHighValue() < o2.getValue().getHighValue() ? -1 : 1;
+      }
+    });
+    minInd = adjustedIndexMap.get(list.get(0).getKey());
+    maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+    double highValue = 0;
+    min = list.get(0).getValue().getHighValue();
+    max = list.get(list.size() - 1).getValue().getHighValue();
+    if (minInd == maxInd) {
+      highValue = min;
+    } else if (minInd < maxInd) {
+      // right border is the max
+      highValue = (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
+    } else {
+      // left border is the max
+      highValue = (min + (max - min) * minInd / (minInd - maxInd));
+    }
+
+    // get the #nulls
+    long numNulls = 0;
+    for (Map.Entry<String, DoubleColumnStatsData> entry : extractedAdjustedStatsMap.entrySet()) {
+      numNulls += entry.getValue().getNumNulls();
+    }
+    // we scale up sumNulls based on the number of partitions
+    numNulls = numNulls * numParts / numPartsWithStats;
+
+    // get the ndv
+    long ndv = 0;
+    long ndvMin = 0;
+    long ndvMax = 0;
+    Collections.sort(list, new Comparator<Map.Entry<String, DoubleColumnStatsData>>() {
+      public int compare(Map.Entry<String, DoubleColumnStatsData> o1,
+          Map.Entry<String, DoubleColumnStatsData> o2) {
+        return o1.getValue().getNumDVs() < o2.getValue().getNumDVs() ? -1 : 1;
+      }
+    });
+    long lowerBound = list.get(list.size() - 1).getValue().getNumDVs();
+    long higherBound = 0;
+    for (Map.Entry<String, DoubleColumnStatsData> entry : list) {
+      higherBound += entry.getValue().getNumDVs();
+    }
+    if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) {
+      ndv = (long) ((highValue - lowValue) / densityAvg);
+      if (ndv < lowerBound) {
+        ndv = lowerBound;
+      } else if (ndv > higherBound) {
+        ndv = higherBound;
+      }
     } else {
-      ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
-          ndvEstimator.getnumBitVectors()));
-      aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
-      aggregateData.setBitVectors(ndvEstimator.serialize().toString());
+      minInd = adjustedIndexMap.get(list.get(0).getKey());
+      maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+      ndvMin = list.get(0).getValue().getNumDVs();
+      ndvMax = list.get(list.size() - 1).getValue().getNumDVs();
+      if (minInd == maxInd) {
+        ndv = ndvMin;
+      } else if (minInd < maxInd) {
+        // right border is the max
+        ndv = (long) (ndvMin + (ndvMax - ndvMin) * (rightBorderInd - minInd) / (maxInd - minInd));
+      } else {
+        // left border is the max
+        ndv = (long) (ndvMin + (ndvMax - ndvMin) * minInd / (minInd - maxInd));
+      }
     }
+    extrapolateDoubleData.setLowValue(lowValue);
+    extrapolateDoubleData.setHighValue(highValue);
+    extrapolateDoubleData.setNumNulls(numNulls);
+    extrapolateDoubleData.setNumDVs(ndv);
+    extrapolateData.setDoubleStats(extrapolateDoubleData);
   }
+  
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/IExtrapolatePartStatus.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/IExtrapolatePartStatus.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/IExtrapolatePartStatus.java
new file mode 100644
index 0000000..99af060
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/IExtrapolatePartStatus.java
@@ -0,0 +1,30 @@
+package org.apache.hadoop.hive.metastore.hbase.stats;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+
+public interface IExtrapolatePartStatus {
+  // The following function will extrapolate the stats when the column stats of
+  // some partitions are missing.
+  /**
+   * @param extrapolateData
+   *          it will carry back the specific stats, e.g., DOUBLE_STATS or
+   *          LONG_STATS
+   * @param numParts
+   *          the total number of partitions
+   * @param numPartsWithStats
+   *          the number of partitions that have stats
+   * @param adjustedIndexMap
+   *          the partition name to index map
+   * @param adjustedStatsMap
+   *          the partition name to its stats map
+   * @param densityAvg
+   *          the average of ndv density, which is useful when
+   *          useDensityFunctionForNDVEstimation is true.
+   */
+  public abstract void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
+      int numPartsWithStats, Map<String, Double> adjustedIndexMap,
+      Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg);
+
+}
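Note: this interface is implemented by the long, double, decimal (and, per the diffstat, string) aggregators; aggregate() builds adjustedIndexMap and adjustedStatsMap while scanning the per-partition stats and then makes a single extrapolate() call. A hedged sketch of assembling such a call directly, for two out of four partitions (partition names and numbers are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
    import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
    import org.apache.hadoop.hive.metastore.hbase.stats.DoubleColumnStatsAggregator;

    public class ExtrapolateCallSketch {
      public static void main(String[] args) {
        // Stats exist only for partitions p=0 and p=1 out of 4.
        Map<String, Double> adjustedIndexMap = new HashMap<>();
        Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<>();
        adjustedIndexMap.put("p=0", 0.0);
        adjustedIndexMap.put("p=1", 1.0);
        adjustedStatsMap.put("p=0", doubleStats(0.0, 10.0, 5, 100));
        adjustedStatsMap.put("p=1", doubleStats(0.0, 14.0, 5, 120));

        DoubleColumnStatsAggregator agg = new DoubleColumnStatsAggregator();
        agg.useDensityFunctionForNDVEstimation = false;  // use the linear NDV rule
        ColumnStatisticsData result = new ColumnStatisticsData();
        agg.extrapolate(result, 4, 2, adjustedIndexMap, adjustedStatsMap, 0.0);
        // highValue extrapolates to 26.0, numNulls scales to 20, ndv to 180.
        System.out.println(result.getDoubleStats());
      }

      static ColumnStatisticsData doubleStats(double low, double high, long numNulls, long ndv) {
        DoubleColumnStatsData d = new DoubleColumnStatsData();
        d.setLowValue(low);
        d.setHighValue(high);
        d.setNumNulls(numNulls);
        d.setNumDVs(ndv);
        ColumnStatisticsData csd = new ColumnStatisticsData();
        csd.setDoubleStats(d);
        return csd;
      }
    }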

http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
index 068dd00..8ac6561 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
@@ -19,26 +19,305 @@
 
 package org.apache.hadoop.hive.metastore.hbase.stats;
 
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 
-public class LongColumnStatsAggregator extends ColumnStatsAggregator {
+public class LongColumnStatsAggregator extends ColumnStatsAggregator implements
+    IExtrapolatePartStatus {
 
   @Override
-  public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
-    LongColumnStatsData aggregateData = aggregateColStats.getStatsData().getLongStats();
-    LongColumnStatsData newData = newColStats.getStatsData().getLongStats();
-    aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
-    aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue()));
-    aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-    if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
-      aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+      List<ColumnStatistics> css) throws MetaException {
+    ColumnStatisticsObj statsObj = null;
+
+    // check if all the ColumnStatisticsObjs contain stats and all the ndv are
+    // bitvectors
+    boolean doAllPartitionContainStats = partNames.size() == css.size();
+    boolean isNDVBitVectorSet = true;
+    String colType = null;
+    for (ColumnStatistics cs : css) {
+      if (cs.getStatsObjSize() != 1) {
+        throw new MetaException(
+            "The number of columns should be exactly one in aggrStats, but found "
+                + cs.getStatsObjSize());
+      }
+      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+      if (statsObj == null) {
+        colType = cso.getColType();
+        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
+            .getStatsData().getSetField());
+      }
+      if (numBitVectors <= 0 || !cso.getStatsData().getLongStats().isSetBitVectors()
+          || cso.getStatsData().getLongStats().getBitVectors().length() == 0) {
+        isNDVBitVectorSet = false;
+        break;
+      }
+    }
+    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
+    if (doAllPartitionContainStats || css.size() < 2) {
+      LongColumnStatsData aggregateData = null;
+      long lowerBound = 0;
+      long higherBound = 0;
+      double densityAvgSum = 0.0;
+      NumDistinctValueEstimator ndvEstimator = null;
+      if (isNDVBitVectorSet) {
+        ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
+      }
+      for (ColumnStatistics cs : css) {
+        ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+        LongColumnStatsData newData = cso.getStatsData().getLongStats();
+        if (useDensityFunctionForNDVEstimation) {
+          lowerBound = Math.max(lowerBound, newData.getNumDVs());
+          higherBound += newData.getNumDVs();
+          densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs();
+        }
+        if (isNDVBitVectorSet) {
+          ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
+              ndvEstimator.getnumBitVectors()));
+        }
+        if (aggregateData == null) {
+          aggregateData = newData.deepCopy();
+        } else {
+          aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
+          aggregateData
+              .setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue()));
+          aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+          aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+        }
+      }
+      if (isNDVBitVectorSet) {
+        // if all the ColumnStatisticsObjs contain bitvectors, we do not need
+        // the uniform distribution assumption because merging the bitvectors
+        // gives a good estimate directly.
+        aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+      } else {
+        if (useDensityFunctionForNDVEstimation) {
+          // We have an estimate plus a lower and an upper bound. We use the
+          // estimate only if it falls between the two bounds.
+          double densityAvg = densityAvgSum / partNames.size();
+          long estimation = (long) ((aggregateData.getHighValue() - aggregateData.getLowValue()) / densityAvg);
+          if (estimation < lowerBound) {
+            aggregateData.setNumDVs(lowerBound);
+          } else if (estimation > higherBound) {
+            aggregateData.setNumDVs(higherBound);
+          } else {
+            aggregateData.setNumDVs(estimation);
+          }
+        } else {
+          // Without useDensityFunctionForNDVEstimation, we fall back to the
+          // default estimate, i.e. the maximum NDV across all partitions,
+          // which has already been set above.
+        }
+      }
+      columnStatisticsData.setLongStats(aggregateData);
     } else {
-      ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
-          ndvEstimator.getnumBitVectors()));
-      aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
-      aggregateData.setBitVectors(ndvEstimator.serialize().toString());
+      // we need extrapolation
+      Map<String, Integer> indexMap = new HashMap<String, Integer>();
+      for (int index = 0; index < partNames.size(); index++) {
+        indexMap.put(partNames.get(index), index);
+      }
+      Map<String, Double> adjustedIndexMap = new HashMap<String, Double>();
+      Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<String, ColumnStatisticsData>();
+      // while we scan the css, we also compute the densityAvg, lowerbound and
+      // higherbound when useDensityFunctionForNDVEstimation is true.
+      double densityAvgSum = 0.0;
+      if (!isNDVBitVectorSet) {
+        // if not every partition uses a bitvector for NDV, we just fall back
+        // to the traditional extrapolation methods.
+        for (ColumnStatistics cs : css) {
+          String partName = cs.getStatsDesc().getPartName();
+          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+          LongColumnStatsData newData = cso.getStatsData().getLongStats();
+          if (useDensityFunctionForNDVEstimation) {
+            densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs();
+          }
+          adjustedIndexMap.put(partName, (double) indexMap.get(partName));
+          adjustedStatsMap.put(partName, cso.getStatsData());
+        }
+      } else {
+        // we first merge all adjacent bitvectors that can be merged and derive
+        // pseudo partition names and indexes for the merged groups.
+        NumDistinctValueEstimator ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
+        StringBuilder pseudoPartName = new StringBuilder();
+        double pseudoIndexSum = 0;
+        int length = 0;
+        int curIndex = -1;
+        LongColumnStatsData aggregateData = null;
+        for (ColumnStatistics cs : css) {
+          String partName = cs.getStatsDesc().getPartName();
+          ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+          LongColumnStatsData newData = cso.getStatsData().getLongStats();
+          // newData.isSetBitVectors() must be true here because we already
+          // checked it above.
+          if (indexMap.get(partName) != curIndex) {
+            // There is a bitvector, but it is not adjacent to the previous ones.
+            if (length > 0) {
+              // we have to set ndv
+              adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
+              aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+              ColumnStatisticsData csd = new ColumnStatisticsData();
+              csd.setLongStats(aggregateData);
+              adjustedStatsMap.put(pseudoPartName.toString(), csd);
+              if (useDensityFunctionForNDVEstimation) {
+                densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs();
+              }
+              // reset everything
+              pseudoPartName = new StringBuilder();
+              pseudoIndexSum = 0;
+              length = 0;
+            }
+            aggregateData = null;
+          }
+          curIndex = indexMap.get(partName);
+          pseudoPartName.append(partName);
+          pseudoIndexSum += curIndex;
+          length++;
+          curIndex++;
+          if (aggregateData == null) {
+            aggregateData = newData.deepCopy();
+          } else {
+            aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
+            aggregateData.setHighValue(Math.max(aggregateData.getHighValue(),
+                newData.getHighValue()));
+            aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+          }
+          ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
+              ndvEstimator.getnumBitVectors()));
+        }
+        if (length > 0) {
+          // we have to set ndv
+          adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
+          aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+          ColumnStatisticsData csd = new ColumnStatisticsData();
+          csd.setLongStats(aggregateData);
+          adjustedStatsMap.put(pseudoPartName.toString(), csd);
+          if (useDensityFunctionForNDVEstimation) {
+            densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs();
+          }
+        }
+      }
+      extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap,
+          adjustedStatsMap, densityAvgSum / adjustedStatsMap.size());
     }
+    statsObj.setStatsData(columnStatisticsData);
+    return statsObj;
   }
+
+  @Override
+  public void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
+      int numPartsWithStats, Map<String, Double> adjustedIndexMap,
+      Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) {
+    int rightBorderInd = numParts;
+    LongColumnStatsData extrapolateLongData = new LongColumnStatsData();
+    Map<String, LongColumnStatsData> extractedAdjustedStatsMap = new HashMap<>();
+    for (Map.Entry<String, ColumnStatisticsData> entry : adjustedStatsMap.entrySet()) {
+      extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getLongStats());
+    }
+    List<Map.Entry<String, LongColumnStatsData>> list = new LinkedList<Map.Entry<String, LongColumnStatsData>>(
+        extractedAdjustedStatsMap.entrySet());
+    // get the lowValue
+    Collections.sort(list, new Comparator<Map.Entry<String, LongColumnStatsData>>() {
+      public int compare(Map.Entry<String, LongColumnStatsData> o1,
+          Map.Entry<String, LongColumnStatsData> o2) {
+        return o1.getValue().getLowValue() < o2.getValue().getLowValue() ? -1 : 1;
+      }
+    });
+    double minInd = adjustedIndexMap.get(list.get(0).getKey());
+    double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+    long lowValue = 0;
+    long min = list.get(0).getValue().getLowValue();
+    long max = list.get(list.size() - 1).getValue().getLowValue();
+    if (minInd == maxInd) {
+      lowValue = min;
+    } else if (minInd < maxInd) {
+      // left border is the min
+      lowValue = (long) (max - (max - min) * maxInd / (maxInd - minInd));
+    } else {
+      // right border is the min
+      lowValue = (long) (max - (max - min) * (rightBorderInd - maxInd) / (minInd - maxInd));
+    }
+
+    // get the highValue
+    Collections.sort(list, new Comparator<Map.Entry<String, LongColumnStatsData>>() {
+      public int compare(Map.Entry<String, LongColumnStatsData> o1,
+          Map.Entry<String, LongColumnStatsData> o2) {
+        return o1.getValue().getHighValue() < o2.getValue().getHighValue() ? -1 : 1;
+      }
+    });
+    minInd = adjustedIndexMap.get(list.get(0).getKey());
+    maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+    long highValue = 0;
+    min = list.get(0).getValue().getHighValue();
+    max = list.get(list.size() - 1).getValue().getHighValue();
+    if (minInd == maxInd) {
+      highValue = min;
+    } else if (minInd < maxInd) {
+      // right border is the max
+      highValue = (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
+    } else {
+      // left border is the max
+      highValue = (long) (min + (max - min) * minInd / (minInd - maxInd));
+    }
+
+    // get the #nulls
+    long numNulls = 0;
+    for (Map.Entry<String, LongColumnStatsData> entry : extractedAdjustedStatsMap.entrySet()) {
+      numNulls += entry.getValue().getNumNulls();
+    }
+    // we scale up numNulls based on the number of partitions
+    numNulls = numNulls * numParts / numPartsWithStats;
+
+    // get the ndv
+    long ndv = 0;
+    Collections.sort(list, new Comparator<Map.Entry<String, LongColumnStatsData>>() {
+      public int compare(Map.Entry<String, LongColumnStatsData> o1,
+          Map.Entry<String, LongColumnStatsData> o2) {
+        return o1.getValue().getNumDVs() < o2.getValue().getNumDVs() ? -1 : 1;
+      }
+    });
+    long lowerBound = list.get(list.size() - 1).getValue().getNumDVs();
+    long higherBound = 0;
+    for (Map.Entry<String, LongColumnStatsData> entry : list) {
+      higherBound += entry.getValue().getNumDVs();
+    }
+    if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) {
+      ndv = (long) ((highValue - lowValue) / densityAvg);
+      if (ndv < lowerBound) {
+        ndv = lowerBound;
+      } else if (ndv > higherBound) {
+        ndv = higherBound;
+      }
+    } else {
+      minInd = adjustedIndexMap.get(list.get(0).getKey());
+      maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+      min = list.get(0).getValue().getNumDVs();
+      max = list.get(list.size() - 1).getValue().getNumDVs();
+      if (minInd == maxInd) {
+        ndv = min;
+      } else if (minInd < maxInd) {
+        // right border is the max
+        ndv = (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
+      } else {
+        // left border is the max
+        ndv = (long) (min + (max - min) * minInd / (minInd - maxInd));
+      }
+    }
+    extrapolateLongData.setLowValue(lowValue);
+    extrapolateLongData.setHighValue(highValue);
+    extrapolateLongData.setNumNulls(numNulls);
+    extrapolateLongData.setNumDVs(ndv);
+    extrapolateData.setLongStats(extrapolateLongData);
+  }
+
 }
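
To make the linear extrapolation in extrapolate() above easier to follow, here is a minimal standalone sketch of the highValue case; the class name ExtrapolationSketch and the sample numbers are illustrative only and are not part of the patch.

public final class ExtrapolationSketch {

  /**
   * Mirrors the highValue branch of extrapolate() above: min/max are the
   * smallest and largest per-partition highValue, minInd/maxInd are their
   * adjusted partition indexes, and numParts is the total partition count.
   */
  public static long extrapolateHighValue(long min, long max,
      double minInd, double maxInd, int numParts) {
    int rightBorderInd = numParts;
    if (minInd == maxInd) {
      return min;
    } else if (minInd < maxInd) {
      // the right border holds the maximum; project the trend forward
      return (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
    } else {
      // the left border holds the maximum; project the trend backward
      return (long) (min + (max - min) * minInd / (minInd - maxInd));
    }
  }

  public static void main(String[] args) {
    // stats exist for 2 of 4 partitions: highValue 100 at adjusted index 0
    // and 160 at adjusted index 2; projecting the trend to the right border
    // (index 4) gives 100 + (160 - 100) * (4 - 0) / (2 - 0) = 220
    System.out.println(extrapolateHighValue(100L, 160L, 0.0, 2.0, 4));
  }
}

The lowValue and numDVs cases follow the same pattern, while numNulls is simply scaled by numParts / numPartsWithStats. The density heuristic in aggregate() works as the comments describe: densityAvg is the average of (highValue - lowValue) / numDVs over the partitions, and (aggregate highValue - aggregate lowValue) / densityAvg is taken as the NDV estimate whenever it falls between the largest per-partition NDV (lower bound) and the sum of the per-partition NDVs (upper bound).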

http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
index aeb6c39..2aa4046 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
@@ -19,26 +19,87 @@
 
 package org.apache.hadoop.hive.metastore.hbase.stats;
 
+import java.util.List;
+
 import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 
 public class StringColumnStatsAggregator extends ColumnStatsAggregator {
 
   @Override
-  public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
-    StringColumnStatsData aggregateData = aggregateColStats.getStatsData().getStringStats();
-    StringColumnStatsData newData = newColStats.getStatsData().getStringStats();
-    aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
-    aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
-    aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-    if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
-      aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
-    } else {
-      ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
-          ndvEstimator.getnumBitVectors()));
+  public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+      List<ColumnStatistics> css) throws MetaException {
+    ColumnStatisticsObj statsObj = null;
+
+    // check whether all partitions provide column stats and whether every NDV
+    // estimate is backed by a bitvector. Only when both conditions hold do we
+    // merge the bitvectors; otherwise we just take the maximum.
+    boolean doAllPartitionContainStats = partNames.size() == css.size();
+    boolean isNDVBitVectorSet = true;
+    String colType = null;
+    for (ColumnStatistics cs : css) {
+      if (cs.getStatsObjSize() != 1) {
+        throw new MetaException(
+            "The number of columns should be exactly one in aggrStats, but found "
+                + cs.getStatsObjSize());
+      }
+      ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+      if (statsObj == null) {
+        colType = cso.getColType();
+        statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
+            .getStatsData().getSetField());
+      }
+      if (numBitVectors <= 0 || !cso.getStatsData().getStringStats().isSetBitVectors()
+          || cso.getStatsData().getStringStats().getBitVectors().length() == 0) {
+        isNDVBitVectorSet = false;
+        break;
+      }
+    }
+    ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
+    if (doAllPartitionContainStats && isNDVBitVectorSet) {
+      StringColumnStatsData aggregateData = null;
+      NumDistinctValueEstimator ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
+      for (ColumnStatistics cs : css) {
+        ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+        StringColumnStatsData newData = cso.getStatsData().getStringStats();
+        ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
+            ndvEstimator.getnumBitVectors()));
+        if (aggregateData == null) {
+          aggregateData = newData.deepCopy();
+        } else {
+          aggregateData
+              .setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
+          aggregateData
+              .setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
+          aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+        }
+      }
       aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
-      aggregateData.setBitVectors(ndvEstimator.serialize().toString());
+      columnStatisticsData.setStringStats(aggregateData);
+    } else {
+      StringColumnStatsData aggregateData = null;
+      for (ColumnStatistics cs : css) {
+        ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+        StringColumnStatsData newData = cso.getStatsData().getStringStats();
+        if (aggregateData == null) {
+          aggregateData = newData.deepCopy();
+        } else {
+          aggregateData
+              .setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
+          aggregateData
+              .setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
+          aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+          aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+        }
+      }
+      columnStatisticsData.setStringStats(aggregateData);
     }
+    statsObj.setStatsData(columnStatisticsData);
+    return statsObj;
   }
+
 }
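
A quick, hypothetical illustration of why the bitvector path above is preferred over the else branch, which falls back to Math.max over the per-partition NDVs (the values below are made up for the example):

public final class NdvFallbackSketch {
  public static void main(String[] args) {
    // partition p1 holds {a, b, c, d, e}, partition p2 holds {d, e, f, g, h}:
    // both report NDV = 5, and the true NDV of the union is 8
    long p1Ndv = 5L;
    long p2Ndv = 5L;
    // the fallback path keeps max(5, 5) = 5, underestimating the aggregate;
    // merging the NumDistinctValueEstimator bitvectors, when they are
    // available for every partition, would estimate a value close to 8
    System.out.println(Math.max(p1Ndv, p2Ndv));
  }
}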

http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCacheWithBitVector.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCacheWithBitVector.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCacheWithBitVector.java
index 36c7984..e0c4094 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCacheWithBitVector.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCacheWithBitVector.java
@@ -156,10 +156,8 @@ public class TestHBaseAggregateStatsCacheWithBitVector {
         Assert.assertEquals(-20.12, dcsd.getLowValue(), 0.01);
         Assert.assertEquals(60, dcsd.getNumNulls());
         Assert.assertEquals(5, dcsd.getNumDVs());
-        Assert
-            .assertEquals(
-                "{0, 1, 4, 5, 7}{0, 1}{0, 1, 2, 4}{0, 1, 2, 4}{0, 1, 2}{0, 2}{0, 1, 3, 4}{0, 1, 2, 3, 4}{0, 1, 4}{0, 1, 3, 4, 6}{0, 2}{0, 1, 3, 8}{0, 2, 3}{0, 2}{0, 1, 9}{0, 1, 4}",
-                dcsd.getBitVectors());
+        // we no longer store the bitvector for the aggrStats;
+        // it can be stored again in the future if necessary.
       }
     };