Posted to commits@hive.apache.org by jd...@apache.org on 2016/04/04 22:36:54 UTC
[19/50] [abbrv] hive git commit: HIVE-12960: Migrate Column Stats Extrapolation and UniformDistribution to HBaseStore (Pengcheng Xiong, reviewed by Ashutosh Chauhan)
HIVE-12960: Migrate Column Stats Extrapolation and UniformDistribution to HBaseStore (Pengcheng Xiong, reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/96862093
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/96862093
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/96862093
Branch: refs/heads/llap
Commit: 968620932301dc64cd435292726943a6c0a42551
Parents: 3038b05
Author: Pengcheng Xiong <px...@apache.org>
Authored: Sun Mar 27 11:46:17 2016 -0700
Committer: Pengcheng Xiong <px...@apache.org>
Committed: Sun Mar 27 12:11:39 2016 -0700
----------------------------------------------------------------------
.../hive/metastore/StatObjectConverter.java | 2 +-
.../hadoop/hive/metastore/hbase/HBaseUtils.java | 8 +-
.../hadoop/hive/metastore/hbase/StatsCache.java | 20 +-
.../stats/BinaryColumnStatsAggregator.java | 43 +-
.../stats/BooleanColumnStatsAggregator.java | 42 +-
.../hbase/stats/ColumnStatsAggregator.java | 12 +-
.../stats/ColumnStatsAggregatorFactory.java | 8 +-
.../stats/DecimalColumnStatsAggregator.java | 340 ++++++++-
.../stats/DoubleColumnStatsAggregator.java | 307 +++++++-
.../hbase/stats/IExtrapolatePartStatus.java | 30 +
.../hbase/stats/LongColumnStatsAggregator.java | 305 +++++++-
.../stats/StringColumnStatsAggregator.java | 85 ++-
...stHBaseAggregateStatsCacheWithBitVector.java | 6 +-
.../TestHBaseAggregateStatsExtrapolation.java | 717 +++++++++++++++++++
.../TestHBaseAggregateStatsNDVUniformDist.java | 581 +++++++++++++++
.../clientpositive/tez/explainuser_1.q.out | 92 +--
16 files changed, 2454 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
index b3ceff1..e119dd8 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
@@ -650,7 +650,7 @@ public class StatObjectConverter {
}
}
- private static Decimal createThriftDecimal(String s) {
+ public static Decimal createThriftDecimal(String s) {
BigDecimal d = new BigDecimal(s);
return new Decimal(ByteBuffer.wrap(d.unscaledValue().toByteArray()), (short)d.scale());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
index 9ec7cd5..e0b449b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.hive.metastore.hbase;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
@@ -88,7 +90,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
/**
* Utility functions
*/
-class HBaseUtils {
+public class HBaseUtils {
final static Charset ENCODING = StandardCharsets.UTF_8;
final static char KEY_SEPARATOR = '\u0001';
@@ -1421,4 +1423,8 @@ class HBaseUtils {
b[7] = (byte)(v >>> 0);
return b;
}
+
+ public static double getDoubleValue(Decimal decimal) {
+ return new BigDecimal(new BigInteger(decimal.getUnscaled()), decimal.getScale()).doubleValue();
+ }
}
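
The two helpers above are inverses of each other: createThriftDecimal() (made public in this commit) packs a BigDecimal into the Thrift Decimal's unscaled-bytes-plus-scale form, and getDoubleValue() rebuilds a BigDecimal from those two fields and widens it to a double. A minimal round-trip sketch using only java.math, with the Decimal fields simulated by plain variables:

import java.math.BigDecimal;
import java.math.BigInteger;

public class DecimalRoundTrip {
  public static void main(String[] args) {
    // What createThriftDecimal("123.45") stores in the Thrift Decimal:
    BigDecimal d = new BigDecimal("123.45");
    byte[] unscaled = d.unscaledValue().toByteArray(); // big-endian bytes of 12345
    short scale = (short) d.scale();                   // 2

    // What getDoubleValue() computes back from those two fields:
    double back = new BigDecimal(new BigInteger(unscaled), scale).doubleValue();
    System.out.println(back); // 123.45
  }
}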
http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
index f1d2e50..18f8afc 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
@@ -85,12 +85,12 @@ class StatsCache {
@Override
public AggrStats load(StatsCacheKey key) throws Exception {
int numBitVectors = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf);
+ boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION);
HBaseReadWrite hrw = HBaseReadWrite.getInstance();
AggrStats aggrStats = hrw.getAggregatedStats(key.hashed);
if (aggrStats == null) {
misses.incr();
ColumnStatsAggregator aggregator = null;
- ColumnStatisticsObj statsObj = null;
aggrStats = new AggrStats();
LOG.debug("Unable to find aggregated stats for " + key.colName + ", aggregating");
List<ColumnStatistics> css = hrw.getPartitionStatistics(key.dbName, key.tableName,
@@ -98,19 +98,13 @@ class StatsCache {
Collections.singletonList(key.colName));
if (css != null && css.size() > 0) {
aggrStats.setPartsFound(css.size());
- for (ColumnStatistics cs : css) {
- for (ColumnStatisticsObj cso : cs.getStatsObj()) {
- if (statsObj == null) {
- statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(key.colName,
- cso.getColType(), cso.getStatsData().getSetField());
- }
- if (aggregator == null) {
- aggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator(
- cso.getStatsData().getSetField(), numBitVectors);
- }
- aggregator.aggregate(statsObj, cso);
- }
+ if (aggregator == null) {
+ aggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator(css.iterator()
+ .next().getStatsObj().iterator().next().getStatsData().getSetField(),
+ numBitVectors, useDensityFunctionForNDVEstimation);
}
+ ColumnStatisticsObj statsObj = aggregator
+ .aggregate(key.colName, key.partNames, css);
aggrStats.addToColStats(statsObj);
me.put(key, aggrStats);
}
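
With this change, load() hands the whole list of per-partition statistics to a single aggregator call instead of folding ColumnStatisticsObjs pairwise. A hedged sketch of the new call path; conf, colName, partNames, and css stand in for values load() already has, and the signatures are the ones introduced in this diff:

// Assumed in scope: Configuration conf, String colName,
// List<String> partNames, and a non-empty List<ColumnStatistics> css.
int numBitVectors = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf);
boolean useDensity = HiveConf.getBoolVar(conf,
    HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION);
ColumnStatsAggregator aggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator(
    css.get(0).getStatsObj().get(0).getStatsData().getSetField(),
    numBitVectors, useDensity);
ColumnStatisticsObj statsObj = aggregator.aggregate(colName, partNames, css);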
http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java
index 40340dd..d81d612 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java
@@ -19,17 +19,46 @@
package org.apache.hadoop.hive.metastore.hbase.stats;
+import java.util.List;
+
import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.MetaException;
-public class BinaryColumnStatsAggregator extends ColumnStatsAggregator{
+public class BinaryColumnStatsAggregator extends ColumnStatsAggregator {
@Override
- public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
- BinaryColumnStatsData aggregateData = aggregateColStats.getStatsData().getBinaryStats();
- BinaryColumnStatsData newData = newColStats.getStatsData().getBinaryStats();
- aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
- aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
- aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+ List<ColumnStatistics> css) throws MetaException {
+ ColumnStatisticsObj statsObj = null;
+ BinaryColumnStatsData aggregateData = null;
+ String colType = null;
+ for (ColumnStatistics cs : css) {
+ if (cs.getStatsObjSize() != 1) {
+ throw new MetaException(
+ "The number of columns should be exactly one in aggrStats, but found "
+ + cs.getStatsObjSize());
+ }
+ ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+ if (statsObj == null) {
+ colType = cso.getColType();
+ statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
+ .getStatsData().getSetField());
+ }
+ BinaryColumnStatsData newData = cso.getStatsData().getBinaryStats();
+ if (aggregateData == null) {
+ aggregateData = newData.deepCopy();
+ } else {
+ aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
+ aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ }
+ }
+ ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
+ columnStatisticsData.setBinaryStats(aggregateData);
+ statsObj.setStatsData(columnStatisticsData);
+ return statsObj;
}
}
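
Note the merge rules the loop above applies: maxColLen and numNulls combine exactly (max and sum), while avgColLen takes the max of the per-partition averages, an upper bound rather than a true weighted mean. A self-contained model of those rules, with BinStats standing in for BinaryColumnStatsData:

public class BinaryMergeDemo {
  static final class BinStats {
    long maxColLen; double avgColLen; long numNulls;
    BinStats(long m, double a, long n) { maxColLen = m; avgColLen = a; numNulls = n; }
    void mergeFrom(BinStats o) {
      maxColLen = Math.max(maxColLen, o.maxColLen);
      avgColLen = Math.max(avgColLen, o.avgColLen); // upper bound, not a weighted mean
      numNulls += o.numNulls;
    }
  }
  public static void main(String[] args) {
    BinStats agg = new BinStats(10, 4.0, 2);
    agg.mergeFrom(new BinStats(8, 6.5, 3));
    System.out.println(agg.maxColLen + " " + agg.avgColLen + " " + agg.numNulls); // 10 6.5 5
  }
}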
http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java
index 735d965..e796df2 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java
@@ -19,17 +19,47 @@
package org.apache.hadoop.hive.metastore.hbase.stats;
+import java.util.List;
+
import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.MetaException;
public class BooleanColumnStatsAggregator extends ColumnStatsAggregator {
@Override
- public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
- BooleanColumnStatsData aggregateData = aggregateColStats.getStatsData().getBooleanStats();
- BooleanColumnStatsData newData = newColStats.getStatsData().getBooleanStats();
- aggregateData.setNumTrues(aggregateData.getNumTrues() + newData.getNumTrues());
- aggregateData.setNumFalses(aggregateData.getNumFalses() + newData.getNumFalses());
- aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+ List<ColumnStatistics> css) throws MetaException {
+ ColumnStatisticsObj statsObj = null;
+ BooleanColumnStatsData aggregateData = null;
+ String colType = null;
+ for (ColumnStatistics cs : css) {
+ if (cs.getStatsObjSize() != 1) {
+ throw new MetaException(
+ "The number of columns should be exactly one in aggrStats, but found "
+ + cs.getStatsObjSize());
+ }
+ ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+ if (statsObj == null) {
+ colType = cso.getColType();
+ statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
+ .getStatsData().getSetField());
+ }
+ BooleanColumnStatsData newData = cso.getStatsData().getBooleanStats();
+ if (aggregateData == null) {
+ aggregateData = newData.deepCopy();
+ } else {
+ aggregateData.setNumTrues(aggregateData.getNumTrues() + newData.getNumTrues());
+ aggregateData.setNumFalses(aggregateData.getNumFalses() + newData.getNumFalses());
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ }
+ }
+ ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
+ columnStatisticsData.setBooleanStats(aggregateData);
+ statsObj.setStatsData(columnStatisticsData);
+ return statsObj;
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
index 694e53b..31955b4 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
@@ -19,10 +19,16 @@
package org.apache.hadoop.hive.metastore.hbase.stats;
-import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.MetaException;
public abstract class ColumnStatsAggregator {
- NumDistinctValueEstimator ndvEstimator = null;
- public abstract void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats);
+ public int numBitVectors;
+ public boolean useDensityFunctionForNDVEstimation;
+
+ public abstract ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+ List<ColumnStatistics> css) throws MetaException;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
index 8eb127b..daf8569 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.metastore.hbase.stats;
-import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
@@ -35,7 +34,7 @@ public class ColumnStatsAggregatorFactory {
private ColumnStatsAggregatorFactory() {
}
- public static ColumnStatsAggregator getColumnStatsAggregator(_Fields type, int numBitVectors) {
+ public static ColumnStatsAggregator getColumnStatsAggregator(_Fields type, int numBitVectors, boolean useDensityFunctionForNDVEstimation) {
ColumnStatsAggregator agg;
switch (type) {
case BOOLEAN_STATS:
@@ -59,9 +58,8 @@ public class ColumnStatsAggregatorFactory {
default:
throw new RuntimeException("Woh, bad. Unknown stats type " + type.toString());
}
- if (numBitVectors > 0) {
- agg.ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
- }
+ agg.numBitVectors = numBitVectors;
+ agg.useDensityFunctionForNDVEstimation = useDensityFunctionForNDVEstimation;
return agg;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
index 50f4325..36b2c9c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
@@ -19,33 +19,333 @@
package org.apache.hadoop.hive.metastore.hbase.stats;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.metastore.StatObjectConverter;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.Decimal;
import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.hbase.HBaseUtils;
-public class DecimalColumnStatsAggregator extends ColumnStatsAggregator {
+public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implements
+ IExtrapolatePartStatus {
@Override
- public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
- DecimalColumnStatsData aggregateData = aggregateColStats.getStatsData().getDecimalStats();
- DecimalColumnStatsData newData = newColStats.getStatsData().getDecimalStats();
- Decimal lowValue = aggregateData.getLowValue() != null
- && (aggregateData.getLowValue().compareTo(newData.getLowValue()) > 0) ? aggregateData
- .getLowValue() : newData.getLowValue();
- aggregateData.setLowValue(lowValue);
- Decimal highValue = aggregateData.getHighValue() != null
- && (aggregateData.getHighValue().compareTo(newData.getHighValue()) > 0) ? aggregateData
- .getHighValue() : newData.getHighValue();
- aggregateData.setHighValue(highValue);
- aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
- if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
- aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+ public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+ List<ColumnStatistics> css) throws MetaException {
+ ColumnStatisticsObj statsObj = null;
+
+ // check if all the ColumnStatisticsObjs contain stats and all the ndv are
+ // bitvectors
+ boolean doAllPartitionContainStats = partNames.size() == css.size();
+ boolean isNDVBitVectorSet = true;
+ String colType = null;
+ for (ColumnStatistics cs : css) {
+ if (cs.getStatsObjSize() != 1) {
+ throw new MetaException(
+ "The number of columns should be exactly one in aggrStats, but found "
+ + cs.getStatsObjSize());
+ }
+ ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+ if (statsObj == null) {
+ colType = cso.getColType();
+ statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
+ .getStatsData().getSetField());
+ }
+ if (numBitVectors <= 0 || !cso.getStatsData().getDecimalStats().isSetBitVectors()
+ || cso.getStatsData().getDecimalStats().getBitVectors().length() == 0) {
+ isNDVBitVectorSet = false;
+ break;
+ }
+ }
+ ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
+ if (doAllPartitionContainStats || css.size() < 2) {
+ DecimalColumnStatsData aggregateData = null;
+ long lowerBound = 0;
+ long higherBound = 0;
+ double densityAvgSum = 0.0;
+ NumDistinctValueEstimator ndvEstimator = null;
+ if (isNDVBitVectorSet) {
+ ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
+ }
+ for (ColumnStatistics cs : css) {
+ ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+ DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats();
+ if (useDensityFunctionForNDVEstimation) {
+ lowerBound = Math.max(lowerBound, newData.getNumDVs());
+ higherBound += newData.getNumDVs();
+ densityAvgSum += (HBaseUtils.getDoubleValue(newData.getHighValue()) - HBaseUtils
+ .getDoubleValue(newData.getLowValue())) / newData.getNumDVs();
+ }
+ if (isNDVBitVectorSet) {
+ ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
+ ndvEstimator.getnumBitVectors()));
+ }
+ if (aggregateData == null) {
+ aggregateData = newData.deepCopy();
+ } else {
+ if (HBaseUtils.getDoubleValue(aggregateData.getLowValue()) < HBaseUtils
+ .getDoubleValue(newData.getLowValue())) {
+ aggregateData.setLowValue(aggregateData.getLowValue());
+ } else {
+ aggregateData.setLowValue(newData.getLowValue());
+ }
+ if (HBaseUtils.getDoubleValue(aggregateData.getHighValue()) > HBaseUtils
+ .getDoubleValue(newData.getHighValue())) {
+ aggregateData.setHighValue(aggregateData.getHighValue());
+ } else {
+ aggregateData.setHighValue(newData.getHighValue());
+ }
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+ }
+ }
+ if (isNDVBitVectorSet) {
+ // if all the ColumnStatisticsObjs contain bitvectors, we do not need to
+ // use uniform distribution assumption because we can merge bitvectors
+ // to get a good estimation.
+ aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+ } else {
+ if (useDensityFunctionForNDVEstimation) {
+ // We have estimation, lowerbound and higherbound. We use estimation
+ // if it is between lowerbound and higherbound.
+ double densityAvg = densityAvgSum / partNames.size();
+ long estimation = (long) ((HBaseUtils.getDoubleValue(aggregateData.getHighValue()) - HBaseUtils
+ .getDoubleValue(aggregateData.getLowValue())) / densityAvg);
+ if (estimation < lowerBound) {
+ aggregateData.setNumDVs(lowerBound);
+ } else if (estimation > higherBound) {
+ aggregateData.setNumDVs(higherBound);
+ } else {
+ aggregateData.setNumDVs(estimation);
+ }
+ } else {
+ // Without useDensityFunctionForNDVEstimation, we just use the
+ // default one, which is the max of all the partitions and it is
+ // already done.
+ }
+ }
+ columnStatisticsData.setDecimalStats(aggregateData);
+ } else {
+ // we need extrapolation
+ Map<String, Integer> indexMap = new HashMap<String, Integer>();
+ for (int index = 0; index < partNames.size(); index++) {
+ indexMap.put(partNames.get(index), index);
+ }
+ Map<String, Double> adjustedIndexMap = new HashMap<String, Double>();
+ Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<String, ColumnStatisticsData>();
+ // while we scan the css, we also get the densityAvg, lowerbound and
+ // higherbound when useDensityFunctionForNDVEstimation is true.
+ double densityAvgSum = 0.0;
+ if (!isNDVBitVectorSet) {
+ // if not every partition uses bitvector for ndv, we just fall back to
+ // the traditional extrapolation methods.
+ for (ColumnStatistics cs : css) {
+ String partName = cs.getStatsDesc().getPartName();
+ ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+ DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats();
+ if (useDensityFunctionForNDVEstimation) {
+ densityAvgSum += (HBaseUtils.getDoubleValue(newData.getHighValue()) - HBaseUtils
+ .getDoubleValue(newData.getLowValue())) / newData.getNumDVs();
+ }
+ adjustedIndexMap.put(partName, (double) indexMap.get(partName));
+ adjustedStatsMap.put(partName, cso.getStatsData());
+ }
+ } else {
+ // we first merge all the adjacent bitvectors that we could merge and
+ // derive new partition names and index.
+ NumDistinctValueEstimator ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
+ StringBuilder pseudoPartName = new StringBuilder();
+ double pseudoIndexSum = 0;
+ int length = 0;
+ int curIndex = -1;
+ DecimalColumnStatsData aggregateData = null;
+ for (ColumnStatistics cs : css) {
+ String partName = cs.getStatsDesc().getPartName();
+ ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+ DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats();
+ // newData.isSetBitVectors() should be true for sure because we
+ // already checked it before.
+ if (indexMap.get(partName) != curIndex) {
+ // There is bitvector, but it is not adjacent to the previous ones.
+ if (length > 0) {
+ // we have to set ndv
+ adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
+ aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+ ColumnStatisticsData csd = new ColumnStatisticsData();
+ csd.setDecimalStats(aggregateData);
+ adjustedStatsMap.put(pseudoPartName.toString(), csd);
+ if (useDensityFunctionForNDVEstimation) {
+ densityAvgSum += (HBaseUtils.getDoubleValue(aggregateData.getHighValue()) - HBaseUtils
+ .getDoubleValue(aggregateData.getLowValue())) / aggregateData.getNumDVs();
+ }
+ // reset everything
+ pseudoPartName = new StringBuilder();
+ pseudoIndexSum = 0;
+ length = 0;
+ }
+ aggregateData = null;
+ }
+ curIndex = indexMap.get(partName);
+ pseudoPartName.append(partName);
+ pseudoIndexSum += curIndex;
+ length++;
+ curIndex++;
+ if (aggregateData == null) {
+ aggregateData = newData.deepCopy();
+ } else {
+ if (HBaseUtils.getDoubleValue(aggregateData.getLowValue()) < HBaseUtils
+ .getDoubleValue(newData.getLowValue())) {
+ aggregateData.setLowValue(aggregateData.getLowValue());
+ } else {
+ aggregateData.setLowValue(newData.getLowValue());
+ }
+ if (HBaseUtils.getDoubleValue(aggregateData.getHighValue()) > HBaseUtils
+ .getDoubleValue(newData.getHighValue())) {
+ aggregateData.setHighValue(aggregateData.getHighValue());
+ } else {
+ aggregateData.setHighValue(newData.getHighValue());
+ }
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ }
+ ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
+ ndvEstimator.getnumBitVectors()));
+ }
+ if (length > 0) {
+ // we have to set ndv
+ adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
+ aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+ ColumnStatisticsData csd = new ColumnStatisticsData();
+ csd.setDecimalStats(aggregateData);
+ adjustedStatsMap.put(pseudoPartName.toString(), csd);
+ if (useDensityFunctionForNDVEstimation) {
+ densityAvgSum += (HBaseUtils.getDoubleValue(aggregateData.getHighValue()) - HBaseUtils
+ .getDoubleValue(aggregateData.getLowValue())) / aggregateData.getNumDVs();
+ }
+ }
+ }
+ extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap,
+ adjustedStatsMap, densityAvgSum / adjustedStatsMap.size());
+ }
+ statsObj.setStatsData(columnStatisticsData);
+ return statsObj;
+ }
+
+ @Override
+ public void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
+ int numPartsWithStats, Map<String, Double> adjustedIndexMap,
+ Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) {
+ int rightBorderInd = numParts;
+ DecimalColumnStatsData extrapolateDecimalData = new DecimalColumnStatsData();
+ Map<String, DecimalColumnStatsData> extractedAdjustedStatsMap = new HashMap<>();
+ for (Map.Entry<String, ColumnStatisticsData> entry : adjustedStatsMap.entrySet()) {
+ extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getDecimalStats());
+ }
+ List<Map.Entry<String, DecimalColumnStatsData>> list = new LinkedList<Map.Entry<String, DecimalColumnStatsData>>(
+ extractedAdjustedStatsMap.entrySet());
+ // get the lowValue
+ Collections.sort(list, new Comparator<Map.Entry<String, DecimalColumnStatsData>>() {
+ public int compare(Map.Entry<String, DecimalColumnStatsData> o1,
+ Map.Entry<String, DecimalColumnStatsData> o2) {
+ return o1.getValue().getLowValue().compareTo(o2.getValue().getLowValue());
+ }
+ });
+ double minInd = adjustedIndexMap.get(list.get(0).getKey());
+ double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+ double lowValue = 0;
+ double min = HBaseUtils.getDoubleValue(list.get(0).getValue().getLowValue());
+ double max = HBaseUtils.getDoubleValue(list.get(list.size() - 1).getValue().getLowValue());
+ if (minInd == maxInd) {
+ lowValue = min;
+ } else if (minInd < maxInd) {
+ // left border is the min
+ lowValue = (max - (max - min) * maxInd / (maxInd - minInd));
+ } else {
+ // right border is the min
+ lowValue = (max - (max - min) * (rightBorderInd - maxInd) / (minInd - maxInd));
+ }
+
+ // get the highValue
+ Collections.sort(list, new Comparator<Map.Entry<String, DecimalColumnStatsData>>() {
+ public int compare(Map.Entry<String, DecimalColumnStatsData> o1,
+ Map.Entry<String, DecimalColumnStatsData> o2) {
+ return o1.getValue().getHighValue().compareTo(o2.getValue().getHighValue());
+ }
+ });
+ minInd = adjustedIndexMap.get(list.get(0).getKey());
+ maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+ double highValue = 0;
+ min = HBaseUtils.getDoubleValue(list.get(0).getValue().getHighValue());
+ max = HBaseUtils.getDoubleValue(list.get(list.size() - 1).getValue().getHighValue());
+ if (minInd == maxInd) {
+ highValue = min;
+ } else if (minInd < maxInd) {
+ // right border is the max
+ highValue = (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
+ } else {
+ // left border is the max
+ highValue = (min + (max - min) * minInd / (minInd - maxInd));
+ }
+
+ // get the #nulls
+ long numNulls = 0;
+ for (Map.Entry<String, DecimalColumnStatsData> entry : extractedAdjustedStatsMap.entrySet()) {
+ numNulls += entry.getValue().getNumNulls();
+ }
+ // we scale up numNulls based on the number of partitions
+ numNulls = numNulls * numParts / numPartsWithStats;
+
+ // get the ndv
+ long ndv = 0;
+ long ndvMin = 0;
+ long ndvMax = 0;
+ Collections.sort(list, new Comparator<Map.Entry<String, DecimalColumnStatsData>>() {
+ public int compare(Map.Entry<String, DecimalColumnStatsData> o1,
+ Map.Entry<String, DecimalColumnStatsData> o2) {
+ return o1.getValue().getNumDVs() < o2.getValue().getNumDVs() ? -1 : 1;
+ }
+ });
+ long lowerBound = list.get(list.size() - 1).getValue().getNumDVs();
+ long higherBound = 0;
+ for (Map.Entry<String, DecimalColumnStatsData> entry : list) {
+ higherBound += entry.getValue().getNumDVs();
+ }
+ if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) {
+ ndv = (long) ((highValue - lowValue) / densityAvg);
+ if (ndv < lowerBound) {
+ ndv = lowerBound;
+ } else if (ndv > higherBound) {
+ ndv = higherBound;
+ }
} else {
- ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
- ndvEstimator.getnumBitVectors()));
- aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
- aggregateData.setBitVectors(ndvEstimator.serialize().toString());
+ minInd = adjustedIndexMap.get(list.get(0).getKey());
+ maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+ ndvMin = list.get(0).getValue().getNumDVs();
+ ndvMax = list.get(list.size() - 1).getValue().getNumDVs();
+ if (minInd == maxInd) {
+ ndv = ndvMin;
+ } else if (minInd < maxInd) {
+ // right border is the max
+ ndv = (long) (ndvMin + (ndvMax - ndvMin) * (rightBorderInd - minInd) / (maxInd - minInd));
+ } else {
+ // left border is the max
+ ndv = (long) (ndvMin + (ndvMax - ndvMin) * minInd / (minInd - maxInd));
+ }
}
+ extrapolateDecimalData.setLowValue(StatObjectConverter.createThriftDecimal(String
+ .valueOf(lowValue)));
+ extrapolateDecimalData.setHighValue(StatObjectConverter.createThriftDecimal(String
+ .valueOf(highValue)));
+ extrapolateDecimalData.setNumNulls(numNulls);
+ extrapolateDecimalData.setNumDVs(ndv);
+ extrapolateData.setDecimalStats(extrapolateDecimalData);
}
}
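
The density-function path above estimates the aggregate NDV as (highValue - lowValue) / densityAvg, where each partition contributes density_i = (high_i - low_i) / ndv_i, and the result is clamped between the max and the sum of the per-partition NDVs. A small numeric sketch with made-up partition stats:

public class DensityNdvDemo {
  public static void main(String[] args) {
    double[] low  = {0, 100, 200};  // hypothetical per-partition low values
    double[] high = {99, 199, 299}; // hypothetical per-partition high values
    long[]   ndv  = {50, 50, 50};   // hypothetical per-partition NDVs

    double densityAvgSum = 0.0;
    long lowerBound = 0, higherBound = 0;
    for (int i = 0; i < ndv.length; i++) {
      densityAvgSum += (high[i] - low[i]) / ndv[i];
      lowerBound = Math.max(lowerBound, ndv[i]); // max of the partition NDVs
      higherBound += ndv[i];                     // sum of the partition NDVs
    }
    double densityAvg = densityAvgSum / ndv.length;        // 1.98
    long estimation = (long) ((299.0 - 0.0) / densityAvg); // 151
    long result = Math.min(Math.max(estimation, lowerBound), higherBound);
    System.out.println(result); // 150: the estimate exceeds the sum, so it is clamped
  }
}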
http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
index d945ec2..a88ef84 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
@@ -19,26 +19,307 @@
package org.apache.hadoop.hive.metastore.hbase.stats;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.MetaException;
-public class DoubleColumnStatsAggregator extends ColumnStatsAggregator {
+public class DoubleColumnStatsAggregator extends ColumnStatsAggregator implements
+ IExtrapolatePartStatus {
@Override
- public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
- DoubleColumnStatsData aggregateData = aggregateColStats.getStatsData().getDoubleStats();
- DoubleColumnStatsData newData = newColStats.getStatsData().getDoubleStats();
- aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
- aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue()));
- aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
- if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
- aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+ public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+ List<ColumnStatistics> css) throws MetaException {
+ ColumnStatisticsObj statsObj = null;
+
+ // check if all the ColumnStatisticsObjs contain stats and all the ndv are
+ // bitvectors
+ boolean doAllPartitionContainStats = partNames.size() == css.size();
+ boolean isNDVBitVectorSet = true;
+ String colType = null;
+ for (ColumnStatistics cs : css) {
+ if (cs.getStatsObjSize() != 1) {
+ throw new MetaException(
+ "The number of columns should be exactly one in aggrStats, but found "
+ + cs.getStatsObjSize());
+ }
+ ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+ if (statsObj == null) {
+ colType = cso.getColType();
+ statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
+ .getStatsData().getSetField());
+ }
+ if (numBitVectors <= 0 || !cso.getStatsData().getDoubleStats().isSetBitVectors()
+ || cso.getStatsData().getDoubleStats().getBitVectors().length() == 0) {
+ isNDVBitVectorSet = false;
+ break;
+ }
+ }
+ ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
+ if (doAllPartitionContainStats || css.size() < 2) {
+ DoubleColumnStatsData aggregateData = null;
+ long lowerBound = 0;
+ long higherBound = 0;
+ double densityAvgSum = 0.0;
+ NumDistinctValueEstimator ndvEstimator = null;
+ if (isNDVBitVectorSet) {
+ ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
+ }
+ for (ColumnStatistics cs : css) {
+ ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+ DoubleColumnStatsData newData = cso.getStatsData().getDoubleStats();
+ if (useDensityFunctionForNDVEstimation) {
+ lowerBound = Math.max(lowerBound, newData.getNumDVs());
+ higherBound += newData.getNumDVs();
+ densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs();
+ }
+ if (isNDVBitVectorSet) {
+ ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
+ ndvEstimator.getnumBitVectors()));
+ }
+ if (aggregateData == null) {
+ aggregateData = newData.deepCopy();
+ } else {
+ aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
+ aggregateData
+ .setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue()));
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+ }
+ }
+ if (isNDVBitVectorSet) {
+ // if all the ColumnStatisticsObjs contain bitvectors, we do not need to
+ // use uniform distribution assumption because we can merge bitvectors
+ // to get a good estimation.
+ aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+ } else {
+ if (useDensityFunctionForNDVEstimation) {
+ // We have estimation, lowerbound and higherbound. We use estimation
+ // if it is between lowerbound and higherbound.
+ double densityAvg = densityAvgSum / partNames.size();
+ long estimation = (long) ((aggregateData.getHighValue() - aggregateData.getLowValue()) / densityAvg);
+ if (estimation < lowerBound) {
+ aggregateData.setNumDVs(lowerBound);
+ } else if (estimation > higherBound) {
+ aggregateData.setNumDVs(higherBound);
+ } else {
+ aggregateData.setNumDVs(estimation);
+ }
+ } else {
+ // Without useDensityFunctionForNDVEstimation, we just use the
+ // default one, which is the max of all the partitions and it is
+ // already done.
+ }
+ }
+ columnStatisticsData.setDoubleStats(aggregateData);
+ } else {
+ // we need extrapolation
+ Map<String, Integer> indexMap = new HashMap<String, Integer>();
+ for (int index = 0; index < partNames.size(); index++) {
+ indexMap.put(partNames.get(index), index);
+ }
+ Map<String, Double> adjustedIndexMap = new HashMap<String, Double>();
+ Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<String, ColumnStatisticsData>();
+ // while we scan the css, we also get the densityAvg, lowerbound and
+ // higherbound when useDensityFunctionForNDVEstimation is true.
+ double densityAvgSum = 0.0;
+ if (!isNDVBitVectorSet) {
+ // if not every partition uses bitvector for ndv, we just fall back to
+ // the traditional extrapolation methods.
+ for (ColumnStatistics cs : css) {
+ String partName = cs.getStatsDesc().getPartName();
+ ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+ DoubleColumnStatsData newData = cso.getStatsData().getDoubleStats();
+ if (useDensityFunctionForNDVEstimation) {
+ densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs();
+ }
+ adjustedIndexMap.put(partName, (double) indexMap.get(partName));
+ adjustedStatsMap.put(partName, cso.getStatsData());
+ }
+ } else {
+ // we first merge all the adjacent bitvectors that we could merge and
+ // derive new partition names and index.
+ NumDistinctValueEstimator ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
+ StringBuilder pseudoPartName = new StringBuilder();
+ double pseudoIndexSum = 0;
+ int length = 0;
+ int curIndex = -1;
+ DoubleColumnStatsData aggregateData = null;
+ for (ColumnStatistics cs : css) {
+ String partName = cs.getStatsDesc().getPartName();
+ ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+ DoubleColumnStatsData newData = cso.getStatsData().getDoubleStats();
+ // newData.isSetBitVectors() should be true for sure because we
+ // already checked it before.
+ if (indexMap.get(partName) != curIndex) {
+ // There is bitvector, but it is not adjacent to the previous ones.
+ if (length > 0) {
+ // we have to set ndv
+ adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
+ aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+ ColumnStatisticsData csd = new ColumnStatisticsData();
+ csd.setDoubleStats(aggregateData);
+ adjustedStatsMap.put(pseudoPartName.toString(), csd);
+ if (useDensityFunctionForNDVEstimation) {
+ densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs();
+ }
+ // reset everything
+ pseudoPartName = new StringBuilder();
+ pseudoIndexSum = 0;
+ length = 0;
+ }
+ aggregateData = null;
+ }
+ curIndex = indexMap.get(partName);
+ pseudoPartName.append(partName);
+ pseudoIndexSum += curIndex;
+ length++;
+ curIndex++;
+ if (aggregateData == null) {
+ aggregateData = newData.deepCopy();
+ } else {
+ aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
+ aggregateData.setHighValue(Math.max(aggregateData.getHighValue(),
+ newData.getHighValue()));
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ }
+ ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
+ ndvEstimator.getnumBitVectors()));
+ }
+ if (length > 0) {
+ // we have to set ndv
+ adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
+ aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+ ColumnStatisticsData csd = new ColumnStatisticsData();
+ csd.setDoubleStats(aggregateData);
+ adjustedStatsMap.put(pseudoPartName.toString(), csd);
+ if (useDensityFunctionForNDVEstimation) {
+ densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs();
+ }
+ }
+ }
+ extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap,
+ adjustedStatsMap, densityAvgSum / adjustedStatsMap.size());
+ }
+ statsObj.setStatsData(columnStatisticsData);
+ return statsObj;
+ }
+
+ @Override
+ public void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
+ int numPartsWithStats, Map<String, Double> adjustedIndexMap,
+ Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) {
+ int rightBorderInd = numParts;
+ DoubleColumnStatsData extrapolateDoubleData = new DoubleColumnStatsData();
+ Map<String, DoubleColumnStatsData> extractedAdjustedStatsMap = new HashMap<>();
+ for (Map.Entry<String, ColumnStatisticsData> entry : adjustedStatsMap.entrySet()) {
+ extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getDoubleStats());
+ }
+ List<Map.Entry<String, DoubleColumnStatsData>> list = new LinkedList<Map.Entry<String, DoubleColumnStatsData>>(
+ extractedAdjustedStatsMap.entrySet());
+ // get the lowValue
+ Collections.sort(list, new Comparator<Map.Entry<String, DoubleColumnStatsData>>() {
+ public int compare(Map.Entry<String, DoubleColumnStatsData> o1,
+ Map.Entry<String, DoubleColumnStatsData> o2) {
+ return o1.getValue().getLowValue() < o2.getValue().getLowValue() ? -1 : 1;
+ }
+ });
+ double minInd = adjustedIndexMap.get(list.get(0).getKey());
+ double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+ double lowValue = 0;
+ double min = list.get(0).getValue().getLowValue();
+ double max = list.get(list.size() - 1).getValue().getLowValue();
+ if (minInd == maxInd) {
+ lowValue = min;
+ } else if (minInd < maxInd) {
+ // left border is the min
+ lowValue = (max - (max - min) * maxInd / (maxInd - minInd));
+ } else {
+ // right border is the min
+ lowValue = (max - (max - min) * (rightBorderInd - maxInd) / (minInd - maxInd));
+ }
+
+ // get the highValue
+ Collections.sort(list, new Comparator<Map.Entry<String, DoubleColumnStatsData>>() {
+ public int compare(Map.Entry<String, DoubleColumnStatsData> o1,
+ Map.Entry<String, DoubleColumnStatsData> o2) {
+ return o1.getValue().getHighValue() < o2.getValue().getHighValue() ? -1 : 1;
+ }
+ });
+ minInd = adjustedIndexMap.get(list.get(0).getKey());
+ maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+ double highValue = 0;
+ min = list.get(0).getValue().getHighValue();
+ max = list.get(list.size() - 1).getValue().getHighValue();
+ if (minInd == maxInd) {
+ highValue = min;
+ } else if (minInd < maxInd) {
+ // right border is the max
+ highValue = (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
+ } else {
+ // left border is the max
+ highValue = (min + (max - min) * minInd / (minInd - maxInd));
+ }
+
+ // get the #nulls
+ long numNulls = 0;
+ for (Map.Entry<String, DoubleColumnStatsData> entry : extractedAdjustedStatsMap.entrySet()) {
+ numNulls += entry.getValue().getNumNulls();
+ }
+ // we scale up numNulls based on the number of partitions
+ numNulls = numNulls * numParts / numPartsWithStats;
+
+ // get the ndv
+ long ndv = 0;
+ long ndvMin = 0;
+ long ndvMax = 0;
+ Collections.sort(list, new Comparator<Map.Entry<String, DoubleColumnStatsData>>() {
+ public int compare(Map.Entry<String, DoubleColumnStatsData> o1,
+ Map.Entry<String, DoubleColumnStatsData> o2) {
+ return o1.getValue().getNumDVs() < o2.getValue().getNumDVs() ? -1 : 1;
+ }
+ });
+ long lowerBound = list.get(list.size() - 1).getValue().getNumDVs();
+ long higherBound = 0;
+ for (Map.Entry<String, DoubleColumnStatsData> entry : list) {
+ higherBound += entry.getValue().getNumDVs();
+ }
+ if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) {
+ ndv = (long) ((highValue - lowValue) / densityAvg);
+ if (ndv < lowerBound) {
+ ndv = lowerBound;
+ } else if (ndv > higherBound) {
+ ndv = higherBound;
+ }
} else {
- ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
- ndvEstimator.getnumBitVectors()));
- aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
- aggregateData.setBitVectors(ndvEstimator.serialize().toString());
+ minInd = adjustedIndexMap.get(list.get(0).getKey());
+ maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+ ndvMin = list.get(0).getValue().getNumDVs();
+ ndvMax = list.get(list.size() - 1).getValue().getNumDVs();
+ if (minInd == maxInd) {
+ ndv = ndvMin;
+ } else if (minInd < maxInd) {
+ // right border is the max
+ ndv = (long) (ndvMin + (ndvMax - ndvMin) * (rightBorderInd - minInd) / (maxInd - minInd));
+ } else {
+ // left border is the max
+ ndv = (long) (ndvMin + (ndvMax - ndvMin) * minInd / (minInd - maxInd));
+ }
}
+ extrapolateDoubleData.setLowValue(lowValue);
+ extrapolateDoubleData.setHighValue(highValue);
+ extrapolateDoubleData.setNumNulls(numNulls);
+ extrapolateDoubleData.setNumDVs(ndv);
+ extrapolateData.setDoubleStats(extrapolateDoubleData);
}
+
}
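
When every partition carries a bitvector but some partitions have no stats at all, the code above first collapses each run of adjacent partitions into a pseudo-partition (merging their NDV estimators) and records the average index of its members in adjustedIndexMap. A stripped-down sketch of just that index bookkeeping, with hypothetical partitions p0..p4 of which p3 has no stats:

import java.util.LinkedHashMap;
import java.util.Map;

public class PseudoPartitionDemo {
  public static void main(String[] args) {
    int[] statIndexes = {0, 1, 2, 4}; // partitions with stats; 3 is missing
    Map<String, Double> adjustedIndexMap = new LinkedHashMap<>();
    StringBuilder pseudoPartName = new StringBuilder();
    double pseudoIndexSum = 0; int length = 0; int curIndex = -1;
    for (int idx : statIndexes) {
      if (idx != curIndex && length > 0) { // gap found: close the current run
        adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
        pseudoPartName = new StringBuilder(); pseudoIndexSum = 0; length = 0;
      }
      curIndex = idx;
      pseudoPartName.append("p").append(idx);
      pseudoIndexSum += curIndex; length++; curIndex++;
    }
    if (length > 0) {
      adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
    }
    System.out.println(adjustedIndexMap); // {p0p1p2=1.0, p4=4.0}
  }
}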
http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/IExtrapolatePartStatus.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/IExtrapolatePartStatus.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/IExtrapolatePartStatus.java
new file mode 100644
index 0000000..99af060
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/IExtrapolatePartStatus.java
@@ -0,0 +1,30 @@
+package org.apache.hadoop.hive.metastore.hbase.stats;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+
+public interface IExtrapolatePartStatus {
+ // The following function will extrapolate the stats when the column stats of
+ // some partitions are missing.
+ /**
+ * @param extrapolateData
+ * it will carry back the specific stats, e.g., DOUBLE_STATS or
+ * LONG_STATS
+ * @param numParts
+ * the total number of partitions
+ * @param numPartsWithStats
+ * the number of partitions that have stats
+ * @param adjustedIndexMap
+ * the partition name to index map
+ * @param adjustedStatsMap
+ * the partition name to its stats map
+ * @param densityAvg
+ * the average of ndv density, which is useful when
+ * useDensityFunctionForNDVEstimation is true.
+ */
+ public abstract void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
+ int numPartsWithStats, Map<String, Double> adjustedIndexMap,
+ Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg);
+
+}
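
Each aggregator's extrapolate() fits a straight line through the adjusted indexes of the smallest and largest observed values and projects it to the table borders (index 0 on the left, numParts on the right). A tiny numeric sketch of the highValue case for a hypothetical 4-partition table where only partitions 0 and 2 have stats:

public class ExtrapolateHighDemo {
  public static void main(String[] args) {
    int rightBorderInd = 4;            // numParts
    double minInd = 0.0, maxInd = 2.0; // indexes holding the min and max highValue
    double min = 10.0, max = 30.0;     // smallest and largest observed highValue
    // minInd < maxInd: the max lies toward the right border, so extend the trend.
    double highValue = min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd);
    System.out.println(highValue); // 10 + 20 * 4/2 = 50.0
  }
}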
http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
index 068dd00..8ac6561 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
@@ -19,26 +19,305 @@
package org.apache.hadoop.hive.metastore.hbase.stats;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.MetaException;
-public class LongColumnStatsAggregator extends ColumnStatsAggregator {
+public class LongColumnStatsAggregator extends ColumnStatsAggregator implements
+ IExtrapolatePartStatus {
@Override
- public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
- LongColumnStatsData aggregateData = aggregateColStats.getStatsData().getLongStats();
- LongColumnStatsData newData = newColStats.getStatsData().getLongStats();
- aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
- aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue()));
- aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
- if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
- aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+ public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+ List<ColumnStatistics> css) throws MetaException {
+ ColumnStatisticsObj statsObj = null;
+
+ // check if all the ColumnStatisticsObjs contain stats and all the ndv are
+ // bitvectors
+ boolean doAllPartitionContainStats = partNames.size() == css.size();
+ boolean isNDVBitVectorSet = true;
+ String colType = null;
+ for (ColumnStatistics cs : css) {
+ if (cs.getStatsObjSize() != 1) {
+ throw new MetaException(
+ "The number of columns should be exactly one in aggrStats, but found "
+ + cs.getStatsObjSize());
+ }
+ ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+ if (statsObj == null) {
+ colType = cso.getColType();
+ statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
+ .getStatsData().getSetField());
+ }
+ if (numBitVectors <= 0 || !cso.getStatsData().getLongStats().isSetBitVectors()
+ || cso.getStatsData().getLongStats().getBitVectors().length() == 0) {
+ isNDVBitVectorSet = false;
+ break;
+ }
+ }
+ ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
+ if (doAllPartitionContainStats || css.size() < 2) {
+ LongColumnStatsData aggregateData = null;
+ long lowerBound = 0;
+ long higherBound = 0;
+ double densityAvgSum = 0.0;
+ NumDistinctValueEstimator ndvEstimator = null;
+ if (isNDVBitVectorSet) {
+ ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
+ }
+ for (ColumnStatistics cs : css) {
+ ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+ LongColumnStatsData newData = cso.getStatsData().getLongStats();
+ if (useDensityFunctionForNDVEstimation) {
+ lowerBound = Math.max(lowerBound, newData.getNumDVs());
+ higherBound += newData.getNumDVs();
+ densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs();
+ }
+ if (isNDVBitVectorSet) {
+ ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
+ ndvEstimator.getnumBitVectors()));
+ }
+ if (aggregateData == null) {
+ aggregateData = newData.deepCopy();
+ } else {
+ aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
+ aggregateData
+ .setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue()));
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+ }
+ }
+ if (isNDVBitVectorSet) {
+ // if all the ColumnStatisticsObjs contain bitvectors, we do not need to
+ // use uniform distribution assumption because we can merge bitvectors
+ // to get a good estimation.
+ aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+ } else {
+ if (useDensityFunctionForNDVEstimation) {
+ // We have estimation, lowerbound and higherbound. We use estimation
+ // if it is between lowerbound and higherbound.
+ double densityAvg = densityAvgSum / partNames.size();
+ long estimation = (long) ((aggregateData.getHighValue() - aggregateData.getLowValue()) / densityAvg);
+ if (estimation < lowerBound) {
+ aggregateData.setNumDVs(lowerBound);
+ } else if (estimation > higherBound) {
+ aggregateData.setNumDVs(higherBound);
+ } else {
+ aggregateData.setNumDVs(estimation);
+ }
+ } else {
+ // Without useDensityFunctionForNDVEstimation, we just use the
+ // default one, which is the max of all the partitions and it is
+ // already done.
+ }
+ }
+ columnStatisticsData.setLongStats(aggregateData);
} else {
- ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
- ndvEstimator.getnumBitVectors()));
- aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
- aggregateData.setBitVectors(ndvEstimator.serialize().toString());
+ // we need extrapolation
+ Map<String, Integer> indexMap = new HashMap<String, Integer>();
+ for (int index = 0; index < partNames.size(); index++) {
+ indexMap.put(partNames.get(index), index);
+ }
+ Map<String, Double> adjustedIndexMap = new HashMap<String, Double>();
+ Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<String, ColumnStatisticsData>();
+ // while we scan the css, we also get the densityAvg, lowerbound and
+ // higherbound when useDensityFunctionForNDVEstimation is true.
+ double densityAvgSum = 0.0;
+ if (!isNDVBitVectorSet) {
+ // if not every partition uses bitvector for ndv, we just fall back to
+ // the traditional extrapolation methods.
+ for (ColumnStatistics cs : css) {
+ String partName = cs.getStatsDesc().getPartName();
+ ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+ LongColumnStatsData newData = cso.getStatsData().getLongStats();
+ if (useDensityFunctionForNDVEstimation) {
+ densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs();
+ }
+ adjustedIndexMap.put(partName, (double) indexMap.get(partName));
+ adjustedStatsMap.put(partName, cso.getStatsData());
+ }
+ } else {
+ // we first merge all the adjacent bitvectors that we could merge and
+ // derive new partition names and index.
+ NumDistinctValueEstimator ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
+ StringBuilder pseudoPartName = new StringBuilder();
+ double pseudoIndexSum = 0;
+ int length = 0;
+ int curIndex = -1;
+ LongColumnStatsData aggregateData = null;
+ for (ColumnStatistics cs : css) {
+ String partName = cs.getStatsDesc().getPartName();
+ ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+ LongColumnStatsData newData = cso.getStatsData().getLongStats();
+ // newData.isSetBitVectors() should be true for sure because we
+ // already checked it before.
+ if (indexMap.get(partName) != curIndex) {
+ // There is bitvector, but it is not adjacent to the previous ones.
+ if (length > 0) {
+ // we have to set ndv
+ adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
+ aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+ ColumnStatisticsData csd = new ColumnStatisticsData();
+ csd.setLongStats(aggregateData);
+ adjustedStatsMap.put(pseudoPartName.toString(), csd);
+ if (useDensityFunctionForNDVEstimation) {
+ densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs();
+ }
+ // reset everything
+ pseudoPartName = new StringBuilder();
+ pseudoIndexSum = 0;
+ length = 0;
+ }
+ aggregateData = null;
+ }
+ curIndex = indexMap.get(partName);
+ pseudoPartName.append(partName);
+ pseudoIndexSum += curIndex;
+ length++;
+ curIndex++;
+ if (aggregateData == null) {
+ aggregateData = newData.deepCopy();
+ } else {
+ aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
+ aggregateData.setHighValue(Math.max(aggregateData.getHighValue(),
+ newData.getHighValue()));
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ }
+ ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
+ ndvEstimator.getnumBitVectors()));
+ }
+ if (length > 0) {
+ // finalize the last pending pseudo-partition and set its ndv
+ adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
+ aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
+ ColumnStatisticsData csd = new ColumnStatisticsData();
+ csd.setLongStats(aggregateData);
+ adjustedStatsMap.put(pseudoPartName.toString(), csd);
+ if (useDensityFunctionForNDVEstimation) {
+ densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs();
+ }
+ }
+ }
+ extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap,
+ adjustedStatsMap, densityAvgSum / adjustedStatsMap.size());
}
+ statsObj.setStatsData(columnStatisticsData);
+ return statsObj;
}
+
+ @Override
+ public void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
+ int numPartsWithStats, Map<String, Double> adjustedIndexMap,
+ Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) {
+ int rightBorderInd = numParts;
+ LongColumnStatsData extrapolateLongData = new LongColumnStatsData();
+ Map<String, LongColumnStatsData> extractedAdjustedStatsMap = new HashMap<>();
+ for (Map.Entry<String, ColumnStatisticsData> entry : adjustedStatsMap.entrySet()) {
+ extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getLongStats());
+ }
+ List<Map.Entry<String, LongColumnStatsData>> list = new LinkedList<Map.Entry<String, LongColumnStatsData>>(
+ extractedAdjustedStatsMap.entrySet());
+ // get the lowValue
+ Collections.sort(list, new Comparator<Map.Entry<String, LongColumnStatsData>>() {
+ public int compare(Map.Entry<String, LongColumnStatsData> o1,
+ Map.Entry<String, LongColumnStatsData> o2) {
+ return o1.getValue().getLowValue() < o2.getValue().getLowValue() ? -1 : 1;
+ }
+ });
+ double minInd = adjustedIndexMap.get(list.get(0).getKey());
+ double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+ long lowValue = 0;
+ long min = list.get(0).getValue().getLowValue();
+ long max = list.get(list.size() - 1).getValue().getLowValue();
+ if (minInd == maxInd) {
+ lowValue = min;
+ } else if (minInd < maxInd) {
+ // left border is the min
+ lowValue = (long) (max - (max - min) * maxInd / (maxInd - minInd));
+ } else {
+ // right border is the min
+ lowValue = (long) (max - (max - min) * (rightBorderInd - maxInd) / (minInd - maxInd));
+ }
+
+ // get the highValue
+ Collections.sort(list, new Comparator<Map.Entry<String, LongColumnStatsData>>() {
+ public int compare(Map.Entry<String, LongColumnStatsData> o1,
+ Map.Entry<String, LongColumnStatsData> o2) {
+ return o1.getValue().getHighValue() < o2.getValue().getHighValue() ? -1 : 1;
+ }
+ });
+ minInd = adjustedIndexMap.get(list.get(0).getKey());
+ maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+ long highValue = 0;
+ min = list.get(0).getValue().getHighValue();
+ max = list.get(list.size() - 1).getValue().getHighValue();
+ if (minInd == maxInd) {
+ highValue = min;
+ } else if (minInd < maxInd) {
+ // right border is the max
+ highValue = (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
+ } else {
+ // left border is the max
+ highValue = (long) (min + (max - min) * minInd / (minInd - maxInd));
+ }
+
+ // get the #nulls
+ long numNulls = 0;
+ for (Map.Entry<String, LongColumnStatsData> entry : extractedAdjustedStatsMap.entrySet()) {
+ numNulls += entry.getValue().getNumNulls();
+ }
+ // we scale up numNulls in proportion to the total number of partitions
+ numNulls = numNulls * numParts / numPartsWithStats;
+
+ // get the ndv
+ long ndv = 0;
+ Collections.sort(list, new Comparator<Map.Entry<String, LongColumnStatsData>>() {
+ public int compare(Map.Entry<String, LongColumnStatsData> o1,
+ Map.Entry<String, LongColumnStatsData> o2) {
+ return o1.getValue().getNumDVs() < o2.getValue().getNumDVs() ? -1 : 1;
+ }
+ });
+ long lowerBound = list.get(list.size() - 1).getValue().getNumDVs();
+ long higherBound = 0;
+ for (Map.Entry<String, LongColumnStatsData> entry : list) {
+ higherBound += entry.getValue().getNumDVs();
+ }
+ if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) {
+ ndv = (long) ((highValue - lowValue) / densityAvg);
+ if (ndv < lowerBound) {
+ ndv = lowerBound;
+ } else if (ndv > higherBound) {
+ ndv = higherBound;
+ }
+ } else {
+ minInd = adjustedIndexMap.get(list.get(0).getKey());
+ maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
+ min = list.get(0).getValue().getNumDVs();
+ max = list.get(list.size() - 1).getValue().getNumDVs();
+ if (minInd == maxInd) {
+ ndv = min;
+ } else if (minInd < maxInd) {
+ // right border is the max
+ ndv = (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
+ } else {
+ // left border is the max
+ ndv = (long) (min + (max - min) * minInd / (minInd - maxInd));
+ }
+ }
+ extrapolateLongData.setLowValue(lowValue);
+ extrapolateLongData.setHighValue(highValue);
+ extrapolateLongData.setNumNulls(numNulls);
+ extrapolateLongData.setNumDVs(ndv);
+ extrapolateData.setLongStats(extrapolateLongData);
+ }
+
}
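The adjacent-bit-vector merge above can be sketched in isolation. In this toy
version (hypothetical names; plain maps stand in for the estimator state), runs
of consecutive partition indexes collapse into pseudo-partitions whose adjusted
index is the average of their members, which is exactly the bookkeeping the
aggregator feeds into extrapolate():

    import java.util.*;

    public class PseudoPartitionSketch {
      public static void main(String[] args) {
        // partition name -> original index; the gap between p2 and p4 splits the runs
        LinkedHashMap<String, Integer> indexMap = new LinkedHashMap<>();
        indexMap.put("p0", 0); indexMap.put("p1", 1); indexMap.put("p2", 2);
        indexMap.put("p4", 4); indexMap.put("p5", 5);

        Map<String, Double> adjustedIndexMap = new LinkedHashMap<>();
        StringBuilder pseudoName = new StringBuilder();
        double indexSum = 0; int length = 0; int curIndex = -1;
        for (Map.Entry<String, Integer> e : indexMap.entrySet()) {
          if (e.getValue() != curIndex && length > 0) {
            // the run broke: flush the pseudo-partition with its averaged index
            adjustedIndexMap.put(pseudoName.toString(), indexSum / length);
            pseudoName = new StringBuilder(); indexSum = 0; length = 0;
          }
          curIndex = e.getValue();
          pseudoName.append(e.getKey());
          indexSum += curIndex; length++; curIndex++;
        }
        if (length > 0) adjustedIndexMap.put(pseudoName.toString(), indexSum / length);
        System.out.println(adjustedIndexMap); // {p0p1p2=1.0, p4p5=4.5}
      }
    }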
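Likewise, here is a minimal, self-contained Java sketch of the two core
calculations in extrapolate(): extending the observed low/high trend from the
adjusted partition indexes to the borders of the full partition range, and the
density-based NDV estimate clamped between the per-partition bounds. All names
below are illustrative only and not part of the patch:

    public class ExtrapolationSketch {
      // Extend the observed trend of high values to the border of the
      // partition range [0, numParts); mirrors the highValue branch above.
      static long extrapolateHigh(long minVal, long maxVal,
          double minInd, double maxInd, int numParts) {
        if (minInd == maxInd) {
          // a single observation point: nothing to extend
          return minVal;
        } else if (minInd < maxInd) {
          // the max sits toward the right border: extend the line to numParts
          return (long) (minVal + (maxVal - minVal) * (numParts - minInd) / (maxInd - minInd));
        } else {
          // the max sits toward the left border: extend the line to index 0
          return (long) (minVal + (maxVal - minVal) * minInd / (minInd - maxInd));
        }
      }

      // Density-based NDV estimate, clamped between the largest per-partition
      // NDV (lower bound) and the sum of per-partition NDVs (higher bound).
      static long densityNdv(long lowValue, long highValue, double densityAvg,
          long lowerBound, long higherBound) {
        long ndv = (long) ((highValue - lowValue) / densityAvg);
        return Math.max(lowerBound, Math.min(ndv, higherBound));
      }

      public static void main(String[] args) {
        // Highs 100 and 300 observed at adjusted indexes 1.0 and 3.0 of a
        // 4-partition table extrapolate to a global high of 400.
        System.out.println(extrapolateHigh(100L, 300L, 1.0, 3.0, 4)); // 400
        // With an average density of 2.0 values per distinct value over [0, 400]:
        System.out.println(densityNdv(0L, 400L, 2.0, 50L, 500L));     // 200
      }
    }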
http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
index aeb6c39..2aa4046 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
@@ -19,26 +19,87 @@
package org.apache.hadoop.hive.metastore.hbase.stats;
+import java.util.List;
+
import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
public class StringColumnStatsAggregator extends ColumnStatsAggregator {
@Override
- public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
- StringColumnStatsData aggregateData = aggregateColStats.getStatsData().getStringStats();
- StringColumnStatsData newData = newColStats.getStatsData().getStringStats();
- aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
- aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
- aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
- if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
- aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
- } else {
- ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
- ndvEstimator.getnumBitVectors()));
+ public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
+ List<ColumnStatistics> css) throws MetaException {
+ ColumnStatisticsObj statsObj = null;
+
+ // Check whether every partition supplies stats and every NDV is backed
+ // by a bit vector. Only when both conditions hold do we merge the bit
+ // vectors; otherwise we just use the maximum function.
+ boolean doAllPartitionContainStats = partNames.size() == css.size();
+ boolean isNDVBitVectorSet = true;
+ String colType = null;
+ for (ColumnStatistics cs : css) {
+ if (cs.getStatsObjSize() != 1) {
+ throw new MetaException(
+ "The number of columns should be exactly one in aggrStats, but found "
+ + cs.getStatsObjSize());
+ }
+ ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+ if (statsObj == null) {
+ colType = cso.getColType();
+ statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
+ .getStatsData().getSetField());
+ }
+ if (numBitVectors <= 0 || !cso.getStatsData().getStringStats().isSetBitVectors()
+ || cso.getStatsData().getStringStats().getBitVectors().length() == 0) {
+ isNDVBitVectorSet = false;
+ break;
+ }
+ }
+ ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
+ if (doAllPartitionContainStats && isNDVBitVectorSet) {
+ StringColumnStatsData aggregateData = null;
+ NumDistinctValueEstimator ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
+ for (ColumnStatistics cs : css) {
+ ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+ StringColumnStatsData newData = cso.getStatsData().getStringStats();
+ ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
+ ndvEstimator.getnumBitVectors()));
+ if (aggregateData == null) {
+ aggregateData = newData.deepCopy();
+ } else {
+ aggregateData
+ .setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
+ aggregateData
+ .setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ }
+ }
aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
- aggregateData.setBitVectors(ndvEstimator.serialize().toString());
+ columnStatisticsData.setStringStats(aggregateData);
+ } else {
+ StringColumnStatsData aggregateData = null;
+ for (ColumnStatistics cs : css) {
+ ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
+ StringColumnStatsData newData = cso.getStatsData().getStringStats();
+ if (aggregateData == null) {
+ aggregateData = newData.deepCopy();
+ } else {
+ aggregateData
+ .setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
+ aggregateData
+ .setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+ }
+ }
+ columnStatisticsData.setStringStats(aggregateData);
}
+ statsObj.setStatsData(columnStatisticsData);
+ return statsObj;
}
+
}
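Unlike the long and double aggregators, the string aggregator has no low/high
values to extrapolate, so its fallback path reduces to a few merge rules. A
minimal sketch under assumed names, not the committed code:

    public class StringMergeSketch {
      static long maxColLen = 0, numNulls = 0, numDVs = 0;
      static double avgColLen = 0;

      // Fallback merge when bit vectors are unavailable: lengths take the max,
      // null counts add up, and NDV takes the max across partitions (a lower
      // bound on the true distinct count).
      static void merge(long newMaxLen, double newAvgLen, long newNulls, long newDVs) {
        maxColLen = Math.max(maxColLen, newMaxLen);
        avgColLen = Math.max(avgColLen, newAvgLen); // max, not a weighted mean
        numNulls += newNulls;
        numDVs = Math.max(numDVs, newDVs);
      }

      public static void main(String[] args) {
        merge(10, 4.2, 3, 100);
        merge(7, 5.0, 2, 150);
        // prints: 10 5.0 5 150
        System.out.println(maxColLen + " " + avgColLen + " " + numNulls + " " + numDVs);
      }
    }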
http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCacheWithBitVector.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCacheWithBitVector.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCacheWithBitVector.java
index 36c7984..e0c4094 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCacheWithBitVector.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCacheWithBitVector.java
@@ -156,10 +156,8 @@ public class TestHBaseAggregateStatsCacheWithBitVector {
Assert.assertEquals(-20.12, dcsd.getLowValue(), 0.01);
Assert.assertEquals(60, dcsd.getNumNulls());
Assert.assertEquals(5, dcsd.getNumDVs());
- Assert
- .assertEquals(
- "{0, 1, 4, 5, 7}{0, 1}{0, 1, 2, 4}{0, 1, 2, 4}{0, 1, 2}{0, 2}{0, 1, 3, 4}{0, 1, 2, 3, 4}{0, 1, 4}{0, 1, 3, 4, 6}{0, 2}{0, 1, 3, 8}{0, 2, 3}{0, 2}{0, 1, 9}{0, 1, 4}",
- dcsd.getBitVectors());
+ // we do not store the bit vector for the aggregated stats;
+ // it can be stored later if that turns out to be necessary.
}
};