You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2017/05/22 22:58:00 UTC
[2/2] hive git commit: HIVE-16579: CachedStore: improvements to
partition col stats caching and cache column stats for unpartitioned table
(Daniel Dai, Thejas Nair, Vaibhav Gumashta reviewed by Daniel Dai,
Thejas Nair)
HIVE-16579: CachedStore: improvements to partition col stats caching and cache column stats for unpartitioned table (Daniel Dai, Thejas Nair, Vaibhav Gumashta reviewed by Daniel Dai, Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d85beaa9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d85beaa9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d85beaa9
Branch: refs/heads/master
Commit: d85beaa99ba349d9334d3d96abb6e89c94db8481
Parents: 952fe6e
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Mon May 22 15:52:58 2017 -0700
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Mon May 22 15:52:58 2017 -0700
----------------------------------------------------------------------
.../listener/DummyRawStoreFailEvent.java | 4 +-
.../org/apache/hadoop/hive/ql/QTestUtil.java | 2 +-
.../hive/metastore/MetaStoreDirectSql.java | 73 +-
.../hadoop/hive/metastore/MetaStoreUtils.java | 11 +-
.../hadoop/hive/metastore/ObjectStore.java | 19 +-
.../apache/hadoop/hive/metastore/RawStore.java | 8 +-
.../hive/metastore/StatObjectConverter.java | 148 +++
.../hadoop/hive/metastore/cache/CacheUtils.java | 31 +
.../hive/metastore/cache/CachedStore.java | 943 ++++++++++++-------
.../hive/metastore/cache/SharedCache.java | 293 +++++-
.../hadoop/hive/metastore/hbase/HBaseStore.java | 2 +-
.../stats/merge/ColumnStatsMergerFactory.java | 18 +-
.../stats/merge/DateColumnStatsMerger.java | 55 ++
.../DummyRawStoreControlledCommit.java | 2 +-
.../DummyRawStoreForJdoConnection.java | 2 +-
.../hive/metastore/cache/TestCachedStore.java | 450 ++++++++-
16 files changed, 1637 insertions(+), 424 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 91a3a38..3dc63bd 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -914,9 +914,9 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
}
@Override
- public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
String tableName) throws MetaException, NoSuchObjectException {
- return objectStore.getAggrColStatsForTablePartitions(dbName, tableName);
+ return objectStore.getColStatsForTablePartitions(dbName, tableName);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index d296851..111cc11 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -350,7 +350,7 @@ public class QTestUtil {
if (!useHBaseMetastore) {
// Plug verifying metastore in for testing DirectSQL.
conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL,
- "org.apache.hadoop.hive.metastore.VerifyingObjectStore");
+ "org.apache.hadoop.hive.metastore.VerifyingObjectStore");
} else {
conf.setVar(ConfVars.METASTORE_RAW_STORE_IMPL, HBaseStore.class.getName());
conf.setBoolVar(ConfVars.METASTORE_FASTPATH, true);
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index b96c27e..df73693 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -1208,7 +1208,9 @@ class MetaStoreDirectSql {
}
};
List<Object[]> list = runBatched(colNames, b);
- if (list.isEmpty()) return null;
+ if (list.isEmpty()) {
+ return null;
+ }
ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName);
ColumnStatistics result = makeColumnStats(list, csd, 0);
b.closeAllQueries();
@@ -1343,41 +1345,26 @@ class MetaStoreDirectSql {
// Get aggregated column stats for a table per partition for all columns in the partition
// This is primarily used to populate stats object when using CachedStore (Check CachedStore#prewarm)
- public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
- String tblName, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
- String queryText = "select \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\", "
- + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
- + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
- + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
- + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), "
- // The following data is used to compute a partitioned table's NDV based
- // on partitions' NDV when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be
- // accurately derived from partition NDVs, because the domain of column value two partitions
- // can overlap. If there is no overlap then global NDV is just the sum
- // of partition NDVs (UpperBound). But if there is some overlay then
- // global NDV can be anywhere between sum of partition NDVs (no overlap)
- // and same as one of the partition NDV (domain of column value in all other
- // partitions is subset of the domain value in one of the partition)
- // (LowerBound).But under uniform distribution, we can roughly estimate the global
- // NDV by leveraging the min/max values.
- // And, we also guarantee that the estimation makes sense by comparing it to the
- // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
- // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
- + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
- + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
- + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
- + "sum(\"NUM_DISTINCTS\") from \"PART_COL_STATS\""
- + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? group by \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\"";
+ public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
+ String tblName) throws MetaException {
+ String queryText =
+ "select \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\", \"LONG_LOW_VALUE\", "
+ + "\"LONG_HIGH_VALUE\", \"DOUBLE_LOW_VALUE\", \"DOUBLE_HIGH_VALUE\", "
+ + "\"BIG_DECIMAL_LOW_VALUE\", \"BIG_DECIMAL_HIGH_VALUE\", \"NUM_NULLS\", "
+ + "\"NUM_DISTINCTS\", \"AVG_COL_LEN\", \"MAX_COL_LEN\", \"NUM_TRUES\", \"NUM_FALSES\""
+ + " from \"PART_COL_STATS\" where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?"
+ + " order by \"PARTITION_NAME\"";
long start = 0;
long end = 0;
Query query = null;
boolean doTrace = LOG.isDebugEnabled();
Object qResult = null;
- ForwardQueryResult fqr = null;
start = doTrace ? System.nanoTime() : 0;
query = pm.newQuery("javax.jdo.query.SQL", queryText);
- qResult = executeWithArray(query,
- prepareParams(dbName, tblName, new ArrayList<String>(), new ArrayList<String>()), queryText);
+ qResult =
+ executeWithArray(query,
+ prepareParams(dbName, tblName, new ArrayList<String>(), new ArrayList<String>()),
+ queryText);
if (qResult == null) {
query.closeAll();
return Maps.newHashMap();
@@ -1385,13 +1372,31 @@ class MetaStoreDirectSql {
end = doTrace ? System.nanoTime() : 0;
timingTrace(doTrace, queryText, start, end);
List<Object[]> list = ensureList(qResult);
- Map<String, ColumnStatisticsObj> partColStatsMap = new HashMap<String, ColumnStatisticsObj>();
+ Map<String, List<ColumnStatisticsObj>> partColStatsMap =
+ new HashMap<String, List<ColumnStatisticsObj>>();
+ String partNameCurrent = null;
+ List<ColumnStatisticsObj> partColStatsList = new ArrayList<ColumnStatisticsObj>();
for (Object[] row : list) {
String partName = (String) row[0];
- String colName = (String) row[1];
- partColStatsMap.put(
- CacheUtils.buildKey(dbName, tblName, CachedStore.partNameToVals(partName), colName),
- prepareCSObjWithAdjustedNDV(row, 1, useDensityFunctionForNDVEstimation, ndvTuner));
+ if (partNameCurrent == null) {
+ // Update the current partition we are working on
+ partNameCurrent = partName;
+ // Create a new list for this new partition
+ partColStatsList = new ArrayList<ColumnStatisticsObj>();
+ // Add the col stat for the current column
+ partColStatsList.add(prepareCSObj(row, 1));
+ } else if (!partNameCurrent.equalsIgnoreCase(partName)) {
+ // Save the previous partition and its col stat list
+ partColStatsMap.put(partNameCurrent, partColStatsList);
+ // Update the current partition we are working on
+ partNameCurrent = partName;
+ // Create a new list for this new partition
+ partColStatsList = new ArrayList<ColumnStatisticsObj>();
+ // Add the col stat for the current column
+ partColStatsList.add(prepareCSObj(row, 1));
+ } else {
+ partColStatsList.add(prepareCSObj(row, 1));
+ }
Deadline.checkTimeout();
}
query.closeAll();
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
index 870896c..8328428 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
@@ -1171,6 +1171,15 @@ public class MetaStoreUtils {
return addCols(getSchemaWithoutCols(sd, tblsd, parameters, databaseName, tableName, partitionKeys), tblsd.getCols());
}
+ public static List<String> getColumnNamesForTable(Table table) {
+ List<String> colNames = new ArrayList<String>();
+ Iterator<FieldSchema> colsIterator = table.getSd().getColsIterator();
+ while (colsIterator.hasNext()) {
+ colNames.add(colsIterator.next().getName());
+ }
+ return colNames;
+ }
+
public static String getColumnNameDelimiter(List<FieldSchema> fieldSchemas) {
// we first take a look if any fieldSchemas contain COMMA
for (int i = 0; i < fieldSchemas.size(); i++) {
@@ -1180,7 +1189,7 @@ public class MetaStoreUtils {
}
return String.valueOf(SerDeUtils.COMMA);
}
-
+
/**
* Convert FieldSchemas to columnNames.
*/
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index b28983f..19becb8 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -7173,23 +7173,18 @@ public class ObjectStore implements RawStore, Configurable {
}
@Override
- public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
String tableName) throws MetaException, NoSuchObjectException {
- final boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(getConf(),
- HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION);
- final double ndvTuner = HiveConf.getFloatVar(getConf(),
- HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_TUNER);
- return new GetHelper<Map<String, ColumnStatisticsObj>>(dbName, tableName, true, false) {
+ return new GetHelper<Map<String, List<ColumnStatisticsObj>>>(dbName, tableName, true, false) {
@Override
- protected Map<String, ColumnStatisticsObj> getSqlResult(
- GetHelper<Map<String, ColumnStatisticsObj>> ctx) throws MetaException {
- return directSql.getAggrColStatsForTablePartitions(dbName, tblName,
- useDensityFunctionForNDVEstimation, ndvTuner);
+ protected Map<String, List<ColumnStatisticsObj>> getSqlResult(
+ GetHelper<Map<String, List<ColumnStatisticsObj>>> ctx) throws MetaException {
+ return directSql.getColStatsForTablePartitions(dbName, tblName);
}
@Override
- protected Map<String, ColumnStatisticsObj> getJdoResult(
- GetHelper<Map<String, ColumnStatisticsObj>> ctx) throws MetaException,
+ protected Map<String, List<ColumnStatisticsObj>> getJdoResult(
+ GetHelper<Map<String, List<ColumnStatisticsObj>>> ctx) throws MetaException,
NoSuchObjectException {
// This is fast path for query optimizations, if we can find this info
// quickly using directSql, do it. No point in failing back to slow path
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
index c1af690..964ffb2 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -579,14 +579,16 @@ public interface RawStore extends Configurable {
List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException;
/**
- * Get all partition column statistics for a table
+ * Get all partition column statistics for a table in a db
+ *
* @param dbName
* @param tableName
- * @return Map of partition column statistics
+ * @return Map of partition column statistics. Key in the map is partition name. Value is a list
+ * of column stat object for each column in the partition
* @throws MetaException
* @throws NoSuchObjectException
*/
- public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
String tableName) throws MetaException, NoSuchObjectException;
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
index fcf6f27..2dc2804 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields;
import org.apache.hadoop.hive.metastore.model.MPartition;
import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;
import org.apache.hadoop.hive.metastore.model.MTable;
@@ -700,4 +701,151 @@ public class StatObjectConverter {
return new BigDecimal(new BigInteger(d.getUnscaled()), d.getScale()).toString();
}
+ /**
+ * Set field values in oldStatObj from newStatObj
+ * @param oldStatObj
+ * @param newStatObj
+ */
+ public static void setFieldsIntoOldStats(ColumnStatisticsObj oldStatObj,
+ ColumnStatisticsObj newStatObj) {
+ _Fields typeNew = newStatObj.getStatsData().getSetField();
+ _Fields typeOld = oldStatObj.getStatsData().getSetField();
+ typeNew = typeNew == typeOld ? typeNew : null;
+ switch (typeNew) {
+ case BOOLEAN_STATS:
+ BooleanColumnStatsData oldBooleanStatsData = oldStatObj.getStatsData().getBooleanStats();
+ BooleanColumnStatsData newBooleanStatsData = newStatObj.getStatsData().getBooleanStats();
+ if (newBooleanStatsData.isSetNumTrues()) {
+ oldBooleanStatsData.setNumTrues(newBooleanStatsData.getNumTrues());
+ }
+ if (newBooleanStatsData.isSetNumFalses()) {
+ oldBooleanStatsData.setNumFalses(newBooleanStatsData.getNumFalses());
+ }
+ if (newBooleanStatsData.isSetNumNulls()) {
+ oldBooleanStatsData.setNumNulls(newBooleanStatsData.getNumNulls());
+ }
+ if (newBooleanStatsData.isSetBitVectors()) {
+ oldBooleanStatsData.setBitVectors(newBooleanStatsData.getBitVectors());
+ }
+ break;
+ case LONG_STATS: {
+ LongColumnStatsData oldLongStatsData = oldStatObj.getStatsData().getLongStats();
+ LongColumnStatsData newLongStatsData = newStatObj.getStatsData().getLongStats();
+ if (newLongStatsData.isSetHighValue()) {
+ oldLongStatsData.setHighValue(newLongStatsData.getHighValue());
+ }
+ if (newLongStatsData.isSetLowValue()) {
+ oldLongStatsData.setLowValue(newLongStatsData.getLowValue());
+ }
+ if (newLongStatsData.isSetNumNulls()) {
+ oldLongStatsData.setNumNulls(newLongStatsData.getNumNulls());
+ }
+ if (newLongStatsData.isSetNumDVs()) {
+ oldLongStatsData.setNumDVs(newLongStatsData.getNumDVs());
+ }
+ if (newLongStatsData.isSetBitVectors()) {
+ oldLongStatsData.setBitVectors(newLongStatsData.getBitVectors());
+ }
+ break;
+ }
+ case DOUBLE_STATS: {
+ DoubleColumnStatsData oldDoubleStatsData = oldStatObj.getStatsData().getDoubleStats();
+ DoubleColumnStatsData newDoubleStatsData = newStatObj.getStatsData().getDoubleStats();
+ if (newDoubleStatsData.isSetHighValue()) {
+ oldDoubleStatsData.setHighValue(newDoubleStatsData.getHighValue());
+ }
+ if (newDoubleStatsData.isSetLowValue()) {
+ oldDoubleStatsData.setLowValue(newDoubleStatsData.getLowValue());
+ }
+ if (newDoubleStatsData.isSetNumNulls()) {
+ oldDoubleStatsData.setNumNulls(newDoubleStatsData.getNumNulls());
+ }
+ if (newDoubleStatsData.isSetNumDVs()) {
+ oldDoubleStatsData.setNumDVs(newDoubleStatsData.getNumDVs());
+ }
+ if (newDoubleStatsData.isSetBitVectors()) {
+ oldDoubleStatsData.setBitVectors(newDoubleStatsData.getBitVectors());
+ }
+ break;
+ }
+ case STRING_STATS: {
+ StringColumnStatsData oldStringStatsData = oldStatObj.getStatsData().getStringStats();
+ StringColumnStatsData newStringStatsData = newStatObj.getStatsData().getStringStats();
+ if (newStringStatsData.isSetMaxColLen()) {
+ oldStringStatsData.setMaxColLen(newStringStatsData.getMaxColLen());
+ }
+ if (newStringStatsData.isSetAvgColLen()) {
+ oldStringStatsData.setAvgColLen(newStringStatsData.getAvgColLen());
+ }
+ if (newStringStatsData.isSetNumNulls()) {
+ oldStringStatsData.setNumNulls(newStringStatsData.getNumNulls());
+ }
+ if (newStringStatsData.isSetNumDVs()) {
+ oldStringStatsData.setNumDVs(newStringStatsData.getNumDVs());
+ }
+ if (newStringStatsData.isSetBitVectors()) {
+ oldStringStatsData.setBitVectors(newStringStatsData.getBitVectors());
+ }
+ break;
+ }
+ case BINARY_STATS:
+ BinaryColumnStatsData oldBinaryStatsData = oldStatObj.getStatsData().getBinaryStats();
+ BinaryColumnStatsData newBinaryStatsData = newStatObj.getStatsData().getBinaryStats();
+ if (newBinaryStatsData.isSetMaxColLen()) {
+ oldBinaryStatsData.setMaxColLen(newBinaryStatsData.getMaxColLen());
+ }
+ if (newBinaryStatsData.isSetAvgColLen()) {
+ oldBinaryStatsData.setAvgColLen(newBinaryStatsData.getAvgColLen());
+ }
+ if (newBinaryStatsData.isSetNumNulls()) {
+ oldBinaryStatsData.setNumNulls(newBinaryStatsData.getNumNulls());
+ }
+ if (newBinaryStatsData.isSetBitVectors()) {
+ oldBinaryStatsData.setBitVectors(newBinaryStatsData.getBitVectors());
+ }
+ break;
+ case DECIMAL_STATS: {
+ DecimalColumnStatsData oldDecimalStatsData = oldStatObj.getStatsData().getDecimalStats();
+ DecimalColumnStatsData newDecimalStatsData = newStatObj.getStatsData().getDecimalStats();
+ if (newDecimalStatsData.isSetHighValue()) {
+ oldDecimalStatsData.setHighValue(newDecimalStatsData.getHighValue());
+ }
+ if (newDecimalStatsData.isSetLowValue()) {
+ oldDecimalStatsData.setLowValue(newDecimalStatsData.getLowValue());
+ }
+ if (newDecimalStatsData.isSetNumNulls()) {
+ oldDecimalStatsData.setNumNulls(newDecimalStatsData.getNumNulls());
+ }
+ if (newDecimalStatsData.isSetNumDVs()) {
+ oldDecimalStatsData.setNumDVs(newDecimalStatsData.getNumDVs());
+ }
+ if (newDecimalStatsData.isSetBitVectors()) {
+ oldDecimalStatsData.setBitVectors(newDecimalStatsData.getBitVectors());
+ }
+ break;
+ }
+ case DATE_STATS: {
+ DateColumnStatsData oldDateStatsData = oldStatObj.getStatsData().getDateStats();
+ DateColumnStatsData newDateStatsData = newStatObj.getStatsData().getDateStats();
+ if (newDateStatsData.isSetHighValue()) {
+ oldDateStatsData.setHighValue(newDateStatsData.getHighValue());
+ }
+ if (newDateStatsData.isSetLowValue()) {
+ oldDateStatsData.setLowValue(newDateStatsData.getLowValue());
+ }
+ if (newDateStatsData.isSetNumNulls()) {
+ oldDateStatsData.setNumNulls(newDateStatsData.getNumNulls());
+ }
+ if (newDateStatsData.isSetNumDVs()) {
+ oldDateStatsData.setNumDVs(newDateStatsData.getNumDVs());
+ }
+ if (newDateStatsData.isSetBitVectors()) {
+ oldDateStatsData.setBitVectors(newDateStatsData.getBitVectors());
+ }
+ break;
+ }
+ default:
+ throw new IllegalArgumentException("Unknown stats type: " + typeNew.toString());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
index 668499b..280655d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
@@ -38,6 +38,10 @@ public class CacheUtils {
return dbName + delimit + tableName;
}
+ public static String buildKeyWithDelimit(String dbName, String tableName) {
+ return buildKey(dbName, tableName) + delimit;
+ }
+
public static String buildKey(String dbName, String tableName, List<String> partVals) {
String key = buildKey(dbName, tableName);
if (partVals == null || partVals.size() == 0) {
@@ -52,11 +56,38 @@ public class CacheUtils {
return key;
}
+ public static String buildKeyWithDelimit(String dbName, String tableName, List<String> partVals) {
+ return buildKey(dbName, tableName, partVals) + delimit;
+ }
+
public static String buildKey(String dbName, String tableName, List<String> partVals, String colName) {
String key = buildKey(dbName, tableName, partVals);
return key + delimit + colName;
}
+ public static String buildKey(String dbName, String tableName, String colName) {
+ String key = buildKey(dbName, tableName);
+ return key + delimit + colName;
+ }
+
+ public static String[] splitTableColStats(String key) {
+ return key.split(delimit);
+ }
+
+ public static Object[] splitPartitionColStats(String key) {
+ Object[] result = new Object[4];
+ String[] comps = key.split(delimit);
+ result[0] = comps[0];
+ result[1] = comps[1];
+ List<String> vals = new ArrayList<String>();
+ for (int i=2;i<comps.length-2;i++) {
+ vals.add(comps[i]);
+ }
+ result[2] = vals;
+ result[3] = comps[comps.length-1];
+ return result;
+ }
+
public static Table assemble(TableWrapper wrapper) {
Table t = wrapper.getTable().deepCopy();
if (wrapper.getSdHash()!=null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index 1cc838f..78aab91 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -26,12 +26,15 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Deadline;
import org.apache.hadoop.hive.metastore.FileMetadataHandler;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.ObjectStore;
@@ -41,18 +44,11 @@ import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.AggrStats;
-import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Date;
-import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.Decimal;
-import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
import org.apache.hadoop.hive.metastore.api.Function;
@@ -61,7 +57,6 @@ import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.metastore.api.InvalidInputException;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
-import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -77,13 +72,14 @@ import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant;
import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableMeta;
import org.apache.hadoop.hive.metastore.api.Type;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.hbase.stats.merge.ColumnStatsMerger;
+import org.apache.hadoop.hive.metastore.hbase.stats.merge.ColumnStatsMergerFactory;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -104,15 +100,25 @@ import com.google.common.annotations.VisibleForTesting;
// TODO initial load slow?
// TODO size estimation
// TODO factor in extrapolation logic (using partitions found) during aggregate stats calculation
-// TODO factor in NDV estimation (density based estimation) logic when merging NDVs from 2 colStats object
-// TODO refactor to use same common code with StatObjectConverter (for merging 2 col stats objects)
public class CachedStore implements RawStore, Configurable {
private static ScheduledExecutorService cacheUpdateMaster = null;
- private static AtomicReference<Thread> runningMasterThread = new AtomicReference<Thread>(null);
+ private static ReentrantReadWriteLock databaseCacheLock = new ReentrantReadWriteLock(true);
+ private static AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false);
+ private static ReentrantReadWriteLock tableCacheLock = new ReentrantReadWriteLock(true);
+ private static AtomicBoolean isTableCacheDirty = new AtomicBoolean(false);
+ private static ReentrantReadWriteLock partitionCacheLock = new ReentrantReadWriteLock(true);
+ private static AtomicBoolean isPartitionCacheDirty = new AtomicBoolean(false);
+ private static ReentrantReadWriteLock tableColStatsCacheLock = new ReentrantReadWriteLock(true);
+ private static AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false);
+ private static ReentrantReadWriteLock partitionColStatsCacheLock = new ReentrantReadWriteLock(
+ true);
+ private static AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false);
RawStore rawStore;
Configuration conf;
private PartitionExpressionProxy expressionProxy = null;
+ // Default value set to 100 milliseconds for test purpose
+ private long cacheRefreshPeriod = 100;
static boolean firstTime = true;
static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName());
@@ -209,6 +215,8 @@ public class CachedStore implements RawStore, Configurable {
LOG.info("Prewarming CachedStore");
prewarm();
LOG.info("CachedStore initialized");
+ // Start the cache update master-worker threads
+ startCacheUpdateService();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -216,7 +224,10 @@ public class CachedStore implements RawStore, Configurable {
}
}
- private void prewarm() throws Exception {
+ @VisibleForTesting
+ void prewarm() throws Exception {
+ // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy
+ Deadline.registerIfNot(1000000);
List<String> dbNames = rawStore.getAllDatabases();
for (String dbName : dbNames) {
Database db = rawStore.getDatabase(dbName);
@@ -226,35 +237,81 @@ public class CachedStore implements RawStore, Configurable {
Table table = rawStore.getTable(dbName, tblName);
SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(dbName),
HiveStringUtils.normalizeIdentifier(tblName), table);
+ Deadline.startTimer("getPartitions");
List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
+ Deadline.stopTimer();
for (Partition partition : partitions) {
SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName),
HiveStringUtils.normalizeIdentifier(tblName), partition);
}
- Map<String, ColumnStatisticsObj> aggrStatsPerPartition = rawStore
- .getAggrColStatsForTablePartitions(dbName, tblName);
- SharedCache.addPartitionColStatsToCache(aggrStatsPerPartition);
+ // Cache partition column stats
+ Deadline.startTimer("getColStatsForTablePartitions");
+ Map<String, List<ColumnStatisticsObj>> colStatsPerPartition =
+ rawStore.getColStatsForTablePartitions(dbName, tblName);
+ Deadline.stopTimer();
+ if (colStatsPerPartition != null) {
+ SharedCache.addPartitionColStatsToCache(dbName, tblName, colStatsPerPartition);
+ }
+ // Cache table column stats
+ List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+ Deadline.startTimer("getTableColumnStatistics");
+ ColumnStatistics tableColStats =
+ rawStore.getTableColumnStatistics(dbName, tblName, colNames);
+ Deadline.stopTimer();
+ if ((tableColStats != null) && (tableColStats.getStatsObjSize() > 0)) {
+ SharedCache.addTableColStatsToCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj());
+ }
}
}
- // Start the cache update master-worker threads
- startCacheUpdateService();
}
- private synchronized void startCacheUpdateService() {
+ @VisibleForTesting
+ synchronized void startCacheUpdateService() {
if (cacheUpdateMaster == null) {
cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setName("CachedStore-CacheUpdateService: Thread-" + t.getId());
t.setDaemon(true);
return t;
}
});
- cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(this), 0, HiveConf
- .getTimeVar(conf, HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY,
- TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+ if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+ cacheRefreshPeriod =
+ HiveConf.getTimeVar(conf,
+ HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY,
+ TimeUnit.MILLISECONDS);
+ }
+ LOG.info("CachedStore: starting cache update service (run every " + cacheRefreshPeriod + "ms)");
+ cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(this), cacheRefreshPeriod,
+ cacheRefreshPeriod, TimeUnit.MILLISECONDS);
}
}
+ @VisibleForTesting
+ synchronized boolean stopCacheUpdateService(long timeout) {
+ boolean tasksStoppedBeforeShutdown = false;
+ if (cacheUpdateMaster != null) {
+ LOG.info("CachedStore: shutting down cache update service");
+ try {
+ tasksStoppedBeforeShutdown =
+ cacheUpdateMaster.awaitTermination(timeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOG.info("CachedStore: cache update service was interrupted while waiting for tasks to "
+ + "complete before shutting down. Will make a hard stop now.");
+ }
+ cacheUpdateMaster.shutdownNow();
+ cacheUpdateMaster = null;
+ }
+ return tasksStoppedBeforeShutdown;
+ }
+
+ @VisibleForTesting
+ void setCacheRefreshPeriod(long time) {
+ this.cacheRefreshPeriod = time;
+ }
+
static class CacheUpdateMasterWork implements Runnable {
private CachedStore cachedStore;
@@ -265,86 +322,175 @@ public class CachedStore implements RawStore, Configurable {
@Override
public void run() {
- runningMasterThread.set(Thread.currentThread());
- RawStore rawStore = cachedStore.getRawStore();
+ // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy
+ Deadline.registerIfNot(1000000);
+ LOG.debug("CachedStore: updating cached objects");
+ String rawStoreClassName =
+ HiveConf.getVar(cachedStore.conf, HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_IMPL,
+ ObjectStore.class.getName());
try {
+ RawStore rawStore =
+ ((Class<? extends RawStore>) MetaStoreUtils.getClass(rawStoreClassName)).newInstance();
+ rawStore.setConf(cachedStore.conf);
List<String> dbNames = rawStore.getAllDatabases();
- // Update the database in cache
- if (!updateDatabases(rawStore, dbNames)) {
- return;
- }
- // Update the tables and their partitions in cache
- if (!updateTables(rawStore, dbNames)) {
- return;
+ if (dbNames != null) {
+ // Update the database in cache
+ updateDatabases(rawStore, dbNames);
+ for (String dbName : dbNames) {
+ // Update the tables in cache
+ updateTables(rawStore, dbName);
+ List<String> tblNames = cachedStore.getAllTables(dbName);
+ for (String tblName : tblNames) {
+ // Update the partitions for a table in cache
+ updateTablePartitions(rawStore, dbName, tblName);
+ // Update the table column stats for a table in cache
+ updateTableColStats(rawStore, dbName, tblName);
+ // Update the partitions column stats for a table in cache
+ updateTablePartitionColStats(rawStore, dbName, tblName);
+ }
+ }
}
} catch (MetaException e) {
LOG.error("Updating CachedStore: error getting database names", e);
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException("Cannot instantiate " + rawStoreClassName, e);
}
}
- private boolean updateDatabases(RawStore rawStore, List<String> dbNames) {
- if (dbNames != null) {
- List<Database> databases = new ArrayList<Database>();
- for (String dbName : dbNames) {
- // If a preemption of this thread was requested, simply return before proceeding
- if (Thread.interrupted()) {
- return false;
+ private void updateDatabases(RawStore rawStore, List<String> dbNames) {
+ // Prepare the list of databases
+ List<Database> databases = new ArrayList<Database>();
+ for (String dbName : dbNames) {
+ Database db;
+ try {
+ db = rawStore.getDatabase(dbName);
+ databases.add(db);
+ } catch (NoSuchObjectException e) {
+ LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e);
+ }
+ }
+ // Update the cached database objects
+ try {
+ if (databaseCacheLock.writeLock().tryLock()) {
+ // Skip background updates if we detect change
+ if (isDatabaseCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping database cache update; the database list we have is dirty.");
+ return;
}
- Database db;
- try {
- db = rawStore.getDatabase(dbName);
- databases.add(db);
- } catch (NoSuchObjectException e) {
- LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e);
+ SharedCache.refreshDatabases(databases);
+ }
+ } finally {
+ if (databaseCacheLock.isWriteLockedByCurrentThread()) {
+ databaseCacheLock.writeLock().unlock();
+ }
+ }
+ }
+
+ // Update the cached table objects
+ private void updateTables(RawStore rawStore, String dbName) {
+ List<Table> tables = new ArrayList<Table>();
+ try {
+ List<String> tblNames = rawStore.getAllTables(dbName);
+ for (String tblName : tblNames) {
+ Table table =
+ rawStore.getTable(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName));
+ tables.add(table);
+ }
+ if (tableCacheLock.writeLock().tryLock()) {
+ // Skip background updates if we detect change
+ if (isTableCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping table cache update; the table list we have is dirty.");
+ return;
}
+ SharedCache.refreshTables(dbName, tables);
+ }
+ } catch (MetaException e) {
+ LOG.info("Updating CachedStore: unable to read tables for database - " + dbName, e);
+ } finally {
+ if (tableCacheLock.isWriteLockedByCurrentThread()) {
+ tableCacheLock.writeLock().unlock();
}
- // Update the cached database objects
- SharedCache.refreshDatabases(databases);
}
- return true;
}
- private boolean updateTables(RawStore rawStore, List<String> dbNames) {
- if (dbNames != null) {
- List<Table> tables = new ArrayList<Table>();
- for (String dbName : dbNames) {
- try {
- List<String> tblNames = rawStore.getAllTables(dbName);
- for (String tblName : tblNames) {
- // If a preemption of this thread was requested, simply return before proceeding
- if (Thread.interrupted()) {
- return false;
- }
- Table table = rawStore.getTable(dbName, tblName);
- tables.add(table);
- }
- // Update the cached database objects
- SharedCache.refreshTables(dbName, tables);
- for (String tblName : tblNames) {
- // If a preemption of this thread was requested, simply return before proceeding
- if (Thread.interrupted()) {
- return false;
- }
- List<Partition> partitions =
- rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
- SharedCache.refreshPartitions(dbName, tblName, partitions);
- }
- } catch (MetaException | NoSuchObjectException e) {
- LOG.error("Updating CachedStore: unable to read table", e);
- return false;
+ // Update the cached partition objects for a table
+ private void updateTablePartitions(RawStore rawStore, String dbName, String tblName) {
+ try {
+ Deadline.startTimer("getPartitions");
+ List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
+ Deadline.stopTimer();
+ if (partitionCacheLock.writeLock().tryLock()) {
+ // Skip background updates if we detect change
+ if (isPartitionCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping partition cache update; the partition list we have is dirty.");
+ return;
}
+ SharedCache.refreshPartitions(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), partitions);
+ }
+ } catch (MetaException | NoSuchObjectException e) {
+ LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e);
+ } finally {
+ if (partitionCacheLock.isWriteLockedByCurrentThread()) {
+ partitionCacheLock.writeLock().unlock();
}
}
- return true;
}
- }
- // Interrupt the cache update background thread
- // Fire and forget (the master will respond appropriately when it gets a chance)
- // All writes to the cache go through synchronized methods, so fire & forget is fine.
- private void interruptCacheUpdateMaster() {
- if (runningMasterThread.get() != null) {
- runningMasterThread.get().interrupt();
+ // Update the cached col stats for this table
+ private void updateTableColStats(RawStore rawStore, String dbName, String tblName) {
+ try {
+ Table table = rawStore.getTable(dbName, tblName);
+ List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+ Deadline.startTimer("getTableColumnStatistics");
+ ColumnStatistics tableColStats =
+ rawStore.getTableColumnStatistics(dbName, tblName, colNames);
+ Deadline.stopTimer();
+ if (tableColStatsCacheLock.writeLock().tryLock()) {
+ // Skip background updates if we detect change
+ if (isTableColStatsCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping table column stats cache update; the table column stats list we "
+ + "have is dirty.");
+ return;
+ }
+ SharedCache.refreshTableColStats(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj());
+ }
+ } catch (MetaException | NoSuchObjectException e) {
+ LOG.info("Updating CachedStore: unable to read table column stats of table: " + tblName, e);
+ } finally {
+ if (tableColStatsCacheLock.isWriteLockedByCurrentThread()) {
+ tableColStatsCacheLock.writeLock().unlock();
+ }
+ }
+ }
+
+ // Update the cached partition col stats for a table
+ private void updateTablePartitionColStats(RawStore rawStore, String dbName, String tblName) {
+ try {
+ Deadline.startTimer("getColStatsForTablePartitions");
+ Map<String, List<ColumnStatisticsObj>> colStatsPerPartition =
+ rawStore.getColStatsForTablePartitions(dbName, tblName);
+ Deadline.stopTimer();
+ if (partitionColStatsCacheLock.writeLock().tryLock()) {
+ // Skip background updates if we detect change
+ if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping partition column stats cache update; the partition column stats "
+ + "list we have is dirty.");
+ return;
+ }
+ SharedCache.refreshPartitionColStats(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), colStatsPerPartition);
+ }
+ } catch (MetaException | NoSuchObjectException e) {
+ LOG.info("Updating CachedStore: unable to read partitions column stats of table: "
+ + tblName, e);
+ } finally {
+ if (partitionColStatsCacheLock.isWriteLockedByCurrentThread()) {
+ partitionColStatsCacheLock.writeLock().unlock();
+ }
+ }
}
}
@@ -374,11 +520,17 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
- public void createDatabase(Database db)
- throws InvalidObjectException, MetaException {
+ public void createDatabase(Database db) throws InvalidObjectException, MetaException {
rawStore.createDatabase(db);
- interruptCacheUpdateMaster();
- SharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(db.getName()), db.deepCopy());
+ try {
+ // Wait if background cache update is happening
+ databaseCacheLock.readLock().lock();
+ isDatabaseCacheDirty.set(true);
+ SharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(db.getName()),
+ db.deepCopy());
+ } finally {
+ databaseCacheLock.readLock().unlock();
+ }
}
@Override
@@ -387,26 +539,38 @@ public class CachedStore implements RawStore, Configurable {
if (db == null) {
throw new NoSuchObjectException();
}
- return SharedCache.getDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbName));
+ return db;
}
@Override
public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException {
boolean succ = rawStore.dropDatabase(dbname);
if (succ) {
- interruptCacheUpdateMaster();
- SharedCache.removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbname));
+ try {
+ // Wait if background cache update is happening
+ databaseCacheLock.readLock().lock();
+ isDatabaseCacheDirty.set(true);
+ SharedCache.removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbname));
+ } finally {
+ databaseCacheLock.readLock().unlock();
+ }
}
return succ;
}
@Override
- public boolean alterDatabase(String dbName, Database db)
- throws NoSuchObjectException, MetaException {
+ public boolean alterDatabase(String dbName, Database db) throws NoSuchObjectException,
+ MetaException {
boolean succ = rawStore.alterDatabase(dbName, db);
if (succ) {
- interruptCacheUpdateMaster();
- SharedCache.alterDatabaseInCache(HiveStringUtils.normalizeIdentifier(dbName), db);
+ try {
+ // Wait if background cache update is happening
+ databaseCacheLock.readLock().lock();
+ isDatabaseCacheDirty.set(true);
+ SharedCache.alterDatabaseInCache(HiveStringUtils.normalizeIdentifier(dbName), db);
+ } finally {
+ databaseCacheLock.readLock().unlock();
+ }
}
return succ;
}
@@ -462,24 +626,45 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
- public void createTable(Table tbl)
- throws InvalidObjectException, MetaException {
+ public void createTable(Table tbl) throws InvalidObjectException, MetaException {
rawStore.createTable(tbl);
- interruptCacheUpdateMaster();
validateTableType(tbl);
- SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(tbl.getDbName()),
- HiveStringUtils.normalizeIdentifier(tbl.getTableName()), tbl);
+ try {
+ // Wait if background cache update is happening
+ tableCacheLock.readLock().lock();
+ isTableCacheDirty.set(true);
+ SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(tbl.getDbName()),
+ HiveStringUtils.normalizeIdentifier(tbl.getTableName()), tbl);
+ } finally {
+ tableCacheLock.readLock().unlock();
+ }
}
@Override
- public boolean dropTable(String dbName, String tableName)
- throws MetaException, NoSuchObjectException, InvalidObjectException,
- InvalidInputException {
+ public boolean dropTable(String dbName, String tableName) throws MetaException,
+ NoSuchObjectException, InvalidObjectException, InvalidInputException {
boolean succ = rawStore.dropTable(dbName, tableName);
if (succ) {
- interruptCacheUpdateMaster();
- SharedCache.removeTableFromCache(HiveStringUtils.normalizeIdentifier(dbName),
- HiveStringUtils.normalizeIdentifier(tableName));
+ // Remove table
+ try {
+ // Wait if background table cache update is happening
+ tableCacheLock.readLock().lock();
+ isTableCacheDirty.set(true);
+ SharedCache.removeTableFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName));
+ } finally {
+ tableCacheLock.readLock().unlock();
+ }
+ // Remove table col stats
+ try {
+ // Wait if background table col stats cache update is happening
+ tableColStatsCacheLock.readLock().lock();
+ isTableColStatsCacheDirty.set(true);
+ SharedCache.removeTableColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName));
+ } finally {
+ tableColStatsCacheLock.readLock().unlock();
+ }
}
return succ;
}
@@ -496,57 +681,74 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
- public boolean addPartition(Partition part)
- throws InvalidObjectException, MetaException {
+ public boolean addPartition(Partition part) throws InvalidObjectException, MetaException {
boolean succ = rawStore.addPartition(part);
if (succ) {
- interruptCacheUpdateMaster();
- SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()),
- HiveStringUtils.normalizeIdentifier(part.getTableName()), part);
+ try {
+ // Wait if background cache update is happening
+ partitionCacheLock.readLock().lock();
+ isPartitionCacheDirty.set(true);
+ SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()),
+ HiveStringUtils.normalizeIdentifier(part.getTableName()), part);
+ } finally {
+ partitionCacheLock.readLock().unlock();
+ }
}
return succ;
}
@Override
- public boolean addPartitions(String dbName, String tblName,
- List<Partition> parts) throws InvalidObjectException, MetaException {
+ public boolean addPartitions(String dbName, String tblName, List<Partition> parts)
+ throws InvalidObjectException, MetaException {
boolean succ = rawStore.addPartitions(dbName, tblName, parts);
if (succ) {
- interruptCacheUpdateMaster();
- for (Partition part : parts) {
- SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()),
- HiveStringUtils.normalizeIdentifier(part.getTableName()), part);
+ try {
+ // Wait if background cache update is happening
+ partitionCacheLock.readLock().lock();
+ isPartitionCacheDirty.set(true);
+ for (Partition part : parts) {
+ SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), part);
+ }
+ } finally {
+ partitionCacheLock.readLock().unlock();
}
}
return succ;
}
@Override
- public boolean addPartitions(String dbName, String tblName,
- PartitionSpecProxy partitionSpec, boolean ifNotExists)
- throws InvalidObjectException, MetaException {
+ public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy partitionSpec,
+ boolean ifNotExists) throws InvalidObjectException, MetaException {
boolean succ = rawStore.addPartitions(dbName, tblName, partitionSpec, ifNotExists);
if (succ) {
- interruptCacheUpdateMaster();
- PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator();
- while (iterator.hasNext()) {
- Partition part = iterator.next();
- SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName),
- HiveStringUtils.normalizeIdentifier(tblName), part);
+ try {
+ // Wait if background cache update is happening
+ partitionCacheLock.readLock().lock();
+ isPartitionCacheDirty.set(true);
+ PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator();
+ while (iterator.hasNext()) {
+ Partition part = iterator.next();
+ SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), part);
+ }
+ } finally {
+ partitionCacheLock.readLock().unlock();
}
}
return succ;
}
@Override
- public Partition getPartition(String dbName, String tableName,
- List<String> part_vals) throws MetaException, NoSuchObjectException {
- Partition part = SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
- HiveStringUtils.normalizeIdentifier(tableName), part_vals);
+ public Partition getPartition(String dbName, String tableName, List<String> part_vals)
+ throws MetaException, NoSuchObjectException {
+ Partition part =
+ SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), part_vals);
if (part != null) {
part.unsetPrivileges();
} else {
- throw new NoSuchObjectException();
+ throw new NoSuchObjectException("partition values=" + part_vals.toString());
}
return part;
}
@@ -559,14 +761,30 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
- public boolean dropPartition(String dbName, String tableName,
- List<String> part_vals) throws MetaException, NoSuchObjectException,
- InvalidObjectException, InvalidInputException {
+ public boolean dropPartition(String dbName, String tableName, List<String> part_vals)
+ throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
boolean succ = rawStore.dropPartition(dbName, tableName, part_vals);
if (succ) {
- interruptCacheUpdateMaster();
- SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
- HiveStringUtils.normalizeIdentifier(tableName), part_vals);
+ // Remove partition
+ try {
+ // Wait if background cache update is happening
+ partitionCacheLock.readLock().lock();
+ isPartitionCacheDirty.set(true);
+ SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), part_vals);
+ } finally {
+ partitionCacheLock.readLock().unlock();
+ }
+ // Remove partition col stats
+ try {
+ // Wait if background cache update is happening
+ partitionColStatsCacheLock.readLock().lock();
+ isPartitionColStatsCacheDirty.set(true);
+ SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), part_vals);
+ } finally {
+ partitionColStatsCacheLock.readLock().unlock();
+ }
}
return succ;
}
@@ -588,10 +806,28 @@ public class CachedStore implements RawStore, Configurable {
public void alterTable(String dbName, String tblName, Table newTable)
throws InvalidObjectException, MetaException {
rawStore.alterTable(dbName, tblName, newTable);
- interruptCacheUpdateMaster();
validateTableType(newTable);
- SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName),
- HiveStringUtils.normalizeIdentifier(tblName), newTable);
+ // Update table cache
+ try {
+ // Wait if background cache update is happening
+ tableCacheLock.readLock().lock();
+ isTableCacheDirty.set(true);
+ SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), newTable);
+ } finally {
+ tableCacheLock.readLock().unlock();
+ }
+ // Update partition cache (key might have changed since table name is a
+ // component of key)
+ try {
+ // Wait if background cache update is happening
+ partitionCacheLock.readLock().lock();
+ isPartitionCacheDirty.set(true);
+ SharedCache.alterTableInPartitionCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), newTable);
+ } finally {
+ partitionCacheLock.readLock().unlock();
+ }
}
@Override
@@ -685,26 +921,62 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
- public void alterPartition(String dbName, String tblName,
- List<String> partVals, Partition newPart)
+ public void alterPartition(String dbName, String tblName, List<String> partVals, Partition newPart)
throws InvalidObjectException, MetaException {
rawStore.alterPartition(dbName, tblName, partVals, newPart);
- interruptCacheUpdateMaster();
- SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
- HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+ // Update partition cache
+ try {
+ // Wait if background cache update is happening
+ partitionCacheLock.readLock().lock();
+ isPartitionCacheDirty.set(true);
+ SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+ } finally {
+ partitionCacheLock.readLock().unlock();
+ }
+ // Update partition column stats cache
+ try {
+ // Wait if background cache update is happening
+ partitionColStatsCacheLock.readLock().lock();
+ isPartitionColStatsCacheDirty.set(true);
+ SharedCache.alterPartitionInColStatsCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+ } finally {
+ partitionColStatsCacheLock.readLock().unlock();
+ }
}
@Override
- public void alterPartitions(String dbName, String tblName,
- List<List<String>> partValsList, List<Partition> newParts)
- throws InvalidObjectException, MetaException {
+ public void alterPartitions(String dbName, String tblName, List<List<String>> partValsList,
+ List<Partition> newParts) throws InvalidObjectException, MetaException {
rawStore.alterPartitions(dbName, tblName, partValsList, newParts);
- interruptCacheUpdateMaster();
- for (int i=0;i<partValsList.size();i++) {
- List<String> partVals = partValsList.get(i);
- Partition newPart = newParts.get(i);
- SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
- HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+ // Update partition cache
+ try {
+ // Wait if background cache update is happening
+ partitionCacheLock.readLock().lock();
+ isPartitionCacheDirty.set(true);
+ for (int i = 0; i < partValsList.size(); i++) {
+ List<String> partVals = partValsList.get(i);
+ Partition newPart = newParts.get(i);
+ SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+ }
+ } finally {
+ partitionCacheLock.readLock().unlock();
+ }
+ // Update partition column stats cache
+ try {
+ // Wait if background cache update is happening
+ partitionColStatsCacheLock.readLock().lock();
+ isPartitionColStatsCacheDirty.set(true);
+ for (int i = 0; i < partValsList.size(); i++) {
+ List<String> partVals = partValsList.get(i);
+ Partition newPart = newParts.get(i);
+ SharedCache.alterPartitionInColStatsCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+ }
+ } finally {
+ partitionColStatsCacheLock.readLock().unlock();
}
}
@@ -1095,55 +1367,199 @@ public class CachedStore implements RawStore, Configurable {
@Override
public boolean updateTableColumnStatistics(ColumnStatistics colStats)
- throws NoSuchObjectException, MetaException, InvalidObjectException,
- InvalidInputException {
+ throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
boolean succ = rawStore.updateTableColumnStatistics(colStats);
if (succ) {
- SharedCache.updateTableColumnStatistics(HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()),
- HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), colStats.getStatsObj());
+ String dbName = colStats.getStatsDesc().getDbName();
+ String tableName = colStats.getStatsDesc().getTableName();
+ List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
+ Table tbl = getTable(dbName, tableName);
+ List<String> colNames = new ArrayList<>();
+ for (ColumnStatisticsObj statsObj : statsObjs) {
+ colNames.add(statsObj.getColName());
+ }
+ StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames);
+
+ // Update table
+ try {
+ // Wait if background cache update is happening
+ tableCacheLock.readLock().lock();
+ isTableCacheDirty.set(true);
+ SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), tbl);
+ } finally {
+ tableCacheLock.readLock().unlock();
+ }
+
+ // Update table col stats
+ try {
+ // Wait if background cache update is happening
+ tableColStatsCacheLock.readLock().lock();
+ isTableColStatsCacheDirty.set(true);
+ SharedCache.updateTableColStatsInCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), statsObjs);
+ } finally {
+ tableColStatsCacheLock.readLock().unlock();
+ }
}
return succ;
}
@Override
- public boolean updatePartitionColumnStatistics(ColumnStatistics colStats,
- List<String> partVals) throws NoSuchObjectException, MetaException,
- InvalidObjectException, InvalidInputException {
- boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals);
+ public ColumnStatistics getTableColumnStatistics(String dbName, String tableName,
+ List<String> colNames) throws MetaException, NoSuchObjectException {
+ ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName);
+ List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>();
+ for (String colName : colNames) {
+ String colStatsCacheKey =
+ CacheUtils.buildKey(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), colName);
+ ColumnStatisticsObj colStat = SharedCache.getCachedTableColStats(colStatsCacheKey);
+ if (colStat != null) {
+ colStatObjs.add(colStat);
+ }
+ }
+ if (colStatObjs.isEmpty()) {
+ return null;
+ } else {
+ return new ColumnStatistics(csd, colStatObjs);
+ }
+ }
+
+ @Override
+ public boolean deleteTableColumnStatistics(String dbName, String tableName, String colName)
+ throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
+ boolean succ = rawStore.deleteTableColumnStatistics(dbName, tableName, colName);
if (succ) {
- SharedCache.updatePartitionColumnStatistics(HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()),
- HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), partVals, colStats.getStatsObj());
+ try {
+ // Wait if background cache update is happening
+ tableColStatsCacheLock.readLock().lock();
+ isTableColStatsCacheDirty.set(true);
+ SharedCache.removeTableColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), colName);
+ } finally {
+ tableColStatsCacheLock.readLock().unlock();
+ }
}
return succ;
}
@Override
- public ColumnStatistics getTableColumnStatistics(String dbName,
- String tableName, List<String> colName)
- throws MetaException, NoSuchObjectException {
- return rawStore.getTableColumnStatistics(dbName, tableName, colName);
+ public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List<String> partVals)
+ throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
+ boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals);
+ if (succ) {
+ String dbName = colStats.getStatsDesc().getDbName();
+ String tableName = colStats.getStatsDesc().getTableName();
+ List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
+ Partition part = getPartition(dbName, tableName, partVals);
+ List<String> colNames = new ArrayList<>();
+ for (ColumnStatisticsObj statsObj : statsObjs) {
+ colNames.add(statsObj.getColName());
+ }
+ StatsSetupConst.setColumnStatsState(part.getParameters(), colNames);
+
+ // Update partition
+ try {
+ // Wait if background cache update is happening
+ partitionCacheLock.readLock().lock();
+ isPartitionCacheDirty.set(true);
+ SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), partVals, part);
+ } finally {
+ partitionCacheLock.readLock().unlock();
+ }
+
+ // Update partition column stats
+ try {
+ // Wait if background cache update is happening
+ partitionColStatsCacheLock.readLock().lock();
+ isPartitionColStatsCacheDirty.set(true);
+ SharedCache.updatePartitionColStatsInCache(
+ HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()),
+ HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), partVals,
+ colStats.getStatsObj());
+ } finally {
+ partitionColStatsCacheLock.readLock().unlock();
+ }
+ }
+ return succ;
}
@Override
- public List<ColumnStatistics> getPartitionColumnStatistics(String dbName,
- String tblName, List<String> partNames, List<String> colNames)
- throws MetaException, NoSuchObjectException {
+ // TODO: calculate from cached values.
+ // Need to see if it makes sense to do this as some col stats maybe out of date/missing on cache.
+ public List<ColumnStatistics> getPartitionColumnStatistics(String dbName, String tblName,
+ List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException {
return rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames);
}
@Override
- public boolean deletePartitionColumnStatistics(String dbName,
- String tableName, String partName, List<String> partVals, String colName)
- throws NoSuchObjectException, MetaException, InvalidObjectException,
- InvalidInputException {
- return rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName);
+ public boolean deletePartitionColumnStatistics(String dbName, String tableName, String partName,
+ List<String> partVals, String colName) throws NoSuchObjectException, MetaException,
+ InvalidObjectException, InvalidInputException {
+ boolean succ =
+ rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName);
+ if (succ) {
+ try {
+ // Wait if background cache update is happening
+ partitionColStatsCacheLock.readLock().lock();
+ isPartitionColStatsCacheDirty.set(true);
+ SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), partVals, colName);
+ } finally {
+ partitionColStatsCacheLock.readLock().unlock();
+ }
+ }
+ return succ;
}
@Override
- public boolean deleteTableColumnStatistics(String dbName, String tableName,
- String colName) throws NoSuchObjectException, MetaException,
- InvalidObjectException, InvalidInputException {
- return rawStore.deleteTableColumnStatistics(dbName, tableName, colName);
+ public AggrStats get_aggr_stats_for(String dbName, String tblName, List<String> partNames,
+ List<String> colNames) throws MetaException, NoSuchObjectException {
+ List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(colNames.size());
+ for (String colName : colNames) {
+ ColumnStatisticsObj colStat =
+ mergeColStatsForPartitions(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), partNames, colName);
+ if (colStat == null) {
+ // Stop and fall back to underlying RawStore
+ colStats = null;
+ break;
+ } else {
+ colStats.add(colStat);
+ }
+ }
+ if (colStats == null) {
+ return rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+ } else {
+ return new AggrStats(colStats, partNames.size());
+ }
+ }
+
+ private ColumnStatisticsObj mergeColStatsForPartitions(String dbName, String tblName,
+ List<String> partNames, String colName) throws MetaException {
+ ColumnStatisticsObj colStats = null;
+ for (String partName : partNames) {
+ String colStatsCacheKey =
+ CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName);
+ ColumnStatisticsObj colStatsForPart =
+ SharedCache.getCachedPartitionColStats(colStatsCacheKey);
+ if (colStatsForPart == null) {
+ // we don't have stats for all the partitions
+ // logic for extrapolation hasn't been added to CacheStore
+ // So stop now, and lets fallback to underlying RawStore
+ return null;
+ }
+ if (colStats == null) {
+ colStats = colStatsForPart;
+ } else {
+ ColumnStatsMerger merger =
+ ColumnStatsMergerFactory.getColumnStatsMerger(colStats, colStatsForPart);
+ merger.merge(colStats, colStatsForPart);
+ }
+ }
+ return colStats;
}
@Override
@@ -1209,14 +1625,34 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
- public void dropPartitions(String dbName, String tblName,
- List<String> partNames) throws MetaException, NoSuchObjectException {
+ public void dropPartitions(String dbName, String tblName, List<String> partNames)
+ throws MetaException, NoSuchObjectException {
rawStore.dropPartitions(dbName, tblName, partNames);
- interruptCacheUpdateMaster();
- for (String partName : partNames) {
- List<String> vals = partNameToVals(partName);
- SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
- HiveStringUtils.normalizeIdentifier(tblName), vals);
+ // Remove partitions
+ try {
+ // Wait if background cache update is happening
+ partitionCacheLock.readLock().lock();
+ isPartitionCacheDirty.set(true);
+ for (String partName : partNames) {
+ List<String> vals = partNameToVals(partName);
+ SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), vals);
+ }
+ } finally {
+ partitionCacheLock.readLock().unlock();
+ }
+ // Remove partition col stats
+ try {
+ // Wait if background cache update is happening
+ partitionColStatsCacheLock.readLock().lock();
+ isPartitionColStatsCacheDirty.set(true);
+ for (String partName : partNames) {
+ List<String> part_vals = partNameToVals(partName);
+ SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), part_vals);
+ }
+ } finally {
+ partitionColStatsCacheLock.readLock().unlock();
}
}
@@ -1326,130 +1762,6 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
- public AggrStats get_aggr_stats_for(String dbName, String tblName,
- List<String> partNames, List<String> colNames)
- throws MetaException, NoSuchObjectException {
- List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(colNames.size());
- for (String colName : colNames) {
- colStats.add(mergeColStatsForPartitions(HiveStringUtils.normalizeIdentifier(dbName),
- HiveStringUtils.normalizeIdentifier(tblName), partNames, colName));
- }
- // TODO: revisit the partitions not found case for extrapolation
- return new AggrStats(colStats, partNames.size());
- }
-
- private ColumnStatisticsObj mergeColStatsForPartitions(String dbName, String tblName,
- List<String> partNames, String colName) throws MetaException {
- ColumnStatisticsObj colStats = null;
- for (String partName : partNames) {
- String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName);
- ColumnStatisticsObj colStatsForPart = SharedCache.getCachedPartitionColStats(
- colStatsCacheKey);
- if (colStats == null) {
- colStats = colStatsForPart;
- } else {
- colStats = mergeColStatsObj(colStats, colStatsForPart);
- }
- }
- return colStats;
- }
-
- private ColumnStatisticsObj mergeColStatsObj(ColumnStatisticsObj colStats1,
- ColumnStatisticsObj colStats2) throws MetaException {
- if ((!colStats1.getColType().equalsIgnoreCase(colStats2.getColType()))
- && (!colStats1.getColName().equalsIgnoreCase(colStats2.getColName()))) {
- throw new MetaException("Can't merge column stats for two partitions for different columns.");
- }
- ColumnStatisticsData csd = new ColumnStatisticsData();
- ColumnStatisticsObj cso = new ColumnStatisticsObj(colStats1.getColName(),
- colStats1.getColType(), csd);
- ColumnStatisticsData csData1 = colStats1.getStatsData();
- ColumnStatisticsData csData2 = colStats2.getStatsData();
- String colType = colStats1.getColType().toLowerCase();
- if (colType.equals("boolean")) {
- BooleanColumnStatsData boolStats = new BooleanColumnStatsData();
- boolStats.setNumFalses(csData1.getBooleanStats().getNumFalses()
- + csData2.getBooleanStats().getNumFalses());
- boolStats.setNumTrues(csData1.getBooleanStats().getNumTrues()
- + csData2.getBooleanStats().getNumTrues());
- boolStats.setNumNulls(csData1.getBooleanStats().getNumNulls()
- + csData2.getBooleanStats().getNumNulls());
- csd.setBooleanStats(boolStats);
- } else if (colType.equals("string") || colType.startsWith("varchar")
- || colType.startsWith("char")) {
- StringColumnStatsData stringStats = new StringColumnStatsData();
- stringStats.setNumNulls(csData1.getStringStats().getNumNulls()
- + csData2.getStringStats().getNumNulls());
- stringStats.setAvgColLen(Math.max(csData1.getStringStats().getAvgColLen(), csData2
- .getStringStats().getAvgColLen()));
- stringStats.setMaxColLen(Math.max(csData1.getStringStats().getMaxColLen(), csData2
- .getStringStats().getMaxColLen()));
- stringStats.setNumDVs(Math.max(csData1.getStringStats().getNumDVs(), csData2.getStringStats()
- .getNumDVs()));
- csd.setStringStats(stringStats);
- } else if (colType.equals("binary")) {
- BinaryColumnStatsData binaryStats = new BinaryColumnStatsData();
- binaryStats.setNumNulls(csData1.getBinaryStats().getNumNulls()
- + csData2.getBinaryStats().getNumNulls());
- binaryStats.setAvgColLen(Math.max(csData1.getBinaryStats().getAvgColLen(), csData2
- .getBinaryStats().getAvgColLen()));
- binaryStats.setMaxColLen(Math.max(csData1.getBinaryStats().getMaxColLen(), csData2
- .getBinaryStats().getMaxColLen()));
- csd.setBinaryStats(binaryStats);
- } else if (colType.equals("bigint") || colType.equals("int") || colType.equals("smallint")
- || colType.equals("tinyint") || colType.equals("timestamp")) {
- LongColumnStatsData longStats = new LongColumnStatsData();
- longStats.setNumNulls(csData1.getLongStats().getNumNulls()
- + csData2.getLongStats().getNumNulls());
- longStats.setHighValue(Math.max(csData1.getLongStats().getHighValue(), csData2.getLongStats()
- .getHighValue()));
- longStats.setLowValue(Math.min(csData1.getLongStats().getLowValue(), csData2.getLongStats()
- .getLowValue()));
- longStats.setNumDVs(Math.max(csData1.getLongStats().getNumDVs(), csData2.getLongStats()
- .getNumDVs()));
- csd.setLongStats(longStats);
- } else if (colType.equals("date")) {
- DateColumnStatsData dateStats = new DateColumnStatsData();
- dateStats.setNumNulls(csData1.getDateStats().getNumNulls()
- + csData2.getDateStats().getNumNulls());
- dateStats.setHighValue(new Date(Math.max(csData1.getDateStats().getHighValue()
- .getDaysSinceEpoch(), csData2.getDateStats().getHighValue().getDaysSinceEpoch())));
- dateStats.setHighValue(new Date(Math.min(csData1.getDateStats().getLowValue()
- .getDaysSinceEpoch(), csData2.getDateStats().getLowValue().getDaysSinceEpoch())));
- dateStats.setNumDVs(Math.max(csData1.getDateStats().getNumDVs(), csData2.getDateStats()
- .getNumDVs()));
- csd.setDateStats(dateStats);
- } else if (colType.equals("double") || colType.equals("float")) {
- DoubleColumnStatsData doubleStats = new DoubleColumnStatsData();
- doubleStats.setNumNulls(csData1.getDoubleStats().getNumNulls()
- + csData2.getDoubleStats().getNumNulls());
- doubleStats.setHighValue(Math.max(csData1.getDoubleStats().getHighValue(), csData2
- .getDoubleStats().getHighValue()));
- doubleStats.setLowValue(Math.min(csData1.getDoubleStats().getLowValue(), csData2
- .getDoubleStats().getLowValue()));
- doubleStats.setNumDVs(Math.max(csData1.getDoubleStats().getNumDVs(), csData2.getDoubleStats()
- .getNumDVs()));
- csd.setDoubleStats(doubleStats);
- } else if (colType.startsWith("decimal")) {
- DecimalColumnStatsData decimalStats = new DecimalColumnStatsData();
- decimalStats.setNumNulls(csData1.getDecimalStats().getNumNulls()
- + csData2.getDecimalStats().getNumNulls());
- Decimal high = (csData1.getDecimalStats().getHighValue()
- .compareTo(csData2.getDecimalStats().getHighValue()) > 0) ? csData1.getDecimalStats()
- .getHighValue() : csData2.getDecimalStats().getHighValue();
- decimalStats.setHighValue(high);
- Decimal low = (csData1.getDecimalStats().getLowValue()
- .compareTo(csData2.getDecimalStats().getLowValue()) < 0) ? csData1.getDecimalStats()
- .getLowValue() : csData2.getDecimalStats().getLowValue();
- decimalStats.setLowValue(low);
- decimalStats.setNumDVs(Math.max(csData1.getDecimalStats().getNumDVs(), csData2
- .getDecimalStats().getNumDVs()));
- csd.setDecimalStats(decimalStats);
- }
- return cso;
- }
-
- @Override
public NotificationEventResponse getNextNotification(
NotificationEventRequest rqst) {
return rawStore.getNextNotification(rqst);
@@ -1565,10 +1877,9 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
- public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(
- String dbName, String tableName)
- throws MetaException, NoSuchObjectException {
- return rawStore.getAggrColStatsForTablePartitions(dbName, tableName);
+ public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
+ String tableName) throws MetaException, NoSuchObjectException {
+ return rawStore.getColStatsForTablePartitions(dbName, tableName);
}
public RawStore getRawStore() {