You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2017/05/22 22:58:00 UTC

[2/2] hive git commit: HIVE-16579: CachedStore: improvements to partition col stats caching and cache column stats for unpartitioned table (Daniel Dai, Thejas Nair, Vaibhav Gumashta reviewed by Daniel Dai, Thejas Nair)

HIVE-16579: CachedStore: improvements to partition col stats caching and cache column stats for unpartitioned table (Daniel Dai, Thejas Nair, Vaibhav Gumashta reviewed by Daniel Dai, Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d85beaa9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d85beaa9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d85beaa9

Branch: refs/heads/master
Commit: d85beaa99ba349d9334d3d96abb6e89c94db8481
Parents: 952fe6e
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Mon May 22 15:52:58 2017 -0700
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Mon May 22 15:52:58 2017 -0700

----------------------------------------------------------------------
 .../listener/DummyRawStoreFailEvent.java        |   4 +-
 .../org/apache/hadoop/hive/ql/QTestUtil.java    |   2 +-
 .../hive/metastore/MetaStoreDirectSql.java      |  73 +-
 .../hadoop/hive/metastore/MetaStoreUtils.java   |  11 +-
 .../hadoop/hive/metastore/ObjectStore.java      |  19 +-
 .../apache/hadoop/hive/metastore/RawStore.java  |   8 +-
 .../hive/metastore/StatObjectConverter.java     | 148 +++
 .../hadoop/hive/metastore/cache/CacheUtils.java |  31 +
 .../hive/metastore/cache/CachedStore.java       | 943 ++++++++++++-------
 .../hive/metastore/cache/SharedCache.java       | 293 +++++-
 .../hadoop/hive/metastore/hbase/HBaseStore.java |   2 +-
 .../stats/merge/ColumnStatsMergerFactory.java   |  18 +-
 .../stats/merge/DateColumnStatsMerger.java      |  55 ++
 .../DummyRawStoreControlledCommit.java          |   2 +-
 .../DummyRawStoreForJdoConnection.java          |   2 +-
 .../hive/metastore/cache/TestCachedStore.java   | 450 ++++++++-
 16 files changed, 1637 insertions(+), 424 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 91a3a38..3dc63bd 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -914,9 +914,9 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
   }
 
   @Override
-  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+  public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
       String tableName) throws MetaException, NoSuchObjectException {
-    return objectStore.getAggrColStatsForTablePartitions(dbName, tableName);
+    return objectStore.getColStatsForTablePartitions(dbName, tableName);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index d296851..111cc11 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -350,7 +350,7 @@ public class QTestUtil {
     if (!useHBaseMetastore) {
       // Plug verifying metastore in for testing DirectSQL.
       conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL,
-        "org.apache.hadoop.hive.metastore.VerifyingObjectStore");
+          "org.apache.hadoop.hive.metastore.VerifyingObjectStore");
     } else {
       conf.setVar(ConfVars.METASTORE_RAW_STORE_IMPL, HBaseStore.class.getName());
       conf.setBoolVar(ConfVars.METASTORE_FASTPATH, true);

http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index b96c27e..df73693 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -1208,7 +1208,9 @@ class MetaStoreDirectSql {
       }
     };
     List<Object[]> list = runBatched(colNames, b);
-    if (list.isEmpty()) return null;
+    if (list.isEmpty()) {
+      return null;
+    }
     ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName);
     ColumnStatistics result = makeColumnStats(list, csd, 0);
     b.closeAllQueries();
@@ -1343,41 +1345,26 @@ class MetaStoreDirectSql {
 
   // Get aggregated column stats for a table per partition for all columns in the partition
   // This is primarily used to populate stats object when using CachedStore (Check CachedStore#prewarm)
-  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
-      String tblName, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
-    String queryText = "select \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\", "
-        + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
-        + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
-        + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
-        + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), "
-        // The following data is used to compute a partitioned table's NDV based
-        // on partitions' NDV when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be
-        // accurately derived from partition NDVs, because the domain of column value two partitions
-        // can overlap. If there is no overlap then global NDV is just the sum
-        // of partition NDVs (UpperBound). But if there is some overlay then
-        // global NDV can be anywhere between sum of partition NDVs (no overlap)
-        // and same as one of the partition NDV (domain of column value in all other
-        // partitions is subset of the domain value in one of the partition)
-        // (LowerBound).But under uniform distribution, we can roughly estimate the global
-        // NDV by leveraging the min/max values.
-        // And, we also guarantee that the estimation makes sense by comparing it to the
-        // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
-        // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
-        + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
-        + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
-        + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
-        + "sum(\"NUM_DISTINCTS\") from \"PART_COL_STATS\""
-        + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? group by \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\"";
+  public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
+      String tblName) throws MetaException {
+    String queryText =
+        "select \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\", \"LONG_LOW_VALUE\", "
+            + "\"LONG_HIGH_VALUE\", \"DOUBLE_LOW_VALUE\", \"DOUBLE_HIGH_VALUE\",  "
+            + "\"BIG_DECIMAL_LOW_VALUE\", \"BIG_DECIMAL_HIGH_VALUE\", \"NUM_NULLS\", "
+            + "\"NUM_DISTINCTS\", \"AVG_COL_LEN\", \"MAX_COL_LEN\", \"NUM_TRUES\", \"NUM_FALSES\""
+            + " from \"PART_COL_STATS\" where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?"
+            + " order by \"PARTITION_NAME\"";
     long start = 0;
     long end = 0;
     Query query = null;
     boolean doTrace = LOG.isDebugEnabled();
     Object qResult = null;
-    ForwardQueryResult fqr = null;
     start = doTrace ? System.nanoTime() : 0;
     query = pm.newQuery("javax.jdo.query.SQL", queryText);
-    qResult = executeWithArray(query,
-        prepareParams(dbName, tblName, new ArrayList<String>(), new ArrayList<String>()), queryText);
+    qResult =
+        executeWithArray(query,
+            prepareParams(dbName, tblName, new ArrayList<String>(), new ArrayList<String>()),
+            queryText);
     if (qResult == null) {
       query.closeAll();
       return Maps.newHashMap();
@@ -1385,13 +1372,31 @@ class MetaStoreDirectSql {
     end = doTrace ? System.nanoTime() : 0;
     timingTrace(doTrace, queryText, start, end);
     List<Object[]> list = ensureList(qResult);
-    Map<String, ColumnStatisticsObj> partColStatsMap = new HashMap<String, ColumnStatisticsObj>();
+    Map<String, List<ColumnStatisticsObj>> partColStatsMap =
+        new HashMap<String, List<ColumnStatisticsObj>>();
+    String partNameCurrent = null;
+    List<ColumnStatisticsObj> partColStatsList = new ArrayList<ColumnStatisticsObj>();
     for (Object[] row : list) {
       String partName = (String) row[0];
-      String colName = (String) row[1];
-      partColStatsMap.put(
-          CacheUtils.buildKey(dbName, tblName, CachedStore.partNameToVals(partName), colName),
-          prepareCSObjWithAdjustedNDV(row, 1, useDensityFunctionForNDVEstimation, ndvTuner));
+      if (partNameCurrent == null) {
+        // Update the current partition we are working on
+        partNameCurrent = partName;
+        // Create a new list for this new partition
+        partColStatsList = new ArrayList<ColumnStatisticsObj>();
+        // Add the col stat for the current column
+        partColStatsList.add(prepareCSObj(row, 1));
+      } else if (!partNameCurrent.equalsIgnoreCase(partName)) {
+        // Save the previous partition and its col stat list
+        partColStatsMap.put(partNameCurrent, partColStatsList);
+        // Update the current partition we are working on
+        partNameCurrent = partName;
+        // Create a new list for this new partition
+        partColStatsList = new ArrayList<ColumnStatisticsObj>();
+        // Add the col stat for the current column
+        partColStatsList.add(prepareCSObj(row, 1));
+      } else {
+        partColStatsList.add(prepareCSObj(row, 1));
+      }
       Deadline.checkTimeout();
     }
     query.closeAll();

http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
index 870896c..8328428 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
@@ -1171,6 +1171,15 @@ public class MetaStoreUtils {
     return addCols(getSchemaWithoutCols(sd, tblsd, parameters, databaseName, tableName, partitionKeys), tblsd.getCols());
   }
 
+  public static List<String> getColumnNamesForTable(Table table) {
+    List<String> colNames = new ArrayList<String>();
+    Iterator<FieldSchema> colsIterator = table.getSd().getColsIterator();
+    while (colsIterator.hasNext()) {
+      colNames.add(colsIterator.next().getName());
+    }
+    return colNames;
+  }
+
   public static String getColumnNameDelimiter(List<FieldSchema> fieldSchemas) {
     // we first take a look if any fieldSchemas contain COMMA
     for (int i = 0; i < fieldSchemas.size(); i++) {
@@ -1180,7 +1189,7 @@ public class MetaStoreUtils {
     }
     return String.valueOf(SerDeUtils.COMMA);
   }
-  
+
   /**
    * Convert FieldSchemas to columnNames.
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index b28983f..19becb8 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -7173,23 +7173,18 @@ public class ObjectStore implements RawStore, Configurable {
   }
 
   @Override
-  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+  public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
       String tableName) throws MetaException, NoSuchObjectException {
-    final boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(getConf(),
-        HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION);
-    final double ndvTuner = HiveConf.getFloatVar(getConf(),
-        HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_TUNER);
-    return new GetHelper<Map<String, ColumnStatisticsObj>>(dbName, tableName, true, false) {
+    return new GetHelper<Map<String, List<ColumnStatisticsObj>>>(dbName, tableName, true, false) {
       @Override
-      protected Map<String, ColumnStatisticsObj> getSqlResult(
-          GetHelper<Map<String, ColumnStatisticsObj>> ctx) throws MetaException {
-        return directSql.getAggrColStatsForTablePartitions(dbName, tblName,
-            useDensityFunctionForNDVEstimation, ndvTuner);
+      protected Map<String, List<ColumnStatisticsObj>> getSqlResult(
+          GetHelper<Map<String, List<ColumnStatisticsObj>>> ctx) throws MetaException {
+        return directSql.getColStatsForTablePartitions(dbName, tblName);
       }
 
       @Override
-      protected Map<String, ColumnStatisticsObj> getJdoResult(
-          GetHelper<Map<String, ColumnStatisticsObj>> ctx) throws MetaException,
+      protected Map<String, List<ColumnStatisticsObj>> getJdoResult(
+          GetHelper<Map<String, List<ColumnStatisticsObj>>> ctx) throws MetaException,
           NoSuchObjectException {
         // This is fast path for query optimizations, if we can find this info
         // quickly using directSql, do it. No point in failing back to slow path

http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
index c1af690..964ffb2 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -579,14 +579,16 @@ public interface RawStore extends Configurable {
     List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException;
 
   /**
-   * Get all partition column statistics for a table
+   * Get all partition column statistics for a table in a db
+   *
    * @param dbName
    * @param tableName
-   * @return Map of partition column statistics
+   * @return Map of partition column statistics. Key in the map is partition name. Value is a list
+   *         of column stat object for each column in the partition
    * @throws MetaException
    * @throws NoSuchObjectException
    */
-  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+  public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
       String tableName) throws MetaException, NoSuchObjectException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
index fcf6f27..2dc2804 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields;
 import org.apache.hadoop.hive.metastore.model.MPartition;
 import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;
 import org.apache.hadoop.hive.metastore.model.MTable;
@@ -700,4 +701,151 @@ public class StatObjectConverter {
     return new BigDecimal(new BigInteger(d.getUnscaled()), d.getScale()).toString();
   }
 
+  /**
+   * Set field values in oldStatObj from newStatObj
+   * @param oldStatObj
+   * @param newStatObj
+   */
+  public static void setFieldsIntoOldStats(ColumnStatisticsObj oldStatObj,
+      ColumnStatisticsObj newStatObj) {
+    _Fields typeNew = newStatObj.getStatsData().getSetField();
+    _Fields typeOld = oldStatObj.getStatsData().getSetField();
+    typeNew = typeNew == typeOld ? typeNew : null;
+    switch (typeNew) {
+    case BOOLEAN_STATS:
+      BooleanColumnStatsData oldBooleanStatsData = oldStatObj.getStatsData().getBooleanStats();
+      BooleanColumnStatsData newBooleanStatsData = newStatObj.getStatsData().getBooleanStats();
+      if (newBooleanStatsData.isSetNumTrues()) {
+        oldBooleanStatsData.setNumTrues(newBooleanStatsData.getNumTrues());
+      }
+      if (newBooleanStatsData.isSetNumFalses()) {
+        oldBooleanStatsData.setNumFalses(newBooleanStatsData.getNumFalses());
+      }
+      if (newBooleanStatsData.isSetNumNulls()) {
+        oldBooleanStatsData.setNumNulls(newBooleanStatsData.getNumNulls());
+      }
+      if (newBooleanStatsData.isSetBitVectors()) {
+        oldBooleanStatsData.setBitVectors(newBooleanStatsData.getBitVectors());
+      }
+      break;
+    case LONG_STATS: {
+      LongColumnStatsData oldLongStatsData = oldStatObj.getStatsData().getLongStats();
+      LongColumnStatsData newLongStatsData = newStatObj.getStatsData().getLongStats();
+      if (newLongStatsData.isSetHighValue()) {
+        oldLongStatsData.setHighValue(newLongStatsData.getHighValue());
+      }
+      if (newLongStatsData.isSetLowValue()) {
+        oldLongStatsData.setLowValue(newLongStatsData.getLowValue());
+      }
+      if (newLongStatsData.isSetNumNulls()) {
+        oldLongStatsData.setNumNulls(newLongStatsData.getNumNulls());
+      }
+      if (newLongStatsData.isSetNumDVs()) {
+        oldLongStatsData.setNumDVs(newLongStatsData.getNumDVs());
+      }
+      if (newLongStatsData.isSetBitVectors()) {
+        oldLongStatsData.setBitVectors(newLongStatsData.getBitVectors());
+      }
+      break;
+    }
+    case DOUBLE_STATS: {
+      DoubleColumnStatsData oldDoubleStatsData = oldStatObj.getStatsData().getDoubleStats();
+      DoubleColumnStatsData newDoubleStatsData = newStatObj.getStatsData().getDoubleStats();
+      if (newDoubleStatsData.isSetHighValue()) {
+        oldDoubleStatsData.setHighValue(newDoubleStatsData.getHighValue());
+      }
+      if (newDoubleStatsData.isSetLowValue()) {
+        oldDoubleStatsData.setLowValue(newDoubleStatsData.getLowValue());
+      }
+      if (newDoubleStatsData.isSetNumNulls()) {
+        oldDoubleStatsData.setNumNulls(newDoubleStatsData.getNumNulls());
+      }
+      if (newDoubleStatsData.isSetNumDVs()) {
+        oldDoubleStatsData.setNumDVs(newDoubleStatsData.getNumDVs());
+      }
+      if (newDoubleStatsData.isSetBitVectors()) {
+        oldDoubleStatsData.setBitVectors(newDoubleStatsData.getBitVectors());
+      }
+      break;
+    }
+    case STRING_STATS: {
+      StringColumnStatsData oldStringStatsData = oldStatObj.getStatsData().getStringStats();
+      StringColumnStatsData newStringStatsData = newStatObj.getStatsData().getStringStats();
+      if (newStringStatsData.isSetMaxColLen()) {
+        oldStringStatsData.setMaxColLen(newStringStatsData.getMaxColLen());
+      }
+      if (newStringStatsData.isSetAvgColLen()) {
+        oldStringStatsData.setAvgColLen(newStringStatsData.getAvgColLen());
+      }
+      if (newStringStatsData.isSetNumNulls()) {
+        oldStringStatsData.setNumNulls(newStringStatsData.getNumNulls());
+      }
+      if (newStringStatsData.isSetNumDVs()) {
+        oldStringStatsData.setNumDVs(newStringStatsData.getNumDVs());
+      }
+      if (newStringStatsData.isSetBitVectors()) {
+        oldStringStatsData.setBitVectors(newStringStatsData.getBitVectors());
+      }
+      break;
+    }
+    case BINARY_STATS:
+      BinaryColumnStatsData oldBinaryStatsData = oldStatObj.getStatsData().getBinaryStats();
+      BinaryColumnStatsData newBinaryStatsData = newStatObj.getStatsData().getBinaryStats();
+      if (newBinaryStatsData.isSetMaxColLen()) {
+        oldBinaryStatsData.setMaxColLen(newBinaryStatsData.getMaxColLen());
+      }
+      if (newBinaryStatsData.isSetAvgColLen()) {
+        oldBinaryStatsData.setAvgColLen(newBinaryStatsData.getAvgColLen());
+      }
+      if (newBinaryStatsData.isSetNumNulls()) {
+        oldBinaryStatsData.setNumNulls(newBinaryStatsData.getNumNulls());
+      }
+      if (newBinaryStatsData.isSetBitVectors()) {
+        oldBinaryStatsData.setBitVectors(newBinaryStatsData.getBitVectors());
+      }
+      break;
+    case DECIMAL_STATS: {
+      DecimalColumnStatsData oldDecimalStatsData = oldStatObj.getStatsData().getDecimalStats();
+      DecimalColumnStatsData newDecimalStatsData = newStatObj.getStatsData().getDecimalStats();
+      if (newDecimalStatsData.isSetHighValue()) {
+        oldDecimalStatsData.setHighValue(newDecimalStatsData.getHighValue());
+      }
+      if (newDecimalStatsData.isSetLowValue()) {
+        oldDecimalStatsData.setLowValue(newDecimalStatsData.getLowValue());
+      }
+      if (newDecimalStatsData.isSetNumNulls()) {
+        oldDecimalStatsData.setNumNulls(newDecimalStatsData.getNumNulls());
+      }
+      if (newDecimalStatsData.isSetNumDVs()) {
+        oldDecimalStatsData.setNumDVs(newDecimalStatsData.getNumDVs());
+      }
+      if (newDecimalStatsData.isSetBitVectors()) {
+        oldDecimalStatsData.setBitVectors(newDecimalStatsData.getBitVectors());
+      }
+      break;
+    }
+    case DATE_STATS: {
+      DateColumnStatsData oldDateStatsData = oldStatObj.getStatsData().getDateStats();
+      DateColumnStatsData newDateStatsData = newStatObj.getStatsData().getDateStats();
+      if (newDateStatsData.isSetHighValue()) {
+        oldDateStatsData.setHighValue(newDateStatsData.getHighValue());
+      }
+      if (newDateStatsData.isSetLowValue()) {
+        oldDateStatsData.setLowValue(newDateStatsData.getLowValue());
+      }
+      if (newDateStatsData.isSetNumNulls()) {
+        oldDateStatsData.setNumNulls(newDateStatsData.getNumNulls());
+      }
+      if (newDateStatsData.isSetNumDVs()) {
+        oldDateStatsData.setNumDVs(newDateStatsData.getNumDVs());
+      }
+      if (newDateStatsData.isSetBitVectors()) {
+        oldDateStatsData.setBitVectors(newDateStatsData.getBitVectors());
+      }
+      break;
+    }
+    default:
+      throw new IllegalArgumentException("Unknown stats type: " + typeNew.toString());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
index 668499b..280655d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
@@ -38,6 +38,10 @@ public class CacheUtils {
     return dbName + delimit + tableName;
   }
 
+  public static String buildKeyWithDelimit(String dbName, String tableName) {
+    return buildKey(dbName, tableName) + delimit;
+  }
+
   public static String buildKey(String dbName, String tableName, List<String> partVals) {
     String key = buildKey(dbName, tableName);
     if (partVals == null || partVals.size() == 0) {
@@ -52,11 +56,38 @@ public class CacheUtils {
     return key;
   }
 
+  public static String buildKeyWithDelimit(String dbName, String tableName, List<String> partVals) {
+    return buildKey(dbName, tableName, partVals) + delimit;
+  }
+
   public static String buildKey(String dbName, String tableName, List<String> partVals, String colName) {
     String key = buildKey(dbName, tableName, partVals);
     return key + delimit + colName;
   }
 
+  public static String buildKey(String dbName, String tableName, String colName) {
+    String key = buildKey(dbName, tableName);
+    return key + delimit + colName;
+  }
+
+  public static String[] splitTableColStats(String key) {
+    return key.split(delimit);
+  }
+
+  public static Object[] splitPartitionColStats(String key) {
+    Object[] result = new Object[4];
+    String[] comps = key.split(delimit);
+    result[0] = comps[0];
+    result[1] = comps[1];
+    List<String> vals = new ArrayList<String>();
+    for (int i=2;i<comps.length-2;i++) {
+      vals.add(comps[i]);
+    }
+    result[2] = vals;
+    result[3] = comps[comps.length-1];
+    return result;
+  }
+
   public static Table assemble(TableWrapper wrapper) {
     Table t = wrapper.getTable().deepCopy();
     if (wrapper.getSdHash()!=null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index 1cc838f..78aab91 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -26,12 +26,15 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Deadline;
 import org.apache.hadoop.hive.metastore.FileMetadataHandler;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.ObjectStore;
@@ -41,18 +44,11 @@ import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
-import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Date;
-import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.Decimal;
-import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
 import org.apache.hadoop.hive.metastore.api.Function;
@@ -61,7 +57,6 @@ import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.metastore.api.InvalidInputException;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
-import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -77,13 +72,14 @@ import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant;
 import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
 import org.apache.hadoop.hive.metastore.api.Type;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.hbase.stats.merge.ColumnStatsMerger;
+import org.apache.hadoop.hive.metastore.hbase.stats.merge.ColumnStatsMergerFactory;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -104,15 +100,25 @@ import com.google.common.annotations.VisibleForTesting;
 // TODO initial load slow?
 // TODO size estimation
 // TODO factor in extrapolation logic (using partitions found) during aggregate stats calculation
-// TODO factor in NDV estimation (density based estimation) logic when merging NDVs from 2 colStats object
-// TODO refactor to use same common code with StatObjectConverter (for merging 2 col stats objects)
 
 public class CachedStore implements RawStore, Configurable {
   private static ScheduledExecutorService cacheUpdateMaster = null;
-  private static AtomicReference<Thread> runningMasterThread = new AtomicReference<Thread>(null);
+  private static ReentrantReadWriteLock databaseCacheLock = new ReentrantReadWriteLock(true);
+  private static AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false);
+  private static ReentrantReadWriteLock tableCacheLock = new ReentrantReadWriteLock(true);
+  private static AtomicBoolean isTableCacheDirty = new AtomicBoolean(false);
+  private static ReentrantReadWriteLock partitionCacheLock = new ReentrantReadWriteLock(true);
+  private static AtomicBoolean isPartitionCacheDirty = new AtomicBoolean(false);
+  private static ReentrantReadWriteLock tableColStatsCacheLock = new ReentrantReadWriteLock(true);
+  private static AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false);
+  private static ReentrantReadWriteLock partitionColStatsCacheLock = new ReentrantReadWriteLock(
+      true);
+  private static AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false);
   RawStore rawStore;
   Configuration conf;
   private PartitionExpressionProxy expressionProxy = null;
+  // Default value set to 100 milliseconds for test purpose
+  private long cacheRefreshPeriod = 100;
   static boolean firstTime = true;
 
   static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName());
@@ -209,6 +215,8 @@ public class CachedStore implements RawStore, Configurable {
         LOG.info("Prewarming CachedStore");
         prewarm();
         LOG.info("CachedStore initialized");
+        // Start the cache update master-worker threads
+        startCacheUpdateService();
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -216,7 +224,10 @@ public class CachedStore implements RawStore, Configurable {
     }
   }
 
-  private void prewarm() throws Exception {
+  @VisibleForTesting
+  void prewarm() throws Exception {
+    // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy
+    Deadline.registerIfNot(1000000);
     List<String> dbNames = rawStore.getAllDatabases();
     for (String dbName : dbNames) {
       Database db = rawStore.getDatabase(dbName);
@@ -226,35 +237,81 @@ public class CachedStore implements RawStore, Configurable {
         Table table = rawStore.getTable(dbName, tblName);
         SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(dbName),
             HiveStringUtils.normalizeIdentifier(tblName), table);
+        Deadline.startTimer("getPartitions");
         List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
+        Deadline.stopTimer();
         for (Partition partition : partitions) {
           SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName),
               HiveStringUtils.normalizeIdentifier(tblName), partition);
         }
-        Map<String, ColumnStatisticsObj> aggrStatsPerPartition = rawStore
-            .getAggrColStatsForTablePartitions(dbName, tblName);
-        SharedCache.addPartitionColStatsToCache(aggrStatsPerPartition);
+        // Cache partition column stats
+        Deadline.startTimer("getColStatsForTablePartitions");
+        Map<String, List<ColumnStatisticsObj>> colStatsPerPartition =
+            rawStore.getColStatsForTablePartitions(dbName, tblName);
+        Deadline.stopTimer();
+        if (colStatsPerPartition != null) {
+          SharedCache.addPartitionColStatsToCache(dbName, tblName, colStatsPerPartition);
+        }
+        // Cache table column stats
+        List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+        Deadline.startTimer("getTableColumnStatistics");
+        ColumnStatistics tableColStats =
+            rawStore.getTableColumnStatistics(dbName, tblName, colNames);
+        Deadline.stopTimer();
+        if ((tableColStats != null) && (tableColStats.getStatsObjSize() > 0)) {
+          SharedCache.addTableColStatsToCache(HiveStringUtils.normalizeIdentifier(dbName),
+              HiveStringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj());
+        }
       }
     }
-    // Start the cache update master-worker threads
-    startCacheUpdateService();
   }
 
-  private synchronized void startCacheUpdateService() {
+  @VisibleForTesting
+  synchronized void startCacheUpdateService() {
     if (cacheUpdateMaster == null) {
       cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() {
         public Thread newThread(Runnable r) {
           Thread t = Executors.defaultThreadFactory().newThread(r);
+          t.setName("CachedStore-CacheUpdateService: Thread-" + t.getId());
           t.setDaemon(true);
           return t;
         }
       });
-      cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(this), 0, HiveConf
-          .getTimeVar(conf, HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY,
-              TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+      if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+        cacheRefreshPeriod =
+            HiveConf.getTimeVar(conf,
+                HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY,
+                TimeUnit.MILLISECONDS);
+      }
+      LOG.info("CachedStore: starting cache update service (run every " + cacheRefreshPeriod + "ms");
+      cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(this), cacheRefreshPeriod,
+          cacheRefreshPeriod, TimeUnit.MILLISECONDS);
     }
   }
 
+  @VisibleForTesting
+  synchronized boolean stopCacheUpdateService(long timeout) {
+    boolean tasksStoppedBeforeShutdown = false;
+    if (cacheUpdateMaster != null) {
+      LOG.info("CachedStore: shutting down cache update service");
+      try {
+        tasksStoppedBeforeShutdown =
+            cacheUpdateMaster.awaitTermination(timeout, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        LOG.info("CachedStore: cache update service was interrupted while waiting for tasks to "
+            + "complete before shutting down. Will make a hard stop now.");
+      }
+      cacheUpdateMaster.shutdownNow();
+      cacheUpdateMaster = null;
+    }
+    return tasksStoppedBeforeShutdown;
+  }
+
+  @VisibleForTesting
+  void setCacheRefreshPeriod(long time) {
+    this.cacheRefreshPeriod = time;
+  }
+
   static class CacheUpdateMasterWork implements Runnable {
 
     private CachedStore cachedStore;
@@ -265,86 +322,175 @@ public class CachedStore implements RawStore, Configurable {
 
     @Override
     public void run() {
-      runningMasterThread.set(Thread.currentThread());
-      RawStore rawStore = cachedStore.getRawStore();
+      // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy
+      Deadline.registerIfNot(1000000);
+      LOG.debug("CachedStore: updating cached objects");
+      String rawStoreClassName =
+          HiveConf.getVar(cachedStore.conf, HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_IMPL,
+              ObjectStore.class.getName());
       try {
+        RawStore rawStore =
+            ((Class<? extends RawStore>) MetaStoreUtils.getClass(rawStoreClassName)).newInstance();
+        rawStore.setConf(cachedStore.conf);
         List<String> dbNames = rawStore.getAllDatabases();
-        // Update the database in cache
-        if (!updateDatabases(rawStore, dbNames)) {
-          return;
-        }
-        // Update the tables and their partitions in cache
-        if (!updateTables(rawStore, dbNames)) {
-          return;
+        if (dbNames != null) {
+          // Update the database in cache
+          updateDatabases(rawStore, dbNames);
+          for (String dbName : dbNames) {
+            // Update the tables in cache
+            updateTables(rawStore, dbName);
+            List<String> tblNames = cachedStore.getAllTables(dbName);
+            for (String tblName : tblNames) {
+              // Update the partitions for a table in cache
+              updateTablePartitions(rawStore, dbName, tblName);
+              // Update the table column stats for a table in cache
+              updateTableColStats(rawStore, dbName, tblName);
+              // Update the partitions column stats for a table in cache
+              updateTablePartitionColStats(rawStore, dbName, tblName);
+            }
+          }
         }
       } catch (MetaException e) {
         LOG.error("Updating CachedStore: error getting database names", e);
+      } catch (InstantiationException | IllegalAccessException e) {
+        throw new RuntimeException("Cannot instantiate " + rawStoreClassName, e);
       }
     }
 
-    private boolean updateDatabases(RawStore rawStore, List<String> dbNames) {
-      if (dbNames != null) {
-        List<Database> databases = new ArrayList<Database>();
-        for (String dbName : dbNames) {
-          // If a preemption of this thread was requested, simply return before proceeding
-          if (Thread.interrupted()) {
-            return false;
+    private void updateDatabases(RawStore rawStore, List<String> dbNames) {
+      // Prepare the list of databases
+      List<Database> databases = new ArrayList<Database>();
+      for (String dbName : dbNames) {
+        Database db;
+        try {
+          db = rawStore.getDatabase(dbName);
+          databases.add(db);
+        } catch (NoSuchObjectException e) {
+          LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e);
+        }
+      }
+      // Update the cached database objects
+      try {
+        if (databaseCacheLock.writeLock().tryLock()) {
+          // Skip background updates if we detect change
+          if (isDatabaseCacheDirty.compareAndSet(true, false)) {
+            LOG.debug("Skipping database cache update; the database list we have is dirty.");
+            return;
           }
-          Database db;
-          try {
-            db = rawStore.getDatabase(dbName);
-            databases.add(db);
-          } catch (NoSuchObjectException e) {
-            LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e);
+          SharedCache.refreshDatabases(databases);
+        }
+      } finally {
+        if (databaseCacheLock.isWriteLockedByCurrentThread()) {
+          databaseCacheLock.writeLock().unlock();
+        }
+      }
+    }
+
+    // Update the cached table objects
+    private void updateTables(RawStore rawStore, String dbName) {
+      List<Table> tables = new ArrayList<Table>();
+      try {
+        List<String> tblNames = rawStore.getAllTables(dbName);
+        for (String tblName : tblNames) {
+          Table table =
+              rawStore.getTable(HiveStringUtils.normalizeIdentifier(dbName),
+                  HiveStringUtils.normalizeIdentifier(tblName));
+          tables.add(table);
+        }
+        if (tableCacheLock.writeLock().tryLock()) {
+          // Skip background updates if we detect change
+          if (isTableCacheDirty.compareAndSet(true, false)) {
+            LOG.debug("Skipping table cache update; the table list we have is dirty.");
+            return;
           }
+          SharedCache.refreshTables(dbName, tables);
+        }
+      } catch (MetaException e) {
+        LOG.info("Updating CachedStore: unable to read tables for database - " + dbName, e);
+      } finally {
+        if (tableCacheLock.isWriteLockedByCurrentThread()) {
+          tableCacheLock.writeLock().unlock();
         }
-        // Update the cached database objects
-        SharedCache.refreshDatabases(databases);
       }
-      return true;
     }
 
-    private boolean updateTables(RawStore rawStore, List<String> dbNames) {
-      if (dbNames != null) {
-        List<Table> tables = new ArrayList<Table>();
-        for (String dbName : dbNames) {
-          try {
-            List<String> tblNames = rawStore.getAllTables(dbName);
-            for (String tblName : tblNames) {
-              // If a preemption of this thread was requested, simply return before proceeding
-              if (Thread.interrupted()) {
-                return false;
-              }
-              Table table = rawStore.getTable(dbName, tblName);
-              tables.add(table);
-            }
-            // Update the cached database objects
-            SharedCache.refreshTables(dbName, tables);
-            for (String tblName : tblNames) {
-              // If a preemption of this thread was requested, simply return before proceeding
-              if (Thread.interrupted()) {
-                return false;
-              }
-              List<Partition> partitions =
-                  rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
-              SharedCache.refreshPartitions(dbName, tblName, partitions);
-            }
-          } catch (MetaException | NoSuchObjectException e) {
-            LOG.error("Updating CachedStore: unable to read table", e);
-            return false;
+    // Update the cached partition objects for a table
+    private void updateTablePartitions(RawStore rawStore, String dbName, String tblName) {
+      try {
+        Deadline.startTimer("getPartitions");
+        List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
+        Deadline.stopTimer();
+        if (partitionCacheLock.writeLock().tryLock()) {
+          // Skip background updates if we detect change
+          if (isPartitionCacheDirty.compareAndSet(true, false)) {
+            LOG.debug("Skipping partition cache update; the partition list we have is dirty.");
+            return;
           }
+          SharedCache.refreshPartitions(HiveStringUtils.normalizeIdentifier(dbName),
+              HiveStringUtils.normalizeIdentifier(tblName), partitions);
+        }
+      } catch (MetaException | NoSuchObjectException e) {
+        LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e);
+      } finally {
+        if (partitionCacheLock.isWriteLockedByCurrentThread()) {
+          partitionCacheLock.writeLock().unlock();
         }
       }
-      return true;
     }
-  }
 
-  // Interrupt the cache update background thread
-  // Fire and forget (the master will respond appropriately when it gets a chance)
-  // All writes to the cache go through synchronized methods, so fire & forget is fine.
-  private void interruptCacheUpdateMaster() {
-    if (runningMasterThread.get() != null) {
-      runningMasterThread.get().interrupt();
+    // Update the cached col stats for this table
+    private void updateTableColStats(RawStore rawStore, String dbName, String tblName) {
+      try {
+        Table table = rawStore.getTable(dbName, tblName);
+        List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+        Deadline.startTimer("getTableColumnStatistics");
+        ColumnStatistics tableColStats =
+            rawStore.getTableColumnStatistics(dbName, tblName, colNames);
+        Deadline.stopTimer();
+        if (tableColStatsCacheLock.writeLock().tryLock()) {
+          // Skip background updates if we detect change
+          if (isTableColStatsCacheDirty.compareAndSet(true, false)) {
+            LOG.debug("Skipping table column stats cache update; the table column stats list we "
+                + "have is dirty.");
+            return;
+          }
+          SharedCache.refreshTableColStats(HiveStringUtils.normalizeIdentifier(dbName),
+              HiveStringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj());
+        }
+      } catch (MetaException | NoSuchObjectException e) {
+        LOG.info("Updating CachedStore: unable to read table column stats of table: " + tblName, e);
+      } finally {
+        if (tableColStatsCacheLock.isWriteLockedByCurrentThread()) {
+          tableColStatsCacheLock.writeLock().unlock();
+        }
+      }
+    }
+
+    // Update the cached partition col stats for a table
+    private void updateTablePartitionColStats(RawStore rawStore, String dbName, String tblName) {
+      try {
+        Deadline.startTimer("getColStatsForTablePartitions");
+        Map<String, List<ColumnStatisticsObj>> colStatsPerPartition =
+            rawStore.getColStatsForTablePartitions(dbName, tblName);
+        Deadline.stopTimer();
+        if (partitionColStatsCacheLock.writeLock().tryLock()) {
+          // Skip background updates if we detect change
+          if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) {
+            LOG.debug("Skipping partition column stats cache update; the partition column stats "
+                + "list we have is dirty.");
+            return;
+          }
+          SharedCache.refreshPartitionColStats(HiveStringUtils.normalizeIdentifier(dbName),
+              HiveStringUtils.normalizeIdentifier(tblName), colStatsPerPartition);
+        }
+      } catch (MetaException | NoSuchObjectException e) {
+        LOG.info("Updating CachedStore: unable to read partitions column stats of table: "
+            + tblName, e);
+      } finally {
+        if (partitionColStatsCacheLock.isWriteLockedByCurrentThread()) {
+          partitionColStatsCacheLock.writeLock().unlock();
+        }
+      }
     }
   }
 
@@ -374,11 +520,17 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public void createDatabase(Database db)
-      throws InvalidObjectException, MetaException {
+  public void createDatabase(Database db) throws InvalidObjectException, MetaException {
     rawStore.createDatabase(db);
-    interruptCacheUpdateMaster();
-    SharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(db.getName()), db.deepCopy());
+    try {
+      // Wait if background cache update is happening
+      databaseCacheLock.readLock().lock();
+      isDatabaseCacheDirty.set(true);
+      SharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(db.getName()),
+          db.deepCopy());
+    } finally {
+      databaseCacheLock.readLock().unlock();
+    }
   }
 
   @Override
@@ -387,26 +539,38 @@ public class CachedStore implements RawStore, Configurable {
     if (db == null) {
       throw new NoSuchObjectException();
     }
-    return SharedCache.getDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbName));
+    return db;
   }
 
   @Override
   public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException {
     boolean succ = rawStore.dropDatabase(dbname);
     if (succ) {
-      interruptCacheUpdateMaster();
-      SharedCache.removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbname));
+      try {
+        // Wait if background cache update is happening
+        databaseCacheLock.readLock().lock();
+        isDatabaseCacheDirty.set(true);
+        SharedCache.removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbname));
+      } finally {
+        databaseCacheLock.readLock().unlock();
+      }
     }
     return succ;
   }
 
   @Override
-  public boolean alterDatabase(String dbName, Database db)
-      throws NoSuchObjectException, MetaException {
+  public boolean alterDatabase(String dbName, Database db) throws NoSuchObjectException,
+      MetaException {
     boolean succ = rawStore.alterDatabase(dbName, db);
     if (succ) {
-      interruptCacheUpdateMaster();
-      SharedCache.alterDatabaseInCache(HiveStringUtils.normalizeIdentifier(dbName), db);
+      try {
+        // Wait if background cache update is happening
+        databaseCacheLock.readLock().lock();
+        isDatabaseCacheDirty.set(true);
+        SharedCache.alterDatabaseInCache(HiveStringUtils.normalizeIdentifier(dbName), db);
+      } finally {
+        databaseCacheLock.readLock().unlock();
+      }
     }
     return succ;
   }
@@ -462,24 +626,45 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public void createTable(Table tbl)
-      throws InvalidObjectException, MetaException {
+  public void createTable(Table tbl) throws InvalidObjectException, MetaException {
     rawStore.createTable(tbl);
-    interruptCacheUpdateMaster();
     validateTableType(tbl);
-    SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(tbl.getDbName()),
-        HiveStringUtils.normalizeIdentifier(tbl.getTableName()), tbl);
+    try {
+      // Wait if background cache update is happening
+      tableCacheLock.readLock().lock();
+      isTableCacheDirty.set(true);
+      SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(tbl.getDbName()),
+          HiveStringUtils.normalizeIdentifier(tbl.getTableName()), tbl);
+    } finally {
+      tableCacheLock.readLock().unlock();
+    }
   }
 
   @Override
-  public boolean dropTable(String dbName, String tableName)
-      throws MetaException, NoSuchObjectException, InvalidObjectException,
-      InvalidInputException {
+  public boolean dropTable(String dbName, String tableName) throws MetaException,
+      NoSuchObjectException, InvalidObjectException, InvalidInputException {
     boolean succ = rawStore.dropTable(dbName, tableName);
     if (succ) {
-      interruptCacheUpdateMaster();
-      SharedCache.removeTableFromCache(HiveStringUtils.normalizeIdentifier(dbName),
-          HiveStringUtils.normalizeIdentifier(tableName));
+      // Remove table
+      try {
+        // Wait if background table cache update is happening
+        tableCacheLock.readLock().lock();
+        isTableCacheDirty.set(true);
+        SharedCache.removeTableFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName));
+      } finally {
+        tableCacheLock.readLock().unlock();
+      }
+      // Remove table col stats
+      try {
+        // Wait if background table col stats cache update is happening
+        tableColStatsCacheLock.readLock().lock();
+        isTableColStatsCacheDirty.set(true);
+        SharedCache.removeTableColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName));
+      } finally {
+        tableColStatsCacheLock.readLock().unlock();
+      }
     }
     return succ;
   }
@@ -496,57 +681,74 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public boolean addPartition(Partition part)
-      throws InvalidObjectException, MetaException {
+  public boolean addPartition(Partition part) throws InvalidObjectException, MetaException {
     boolean succ = rawStore.addPartition(part);
     if (succ) {
-      interruptCacheUpdateMaster();
-      SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()),
-          HiveStringUtils.normalizeIdentifier(part.getTableName()), part);
+      try {
+        // Wait if background cache update is happening
+        partitionCacheLock.readLock().lock();
+        isPartitionCacheDirty.set(true);
+        SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()),
+            HiveStringUtils.normalizeIdentifier(part.getTableName()), part);
+      } finally {
+        partitionCacheLock.readLock().unlock();
+      }
     }
     return succ;
   }
 
   @Override
-  public boolean addPartitions(String dbName, String tblName,
-      List<Partition> parts) throws InvalidObjectException, MetaException {
+  public boolean addPartitions(String dbName, String tblName, List<Partition> parts)
+      throws InvalidObjectException, MetaException {
     boolean succ = rawStore.addPartitions(dbName, tblName, parts);
     if (succ) {
-      interruptCacheUpdateMaster();
-      for (Partition part : parts) {
-        SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()),
-            HiveStringUtils.normalizeIdentifier(part.getTableName()), part);
+      try {
+        // Wait if background cache update is happening
+        partitionCacheLock.readLock().lock();
+        isPartitionCacheDirty.set(true);
+        for (Partition part : parts) {
+          SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName),
+              HiveStringUtils.normalizeIdentifier(tblName), part);
+        }
+      } finally {
+        partitionCacheLock.readLock().unlock();
       }
     }
     return succ;
   }
 
   @Override
-  public boolean addPartitions(String dbName, String tblName,
-      PartitionSpecProxy partitionSpec, boolean ifNotExists)
-      throws InvalidObjectException, MetaException {
+  public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy partitionSpec,
+      boolean ifNotExists) throws InvalidObjectException, MetaException {
     boolean succ = rawStore.addPartitions(dbName, tblName, partitionSpec, ifNotExists);
     if (succ) {
-      interruptCacheUpdateMaster();
-      PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator();
-      while (iterator.hasNext()) {
-        Partition part = iterator.next();
-        SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName),
-            HiveStringUtils.normalizeIdentifier(tblName), part);
+      try {
+        // Wait if background cache update is happening
+        partitionCacheLock.readLock().lock();
+        isPartitionCacheDirty.set(true);
+        PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator();
+        while (iterator.hasNext()) {
+          Partition part = iterator.next();
+          SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName),
+              HiveStringUtils.normalizeIdentifier(tblName), part);
+        }
+      } finally {
+        partitionCacheLock.readLock().unlock();
       }
     }
     return succ;
   }
 
   @Override
-  public Partition getPartition(String dbName, String tableName,
-      List<String> part_vals) throws MetaException, NoSuchObjectException {
-    Partition part = SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
-        HiveStringUtils.normalizeIdentifier(tableName), part_vals);
+  public Partition getPartition(String dbName, String tableName, List<String> part_vals)
+      throws MetaException, NoSuchObjectException {
+    Partition part =
+        SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), part_vals);
     if (part != null) {
       part.unsetPrivileges();
     } else {
-      throw new NoSuchObjectException();
+      throw new NoSuchObjectException("partition values=" + part_vals.toString());
     }
     return part;
   }
@@ -559,14 +761,30 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public boolean dropPartition(String dbName, String tableName,
-      List<String> part_vals) throws MetaException, NoSuchObjectException,
-      InvalidObjectException, InvalidInputException {
+  public boolean dropPartition(String dbName, String tableName, List<String> part_vals)
+      throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
     boolean succ = rawStore.dropPartition(dbName, tableName, part_vals);
     if (succ) {
-      interruptCacheUpdateMaster();
-      SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
-          HiveStringUtils.normalizeIdentifier(tableName), part_vals);
+      // Remove partition
+      try {
+        // Wait if background cache update is happening
+        partitionCacheLock.readLock().lock();
+        isPartitionCacheDirty.set(true);
+        SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), part_vals);
+      } finally {
+        partitionCacheLock.readLock().unlock();
+      }
+      // Remove partition col stats
+      try {
+        // Wait if background cache update is happening
+        partitionColStatsCacheLock.readLock().lock();
+        isPartitionColStatsCacheDirty.set(true);
+        SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), part_vals);
+      } finally {
+        partitionColStatsCacheLock.readLock().unlock();
+      }
     }
     return succ;
   }
@@ -588,10 +806,28 @@ public class CachedStore implements RawStore, Configurable {
   public void alterTable(String dbName, String tblName, Table newTable)
       throws InvalidObjectException, MetaException {
     rawStore.alterTable(dbName, tblName, newTable);
-    interruptCacheUpdateMaster();
     validateTableType(newTable);
-    SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName),
-        HiveStringUtils.normalizeIdentifier(tblName), newTable);
+    // Update table cache
+    try {
+      // Wait if background cache update is happening
+      tableCacheLock.readLock().lock();
+      isTableCacheDirty.set(true);
+      SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName),
+          HiveStringUtils.normalizeIdentifier(tblName), newTable);
+    } finally {
+      tableCacheLock.readLock().unlock();
+    }
+    // Update partition cache (key might have changed since table name is a
+    // component of key)
+    try {
+      // Wait if background cache update is happening
+      partitionCacheLock.readLock().lock();
+      isPartitionCacheDirty.set(true);
+      SharedCache.alterTableInPartitionCache(HiveStringUtils.normalizeIdentifier(dbName),
+          HiveStringUtils.normalizeIdentifier(tblName), newTable);
+    } finally {
+      partitionCacheLock.readLock().unlock();
+    }
   }
 
   @Override
@@ -685,26 +921,62 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public void alterPartition(String dbName, String tblName,
-      List<String> partVals, Partition newPart)
+  public void alterPartition(String dbName, String tblName, List<String> partVals, Partition newPart)
       throws InvalidObjectException, MetaException {
     rawStore.alterPartition(dbName, tblName, partVals, newPart);
-    interruptCacheUpdateMaster();
-    SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
-        HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+    // Update partition cache
+    try {
+      // Wait if background cache update is happening
+      partitionCacheLock.readLock().lock();
+      isPartitionCacheDirty.set(true);
+      SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
+          HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+    } finally {
+      partitionCacheLock.readLock().unlock();
+    }
+    // Update partition column stats cache
+    try {
+      // Wait if background cache update is happening
+      partitionColStatsCacheLock.readLock().lock();
+      isPartitionColStatsCacheDirty.set(true);
+      SharedCache.alterPartitionInColStatsCache(HiveStringUtils.normalizeIdentifier(dbName),
+          HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+    } finally {
+      partitionColStatsCacheLock.readLock().unlock();
+    }
   }
 
   @Override
-  public void alterPartitions(String dbName, String tblName,
-      List<List<String>> partValsList, List<Partition> newParts)
-      throws InvalidObjectException, MetaException {
+  public void alterPartitions(String dbName, String tblName, List<List<String>> partValsList,
+      List<Partition> newParts) throws InvalidObjectException, MetaException {
     rawStore.alterPartitions(dbName, tblName, partValsList, newParts);
-    interruptCacheUpdateMaster();
-    for (int i=0;i<partValsList.size();i++) {
-      List<String> partVals = partValsList.get(i);
-      Partition newPart = newParts.get(i);
-      SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
-          HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+    // Update partition cache
+    try {
+      // Wait if background cache update is happening
+      partitionCacheLock.readLock().lock();
+      isPartitionCacheDirty.set(true);
+      for (int i = 0; i < partValsList.size(); i++) {
+        List<String> partVals = partValsList.get(i);
+        Partition newPart = newParts.get(i);
+        SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+      }
+    } finally {
+      partitionCacheLock.readLock().unlock();
+    }
+    // Update partition column stats cache
+    try {
+      // Wait if background cache update is happening
+      partitionColStatsCacheLock.readLock().lock();
+      isPartitionColStatsCacheDirty.set(true);
+      for (int i = 0; i < partValsList.size(); i++) {
+        List<String> partVals = partValsList.get(i);
+        Partition newPart = newParts.get(i);
+        SharedCache.alterPartitionInColStatsCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+      }
+    } finally {
+      partitionColStatsCacheLock.readLock().unlock();
     }
   }
 
@@ -1095,55 +1367,199 @@ public class CachedStore implements RawStore, Configurable {
 
   @Override
   public boolean updateTableColumnStatistics(ColumnStatistics colStats)
-      throws NoSuchObjectException, MetaException, InvalidObjectException,
-      InvalidInputException {
+      throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
     boolean succ = rawStore.updateTableColumnStatistics(colStats);
     if (succ) {
-      SharedCache.updateTableColumnStatistics(HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()),
-          HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), colStats.getStatsObj());
+      String dbName = colStats.getStatsDesc().getDbName();
+      String tableName = colStats.getStatsDesc().getTableName();
+      List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
+      Table tbl = getTable(dbName, tableName);
+      List<String> colNames = new ArrayList<>();
+      for (ColumnStatisticsObj statsObj : statsObjs) {
+        colNames.add(statsObj.getColName());
+      }
+      StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames);
+
+      // Update table
+      try {
+        // Wait if background cache update is happening
+        tableCacheLock.readLock().lock();
+        isTableCacheDirty.set(true);
+        SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), tbl);
+      } finally {
+        tableCacheLock.readLock().unlock();
+      }
+
+      // Update table col stats
+      try {
+        // Wait if background cache update is happening
+        tableColStatsCacheLock.readLock().lock();
+        isTableColStatsCacheDirty.set(true);
+        SharedCache.updateTableColStatsInCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), statsObjs);
+      } finally {
+        tableColStatsCacheLock.readLock().unlock();
+      }
     }
     return succ;
   }
 
   @Override
-  public boolean updatePartitionColumnStatistics(ColumnStatistics colStats,
-      List<String> partVals) throws NoSuchObjectException, MetaException,
-      InvalidObjectException, InvalidInputException {
-    boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals);
+  public ColumnStatistics getTableColumnStatistics(String dbName, String tableName,
+      List<String> colNames) throws MetaException, NoSuchObjectException {
+    ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName);
+    List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>();
+    for (String colName : colNames) {
+      String colStatsCacheKey =
+          CacheUtils.buildKey(HiveStringUtils.normalizeIdentifier(dbName),
+              HiveStringUtils.normalizeIdentifier(tableName), colName);
+      ColumnStatisticsObj colStat = SharedCache.getCachedTableColStats(colStatsCacheKey);
+      if (colStat != null) {
+        colStatObjs.add(colStat);
+      }
+    }
+    if (colStatObjs.isEmpty()) {
+      return null;
+    } else {
+      return new ColumnStatistics(csd, colStatObjs);
+    }
+  }
+
+  @Override
+  public boolean deleteTableColumnStatistics(String dbName, String tableName, String colName)
+      throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
+    boolean succ = rawStore.deleteTableColumnStatistics(dbName, tableName, colName);
     if (succ) {
-      SharedCache.updatePartitionColumnStatistics(HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()),
-          HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), partVals, colStats.getStatsObj());
+      try {
+        // Wait if background cache update is happening
+        tableColStatsCacheLock.readLock().lock();
+        isTableColStatsCacheDirty.set(true);
+        SharedCache.removeTableColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), colName);
+      } finally {
+        tableColStatsCacheLock.readLock().unlock();
+      }
     }
     return succ;
   }
 
   @Override
-  public ColumnStatistics getTableColumnStatistics(String dbName,
-      String tableName, List<String> colName)
-      throws MetaException, NoSuchObjectException {
-    return rawStore.getTableColumnStatistics(dbName, tableName, colName);
+  public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List<String> partVals)
+      throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
+    boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals);
+    if (succ) {
+      String dbName = colStats.getStatsDesc().getDbName();
+      String tableName = colStats.getStatsDesc().getTableName();
+      List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
+      Partition part = getPartition(dbName, tableName, partVals);
+      List<String> colNames = new ArrayList<>();
+      for (ColumnStatisticsObj statsObj : statsObjs) {
+        colNames.add(statsObj.getColName());
+      }
+      StatsSetupConst.setColumnStatsState(part.getParameters(), colNames);
+
+      // Update partition
+      try {
+        // Wait if background cache update is happening
+        partitionCacheLock.readLock().lock();
+        isPartitionCacheDirty.set(true);
+        SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), partVals, part);
+      } finally {
+        partitionCacheLock.readLock().unlock();
+      }
+
+      // Update partition column stats
+      try {
+        // Wait if background cache update is happening
+        partitionColStatsCacheLock.readLock().lock();
+        isPartitionColStatsCacheDirty.set(true);
+        SharedCache.updatePartitionColStatsInCache(
+            HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()),
+            HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), partVals,
+            colStats.getStatsObj());
+      } finally {
+        partitionColStatsCacheLock.readLock().unlock();
+      }
+    }
+    return succ;
   }
 
   @Override
-  public List<ColumnStatistics> getPartitionColumnStatistics(String dbName,
-      String tblName, List<String> partNames, List<String> colNames)
-      throws MetaException, NoSuchObjectException {
+  // TODO: calculate from cached values.
+  // Need to see if it makes sense to do this as some col stats maybe out of date/missing on cache.
+  public List<ColumnStatistics> getPartitionColumnStatistics(String dbName, String tblName,
+      List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException {
     return rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames);
   }
 
   @Override
-  public boolean deletePartitionColumnStatistics(String dbName,
-      String tableName, String partName, List<String> partVals, String colName)
-      throws NoSuchObjectException, MetaException, InvalidObjectException,
-      InvalidInputException {
-    return rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName);
+  public boolean deletePartitionColumnStatistics(String dbName, String tableName, String partName,
+      List<String> partVals, String colName) throws NoSuchObjectException, MetaException,
+      InvalidObjectException, InvalidInputException {
+    boolean succ =
+        rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName);
+    if (succ) {
+      try {
+        // Wait if background cache update is happening
+        partitionColStatsCacheLock.readLock().lock();
+        isPartitionColStatsCacheDirty.set(true);
+        SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), partVals, colName);
+      } finally {
+        partitionColStatsCacheLock.readLock().unlock();
+      }
+    }
+    return succ;
   }
 
   @Override
-  public boolean deleteTableColumnStatistics(String dbName, String tableName,
-      String colName) throws NoSuchObjectException, MetaException,
-      InvalidObjectException, InvalidInputException {
-    return rawStore.deleteTableColumnStatistics(dbName, tableName, colName);
+  public AggrStats get_aggr_stats_for(String dbName, String tblName, List<String> partNames,
+      List<String> colNames) throws MetaException, NoSuchObjectException {
+    List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(colNames.size());
+    for (String colName : colNames) {
+      ColumnStatisticsObj colStat =
+          mergeColStatsForPartitions(HiveStringUtils.normalizeIdentifier(dbName),
+              HiveStringUtils.normalizeIdentifier(tblName), partNames, colName);
+      if (colStat == null) {
+        // Stop and fall back to underlying RawStore
+        colStats = null;
+        break;
+      } else {
+        colStats.add(colStat);
+      }
+    }
+    if (colStats == null) {
+      return rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+    } else {
+      return new AggrStats(colStats, partNames.size());
+    }
+  }
+
+  private ColumnStatisticsObj mergeColStatsForPartitions(String dbName, String tblName,
+      List<String> partNames, String colName) throws MetaException {
+    ColumnStatisticsObj colStats = null;
+    for (String partName : partNames) {
+      String colStatsCacheKey =
+          CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName);
+      ColumnStatisticsObj colStatsForPart =
+          SharedCache.getCachedPartitionColStats(colStatsCacheKey);
+      if (colStatsForPart == null) {
+        // we don't have stats for all the partitions
+        // logic for extrapolation hasn't been added to CacheStore
+        // So stop now, and lets fallback to underlying RawStore
+        return null;
+      }
+      if (colStats == null) {
+        colStats = colStatsForPart;
+      } else {
+        ColumnStatsMerger merger =
+            ColumnStatsMergerFactory.getColumnStatsMerger(colStats, colStatsForPart);
+        merger.merge(colStats, colStatsForPart);
+      }
+    }
+    return colStats;
   }
 
   @Override
@@ -1209,14 +1625,34 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public void dropPartitions(String dbName, String tblName,
-      List<String> partNames) throws MetaException, NoSuchObjectException {
+  public void dropPartitions(String dbName, String tblName, List<String> partNames)
+      throws MetaException, NoSuchObjectException {
     rawStore.dropPartitions(dbName, tblName, partNames);
-    interruptCacheUpdateMaster();
-    for (String partName : partNames) {
-      List<String> vals = partNameToVals(partName);
-      SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
-          HiveStringUtils.normalizeIdentifier(tblName), vals);
+    // Remove partitions
+    try {
+      // Wait if background cache update is happening
+      partitionCacheLock.readLock().lock();
+      isPartitionCacheDirty.set(true);
+      for (String partName : partNames) {
+        List<String> vals = partNameToVals(partName);
+        SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tblName), vals);
+      }
+    } finally {
+      partitionCacheLock.readLock().unlock();
+    }
+    // Remove partition col stats
+    try {
+      // Wait if background cache update is happening
+      partitionColStatsCacheLock.readLock().lock();
+      isPartitionColStatsCacheDirty.set(true);
+      for (String partName : partNames) {
+        List<String> part_vals = partNameToVals(partName);
+        SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tblName), part_vals);
+      }
+    } finally {
+      partitionColStatsCacheLock.readLock().unlock();
     }
   }
 
@@ -1326,130 +1762,6 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public AggrStats get_aggr_stats_for(String dbName, String tblName,
-      List<String> partNames, List<String> colNames)
-      throws MetaException, NoSuchObjectException {
-    List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(colNames.size());
-    for (String colName : colNames) {
-      colStats.add(mergeColStatsForPartitions(HiveStringUtils.normalizeIdentifier(dbName),
-          HiveStringUtils.normalizeIdentifier(tblName), partNames, colName));
-    }
-    // TODO: revisit the partitions not found case for extrapolation
-    return new AggrStats(colStats, partNames.size());
-  }
-
-  private ColumnStatisticsObj mergeColStatsForPartitions(String dbName, String tblName,
-      List<String> partNames, String colName) throws MetaException {
-    ColumnStatisticsObj colStats = null;
-    for (String partName : partNames) {
-      String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName);
-      ColumnStatisticsObj colStatsForPart = SharedCache.getCachedPartitionColStats(
-          colStatsCacheKey);
-      if (colStats == null) {
-        colStats = colStatsForPart;
-      } else {
-        colStats = mergeColStatsObj(colStats, colStatsForPart);
-      }
-    }
-    return colStats;
-  }
-
-  private ColumnStatisticsObj mergeColStatsObj(ColumnStatisticsObj colStats1,
-      ColumnStatisticsObj colStats2) throws MetaException {
-    if ((!colStats1.getColType().equalsIgnoreCase(colStats2.getColType()))
-        && (!colStats1.getColName().equalsIgnoreCase(colStats2.getColName()))) {
-      throw new MetaException("Can't merge column stats for two partitions for different columns.");
-    }
-    ColumnStatisticsData csd = new ColumnStatisticsData();
-    ColumnStatisticsObj cso = new ColumnStatisticsObj(colStats1.getColName(),
-        colStats1.getColType(), csd);
-    ColumnStatisticsData csData1 = colStats1.getStatsData();
-    ColumnStatisticsData csData2 = colStats2.getStatsData();
-    String colType = colStats1.getColType().toLowerCase();
-    if (colType.equals("boolean")) {
-      BooleanColumnStatsData boolStats = new BooleanColumnStatsData();
-      boolStats.setNumFalses(csData1.getBooleanStats().getNumFalses()
-          + csData2.getBooleanStats().getNumFalses());
-      boolStats.setNumTrues(csData1.getBooleanStats().getNumTrues()
-          + csData2.getBooleanStats().getNumTrues());
-      boolStats.setNumNulls(csData1.getBooleanStats().getNumNulls()
-          + csData2.getBooleanStats().getNumNulls());
-      csd.setBooleanStats(boolStats);
-    } else if (colType.equals("string") || colType.startsWith("varchar")
-        || colType.startsWith("char")) {
-      StringColumnStatsData stringStats = new StringColumnStatsData();
-      stringStats.setNumNulls(csData1.getStringStats().getNumNulls()
-          + csData2.getStringStats().getNumNulls());
-      stringStats.setAvgColLen(Math.max(csData1.getStringStats().getAvgColLen(), csData2
-          .getStringStats().getAvgColLen()));
-      stringStats.setMaxColLen(Math.max(csData1.getStringStats().getMaxColLen(), csData2
-          .getStringStats().getMaxColLen()));
-      stringStats.setNumDVs(Math.max(csData1.getStringStats().getNumDVs(), csData2.getStringStats()
-          .getNumDVs()));
-      csd.setStringStats(stringStats);
-    } else if (colType.equals("binary")) {
-      BinaryColumnStatsData binaryStats = new BinaryColumnStatsData();
-      binaryStats.setNumNulls(csData1.getBinaryStats().getNumNulls()
-          + csData2.getBinaryStats().getNumNulls());
-      binaryStats.setAvgColLen(Math.max(csData1.getBinaryStats().getAvgColLen(), csData2
-          .getBinaryStats().getAvgColLen()));
-      binaryStats.setMaxColLen(Math.max(csData1.getBinaryStats().getMaxColLen(), csData2
-          .getBinaryStats().getMaxColLen()));
-      csd.setBinaryStats(binaryStats);
-    } else if (colType.equals("bigint") || colType.equals("int") || colType.equals("smallint")
-        || colType.equals("tinyint") || colType.equals("timestamp")) {
-      LongColumnStatsData longStats = new LongColumnStatsData();
-      longStats.setNumNulls(csData1.getLongStats().getNumNulls()
-          + csData2.getLongStats().getNumNulls());
-      longStats.setHighValue(Math.max(csData1.getLongStats().getHighValue(), csData2.getLongStats()
-          .getHighValue()));
-      longStats.setLowValue(Math.min(csData1.getLongStats().getLowValue(), csData2.getLongStats()
-          .getLowValue()));
-      longStats.setNumDVs(Math.max(csData1.getLongStats().getNumDVs(), csData2.getLongStats()
-          .getNumDVs()));
-      csd.setLongStats(longStats);
-    } else if (colType.equals("date")) {
-      DateColumnStatsData dateStats = new DateColumnStatsData();
-      dateStats.setNumNulls(csData1.getDateStats().getNumNulls()
-          + csData2.getDateStats().getNumNulls());
-      dateStats.setHighValue(new Date(Math.max(csData1.getDateStats().getHighValue()
-          .getDaysSinceEpoch(), csData2.getDateStats().getHighValue().getDaysSinceEpoch())));
-      dateStats.setHighValue(new Date(Math.min(csData1.getDateStats().getLowValue()
-          .getDaysSinceEpoch(), csData2.getDateStats().getLowValue().getDaysSinceEpoch())));
-      dateStats.setNumDVs(Math.max(csData1.getDateStats().getNumDVs(), csData2.getDateStats()
-          .getNumDVs()));
-      csd.setDateStats(dateStats);
-    } else if (colType.equals("double") || colType.equals("float")) {
-      DoubleColumnStatsData doubleStats = new DoubleColumnStatsData();
-      doubleStats.setNumNulls(csData1.getDoubleStats().getNumNulls()
-          + csData2.getDoubleStats().getNumNulls());
-      doubleStats.setHighValue(Math.max(csData1.getDoubleStats().getHighValue(), csData2
-          .getDoubleStats().getHighValue()));
-      doubleStats.setLowValue(Math.min(csData1.getDoubleStats().getLowValue(), csData2
-          .getDoubleStats().getLowValue()));
-      doubleStats.setNumDVs(Math.max(csData1.getDoubleStats().getNumDVs(), csData2.getDoubleStats()
-          .getNumDVs()));
-      csd.setDoubleStats(doubleStats);
-    } else if (colType.startsWith("decimal")) {
-      DecimalColumnStatsData decimalStats = new DecimalColumnStatsData();
-      decimalStats.setNumNulls(csData1.getDecimalStats().getNumNulls()
-          + csData2.getDecimalStats().getNumNulls());
-      Decimal high = (csData1.getDecimalStats().getHighValue()
-          .compareTo(csData2.getDecimalStats().getHighValue()) > 0) ? csData1.getDecimalStats()
-          .getHighValue() : csData2.getDecimalStats().getHighValue();
-      decimalStats.setHighValue(high);
-      Decimal low = (csData1.getDecimalStats().getLowValue()
-          .compareTo(csData2.getDecimalStats().getLowValue()) < 0) ? csData1.getDecimalStats()
-          .getLowValue() : csData2.getDecimalStats().getLowValue();
-      decimalStats.setLowValue(low);
-      decimalStats.setNumDVs(Math.max(csData1.getDecimalStats().getNumDVs(), csData2
-          .getDecimalStats().getNumDVs()));
-      csd.setDecimalStats(decimalStats);
-    }
-    return cso;
-  }
-
-  @Override
   public NotificationEventResponse getNextNotification(
       NotificationEventRequest rqst) {
     return rawStore.getNextNotification(rqst);
@@ -1565,10 +1877,9 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(
-      String dbName, String tableName)
-      throws MetaException, NoSuchObjectException {
-    return rawStore.getAggrColStatsForTablePartitions(dbName, tableName);
+  public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
+      String tableName) throws MetaException, NoSuchObjectException {
+    return rawStore.getColStatsForTablePartitions(dbName, tableName);
   }
 
   public RawStore getRawStore() {