You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/24 23:52:10 UTC

[36/54] [abbrv] hive git commit: HIVE-16579: CachedStore: improvements to partition col stats caching and cache column stats for unpartitioned table (Daniel Dai, Thejas Nair, Vaibhav Gumashta reviewed by Daniel Dai, Thejas Nair)

http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
index 7beee42..6b6355b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
@@ -21,14 +21,18 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.TreeMap;
 
-import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.StatObjectConverter;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -38,17 +42,26 @@ import org.apache.hadoop.hive.metastore.cache.CachedStore.StorageDescriptorWrapp
 import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper;
 import org.apache.hadoop.hive.metastore.hbase.HBaseUtils;
 import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
 public class SharedCache {
   private static Map<String, Database> databaseCache = new TreeMap<String, Database>();
   private static Map<String, TableWrapper> tableCache = new TreeMap<String, TableWrapper>();
-  private static Map<String, PartitionWrapper> partitionCache = new TreeMap<String, PartitionWrapper>();
-  private static Map<String, ColumnStatisticsObj> partitionColStatsCache = new TreeMap<String, ColumnStatisticsObj>();
-  private static Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new HashMap<ByteArrayWrapper, StorageDescriptorWrapper>();
+  private static Map<String, PartitionWrapper> partitionCache =
+      new TreeMap<String, PartitionWrapper>();
+  private static Map<String, ColumnStatisticsObj> partitionColStatsCache =
+      new TreeMap<String, ColumnStatisticsObj>();
+  private static Map<String, ColumnStatisticsObj> tableColStatsCache =
+      new TreeMap<String, ColumnStatisticsObj>();
+  private static Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache =
+      new HashMap<ByteArrayWrapper, StorageDescriptorWrapper>();
   private static MessageDigest md;
 
+  static final private Logger LOG = LoggerFactory.getLogger(SharedCache.class.getName());
+
   static {
     try {
       md = MessageDigest.getInstance("MD5");
@@ -97,11 +110,13 @@ public class SharedCache {
     Table tblCopy = tbl.deepCopy();
     tblCopy.setDbName(HiveStringUtils.normalizeIdentifier(dbName));
     tblCopy.setTableName(HiveStringUtils.normalizeIdentifier(tblName));
-    for (FieldSchema fs : tblCopy.getPartitionKeys()) {
-      fs.setName(HiveStringUtils.normalizeIdentifier(fs.getName()));
+    if (tblCopy.getPartitionKeys() != null) {
+      for (FieldSchema fs : tblCopy.getPartitionKeys()) {
+        fs.setName(HiveStringUtils.normalizeIdentifier(fs.getName()));
+      }
     }
     TableWrapper wrapper;
-    if (tbl.getSd()!=null) {
+    if (tbl.getSd() != null) {
       byte[] sdHash = HBaseUtils.hashStorageDescriptor(tbl.getSd(), md);
       StorageDescriptor sd = tbl.getSd();
       increSd(sd, sdHash);
@@ -121,10 +136,54 @@ public class SharedCache {
     }
   }
 
+  public static synchronized ColumnStatisticsObj getCachedTableColStats(String colStatsCacheKey) {
+    return tableColStatsCache.get(colStatsCacheKey); // null when nothing is cached under this key
+  }
+
+  /**
+   * Evicts all cached column stats of a table: every entry whose key starts (ignoring case)
+   * with the prefix built from dbName and tblName is removed.
+   */
+  public static synchronized void removeTableColStatsFromCache(String dbName, String tblName) {
+    String prefix = CacheUtils.buildKeyWithDelimit(dbName, tblName).toLowerCase();
+    for (Iterator<String> keys = tableColStatsCache.keySet().iterator(); keys.hasNext();) {
+      if (keys.next().toLowerCase().startsWith(prefix)) {
+        keys.remove();
+      }
+    }
+  }
+
+  public static synchronized void removeTableColStatsFromCache(String dbName, String tblName,
+      String colName) {
+    tableColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, colName)); // single column
+  }
+
+  public static synchronized void updateTableColStatsInCache(String dbName, String tableName,
+      List<ColumnStatisticsObj> colStatsForTable) {
+    for (ColumnStatisticsObj incoming : colStatsForTable) {
+      String colName = incoming.getColName();
+      String cacheKey = CacheUtils.buildKey(dbName, tableName, colName);
+      ColumnStatisticsObj cached = tableColStatsCache.get(cacheKey);
+      if (cached == null) {
+        // Nothing cached for this column yet: store the incoming object itself
+        tableColStatsCache.put(cacheKey, incoming);
+        continue;
+      }
+      LOG.debug("CachedStore: updating table column stats for column: " + colName
+          + ", of table: " + tableName + " and database: " + dbName);
+      // Stats already cached: update the cached object via setFieldsIntoOldStats
+      StatObjectConverter.setFieldsIntoOldStats(cached, incoming);
+    }
+  }
+
   public static synchronized void alterTableInCache(String dbName, String tblName, Table newTable) {
     removeTableFromCache(dbName, tblName);
     addTableToCache(HiveStringUtils.normalizeIdentifier(newTable.getDbName()),
         HiveStringUtils.normalizeIdentifier(newTable.getTableName()), newTable);
+  }
+
+  public static synchronized void alterTableInPartitionCache(String dbName, String tblName,
+      Table newTable) {
     if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
       List<Partition> partitions = listCachedPartitions(dbName, tblName, -1);
       for (Partition part : partitions) {
@@ -137,6 +196,58 @@ public class SharedCache {
     }
   }
 
+  // Re-keys cached table column stats when the table is renamed or moved to another database.
+  public static synchronized void alterTableInTableColStatsCache(String dbName, String tblName,
+      Table newTable) {
+    if (dbName.equals(newTable.getDbName()) && tblName.equals(newTable.getTableName())) {
+      return;
+    }
+    String newDbName = HiveStringUtils.normalizeIdentifier(newTable.getDbName());
+    String newTblName = HiveStringUtils.normalizeIdentifier(newTable.getTableName());
+    String oldPrefix = CacheUtils.buildKeyWithDelimit(dbName, tblName).toLowerCase();
+    Map<String, ColumnStatisticsObj> renamed = new HashMap<String, ColumnStatisticsObj>();
+    Iterator<Entry<String, ColumnStatisticsObj>> it = tableColStatsCache.entrySet().iterator();
+    while (it.hasNext()) {
+      Entry<String, ColumnStatisticsObj> entry = it.next();
+      if (entry.getKey().toLowerCase().startsWith(oldPrefix)) {
+        // Keep only the column part [2] of the old key; db and table come from newTable
+        String colName = CacheUtils.splitTableColStats(entry.getKey())[2];
+        renamed.put(CacheUtils.buildKey(newDbName, newTblName, colName), entry.getValue());
+        it.remove();
+      }
+    }
+    tableColStatsCache.putAll(renamed);
+  }
+
+  /**
+   * Re-keys cached partition column stats of dbName.tblName to the db/table name of newTable
+   * (all partitions, matched by table-level key prefix); no-op when neither name changed.
+   */
+  @SuppressWarnings("unchecked") // element [2] of the split key holds the partition values
+  public static synchronized void alterTableInPartitionColStatsCache(String dbName, String tblName,
+      Table newTable) {
+    if (dbName.equals(newTable.getDbName()) && tblName.equals(newTable.getTableName())) {
+      return;
+    }
+    String newDbName = HiveStringUtils.normalizeIdentifier(newTable.getDbName());
+    String newTblName = HiveStringUtils.normalizeIdentifier(newTable.getTableName());
+    // Match on the table-level prefix so one pass covers the stats of every partition
+    String oldPrefix = CacheUtils.buildKeyWithDelimit(dbName, tblName).toLowerCase();
+    Map<String, ColumnStatisticsObj> renamed = new HashMap<String, ColumnStatisticsObj>();
+    Iterator<Entry<String, ColumnStatisticsObj>> it = partitionColStatsCache.entrySet().iterator();
+    while (it.hasNext()) {
+      Entry<String, ColumnStatisticsObj> entry = it.next();
+      if (entry.getKey().toLowerCase().startsWith(oldPrefix)) {
+        // Keep partition values [2] and column name [3]; db and table come from newTable
+        Object[] parts = CacheUtils.splitPartitionColStats(entry.getKey());
+        renamed.put(CacheUtils.buildKey(newDbName, newTblName, (List<String>) parts[2],
+            (String) parts[3]), entry.getValue());
+        it.remove();
+      }
+    }
+    partitionColStatsCache.putAll(renamed);
+  }
+
   public static synchronized int getCachedTableCount() {
     return tableCache.size();
   }
@@ -151,18 +262,6 @@ public class SharedCache {
     return tables;
   }
 
-  public static synchronized void updateTableColumnStatistics(String dbName, String tableName,
-      List<ColumnStatisticsObj> statsObjs) {
-    Table tbl = getTableFromCache(dbName, tableName);
-    tbl.getSd().getParameters();
-    List<String> colNames = new ArrayList<>();
-    for (ColumnStatisticsObj statsObj:statsObjs) {
-      colNames.add(statsObj.getColName());
-    }
-    StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames);
-    alterTableInCache(dbName, tableName, tbl);
-  }
-
   public static synchronized List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes) {
     List<TableMeta> tableMetas = new ArrayList<TableMeta>();
     for (String dbName : listCachedDatabases()) {
@@ -214,14 +313,51 @@ public class SharedCache {
     return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, part_vals));
   }
 
-  public static synchronized Partition removePartitionFromCache(String dbName, String tblName, List<String> part_vals) {
-    PartitionWrapper wrapper = partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals));
-    if (wrapper.getSdHash()!=null) {
+  public static synchronized Partition removePartitionFromCache(String dbName, String tblName,
+      List<String> part_vals) {
+    PartitionWrapper wrapper =
+        partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals));
+    if (wrapper.getSdHash() != null) {
       decrSd(wrapper.getSdHash());
     }
     return wrapper.getPartition();
   }
 
+  /**
+   * Removes the cached column stats of all partitions of a table, i.e. every entry whose key
+   * starts (ignoring case) with the prefix built from dbName and tblName.
+   */
+  public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName) {
+    String tablePrefix = CacheUtils.buildKeyWithDelimit(dbName, tblName).toLowerCase();
+    Iterator<String> cachedKeys = partitionColStatsCache.keySet().iterator();
+    while (cachedKeys.hasNext()) {
+      if (cachedKeys.next().toLowerCase().startsWith(tablePrefix)) {
+        cachedKeys.remove();
+      }
+    }
+  }
+
+  /**
+   * Removes the cached column stats of a single partition, i.e. every entry whose key starts
+   * (ignoring case) with the prefix built from dbName, tblName and partVals.
+   */
+  public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
+      List<String> partVals) {
+    String partPrefix = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals).toLowerCase();
+    Iterator<String> cachedKeys = partitionColStatsCache.keySet().iterator();
+    while (cachedKeys.hasNext()) {
+      if (cachedKeys.next().toLowerCase().startsWith(partPrefix)) {
+        cachedKeys.remove();
+      }
+    }
+  }
+
+  // Remove the cached column stats of one column of one partition (no-op if none are cached)
+  public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
+      List<String> partVals, String colName) {
+    partitionColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, partVals, colName));
+  }
+
   public static synchronized List<Partition> listCachedPartitions(String dbName, String tblName, int max) {
     List<Partition> partitions = new ArrayList<Partition>();
     int count = 0;
@@ -236,22 +372,53 @@ public class SharedCache {
     return partitions;
   }
 
-  public static synchronized void alterPartitionInCache(String dbName, String tblName, List<String> partVals, Partition newPart) {
+  public static synchronized void alterPartitionInCache(String dbName, String tblName,
+      List<String> partVals, Partition newPart) {
     removePartitionFromCache(dbName, tblName, partVals);
     addPartitionToCache(HiveStringUtils.normalizeIdentifier(newPart.getDbName()),
         HiveStringUtils.normalizeIdentifier(newPart.getTableName()), newPart);
   }
 
-  public static synchronized void updatePartitionColumnStatistics(String dbName, String tableName,
-      List<String> partVals, List<ColumnStatisticsObj> statsObjs) {
-    Partition part = getPartitionFromCache(dbName, tableName, partVals);
-    part.getSd().getParameters();
-    List<String> colNames = new ArrayList<>();
-    for (ColumnStatisticsObj statsObj:statsObjs) {
-      colNames.add(statsObj.getColName());
+  /**
+   * Moves the cached column stats of partition (dbName, tblName, partVals) to keys built from
+   * newPart's db name, table name and values; the column name is taken from the old key.
+   */
+  public static synchronized void alterPartitionInColStatsCache(String dbName, String tblName,
+      List<String> partVals, Partition newPart) {
+    String oldPrefix = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals).toLowerCase();
+    Map<String, ColumnStatisticsObj> rekeyed = new HashMap<String, ColumnStatisticsObj>();
+    Iterator<Entry<String, ColumnStatisticsObj>> it = partitionColStatsCache.entrySet().iterator();
+    while (it.hasNext()) {
+      Entry<String, ColumnStatisticsObj> entry = it.next();
+      if (!entry.getKey().toLowerCase().startsWith(oldPrefix)) {
+        continue;
+      }
+      Object[] oldKeyParts = CacheUtils.splitPartitionColStats(entry.getKey());
+      String newKey = CacheUtils.buildKey(HiveStringUtils.normalizeIdentifier(newPart.getDbName()),
+          HiveStringUtils.normalizeIdentifier(newPart.getTableName()), newPart.getValues(),
+          (String) oldKeyParts[3]);
+      rekeyed.put(newKey, entry.getValue());
+      it.remove();
+    }
+    partitionColStatsCache.putAll(rekeyed);
+  }
+
+  public static synchronized void updatePartitionColStatsInCache(String dbName, String tableName,
+      List<String> partVals, List<ColumnStatisticsObj> colStatsObjs) {
+    for (ColumnStatisticsObj incoming : colStatsObjs) {
+      String colName = incoming.getColName();
+      String cacheKey = CacheUtils.buildKey(dbName, tableName, partVals, colName);
+      ColumnStatisticsObj cached = partitionColStatsCache.get(cacheKey);
+      if (cached == null) {
+        // Nothing cached for this partition column yet: store the incoming object itself
+        partitionColStatsCache.put(cacheKey, incoming);
+        continue;
+      }
+      // Stats already cached: log, then update the cached object via setFieldsIntoOldStats
+      LOG.debug("CachedStore: updating partition column stats for column: "
+          + colName + ", of table: " + tableName + " and database: " + dbName);
+      StatObjectConverter.setFieldsIntoOldStats(cached, incoming);
     }
-    StatsSetupConst.setColumnStatsState(part.getParameters(), colNames);
-    alterPartitionInCache(dbName, tableName, partVals, part);
   }
 
   public static synchronized int getCachedPartitionCount() {
@@ -262,10 +429,47 @@ public class SharedCache {
     return partitionColStatsCache.get(key);
   }
 
-  public static synchronized void addPartitionColStatsToCache(Map<String, ColumnStatisticsObj> aggrStatsPerPartition) {
-    partitionColStatsCache.putAll(aggrStatsPerPartition);
+  public static synchronized void addPartitionColStatsToCache(String dbName, String tableName,
+      Map<String, List<ColumnStatisticsObj>> colStatsPerPartition) {
+    for (Entry<String, List<ColumnStatisticsObj>> perPartition : colStatsPerPartition.entrySet()) {
+      try {
+        // A partition name that cannot be parsed only skips that partition (see catch below)
+        List<String> partVals = Warehouse.getPartValuesFromPartName(perPartition.getKey());
+        for (ColumnStatisticsObj colStats : perPartition.getValue()) {
+          partitionColStatsCache.put(
+              CacheUtils.buildKey(dbName, tableName, partVals, colStats.getColName()), colStats);
+        }
+      } catch (MetaException e) {
+        LOG.info("Unable to add partition: " + perPartition.getKey() + " to SharedCache", e);
+      }
+    }
   }
 
+  public static synchronized void refreshPartitionColStats(String dbName, String tableName,
+      Map<String, List<ColumnStatisticsObj>> newColStatsPerPartition) {
+    LOG.debug("CachedStore: updating cached partition column stats objects for database: " + dbName
+        + " and table: " + tableName);
+    removePartitionColStatsFromCache(dbName, tableName); // evict all old entries of the table
+    addPartitionColStatsToCache(dbName, tableName, newColStatsPerPartition); // add the new ones
+  }
+
+  public static synchronized void addTableColStatsToCache(String dbName, String tableName,
+      List<ColumnStatisticsObj> colStatsForTable) {
+    // One cache entry per column, keyed by (db, table, column); an existing entry is replaced
+    for (ColumnStatisticsObj stats : colStatsForTable) {
+      tableColStatsCache.put(CacheUtils.buildKey(dbName, tableName, stats.getColName()), stats);
+    }
+  }
+
+  public static synchronized void refreshTableColStats(String dbName, String tableName,
+      List<ColumnStatisticsObj> colStatsForTable) {
+    LOG.debug("CachedStore: updating cached table column stats objects for database: " + dbName
+        + " and table: " + tableName);
+    // Evict every cached column-stats entry of this table first, so no stale column survives
+    removeTableColStatsFromCache(dbName, tableName);
+    // Then cache the supplied stats, one entry per column
+    addTableColStatsToCache(dbName, tableName, colStatsForTable);
+  }
 
   public static void increSd(StorageDescriptor sd, byte[] sdHash) {
     ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
@@ -295,6 +499,7 @@ public class SharedCache {
 
   // Replace databases in databaseCache with the new list
   public static synchronized void refreshDatabases(List<Database> databases) {
+    LOG.debug("CachedStore: updating cached database objects");
     for (String dbName : listCachedDatabases()) {
       removeDatabaseFromCache(dbName);
     }
@@ -305,6 +510,7 @@ public class SharedCache {
 
   // Replace tables in tableCache with the new list
   public static synchronized void refreshTables(String dbName, List<Table> tables) {
+    LOG.debug("CachedStore: updating cached table objects for database: " + dbName);
     for (Table tbl : listCachedTables(dbName)) {
       removeTableFromCache(dbName, tbl.getTableName());
     }
@@ -313,17 +519,18 @@ public class SharedCache {
     }
   }
 
-  public static void refreshPartitions(String dbName, String tblName, List<Partition> partitions) {
-    List<String> keysToRemove = new ArrayList<String>();
-    for (Map.Entry<String, PartitionWrapper> entry : partitionCache.entrySet()) {
-      if (entry.getValue().getPartition().getDbName().equals(dbName)
-          && entry.getValue().getPartition().getTableName().equals(tblName)) {
-        keysToRemove.add(entry.getKey());
+  public static synchronized void refreshPartitions(String dbName, String tblName,
+      List<Partition> partitions) {
+    LOG.debug("CachedStore: updating cached partition objects for database: " + dbName
+        + " and table: " + tblName);
+    Iterator<Entry<String, PartitionWrapper>> iterator = partitionCache.entrySet().iterator();
+    while (iterator.hasNext()) {
+      PartitionWrapper partitionWrapper = iterator.next().getValue();
+      if (partitionWrapper.getPartition().getDbName().equals(dbName)
+          && partitionWrapper.getPartition().getTableName().equals(tblName)) {
+        iterator.remove();
       }
     }
-    for (String key : keysToRemove) {
-      partitionCache.remove(key);
-    }
     for (Partition part : partitions) {
       addPartitionToCache(dbName, tblName, part);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
index 0c7d8bb..a7681dd 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
@@ -2854,7 +2854,7 @@ public class HBaseStore implements RawStore {
   }
 
   @Override
-  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+  public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
       String tableName) throws MetaException, NoSuchObjectException {
     // TODO: see if it makes sense to implement this here
     return null;

http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java
index da6cd46..fe890e4 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
@@ -35,7 +36,7 @@ public class ColumnStatsMergerFactory {
 
   private ColumnStatsMergerFactory() {
   }
-  
+
   // we depend on the toString() method for javolution.util.FastCollection.
   private static int countNumBitVectors(String s) {
     if (s != null) {
@@ -88,8 +89,15 @@ public class ColumnStatsMergerFactory {
       numBitVectors = nbvNew == nbvOld ? nbvNew : 0;
       break;
     }
+    case DATE_STATS: {
+      agg = new DateColumnStatsMerger();
+      int nbvNew = countNumBitVectors(statsObjNew.getStatsData().getDateStats().getBitVectors());
+      int nbvOld = countNumBitVectors(statsObjOld.getStatsData().getDateStats().getBitVectors());
+      numBitVectors = nbvNew == nbvOld ? nbvNew : 0;
+      break;
+    }
     default:
-      throw new RuntimeException("Woh, bad.  Unknown stats type " + typeNew.toString());
+      throw new IllegalArgumentException("Unknown stats type " + typeNew.toString());
     }
     if (numBitVectors > 0) {
       agg.ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
@@ -127,8 +135,12 @@ public class ColumnStatsMergerFactory {
       csd.setDecimalStats(new DecimalColumnStatsData());
       break;
 
+    case DATE_STATS:
+      csd.setDateStats(new DateColumnStatsData());
+      break;
+
     default:
-      throw new RuntimeException("Woh, bad.  Unknown stats type!");
+      throw new IllegalArgumentException("Unknown stats type");
     }
 
     cso.setStatsData(csd);

http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java
new file mode 100644
index 0000000..3179b23
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.hbase.stats.merge;
+
+import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Date;
+import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
+
+public class DateColumnStatsMerger extends ColumnStatsMerger { // merges DATE column statistics
+  @Override
+  public void merge(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
+    DateColumnStatsData aggregateData = aggregateColStats.getStatsData().getDateStats();
+    DateColumnStatsData newData = newColStats.getStatsData().getDateStats();
+    Date lowValue = aggregateData.getLowValue() == null || (newData.getLowValue() != null
+        && newData.getLowValue().compareTo(aggregateData.getLowValue()) < 0)
+            ? newData.getLowValue() : aggregateData.getLowValue();
+    aggregateData.setLowValue(lowValue); // a null (unset) bound never displaces a real one
+    Date highValue = aggregateData.getHighValue() == null || (newData.getHighValue() != null
+        && newData.getHighValue().compareTo(aggregateData.getHighValue()) > 0)
+            ? newData.getHighValue() : aggregateData.getHighValue();
+    aggregateData.setHighValue(highValue); // same null handling as for the low bound
+    aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+    if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
+      aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+    } else {
+      ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(aggregateData.getBitVectors(),
+          ndvEstimator.getnumBitVectors()));
+      ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
+          ndvEstimator.getnumBitVectors()));
+      long ndv = ndvEstimator.estimateNumDistinctValues();
+      LOG.debug("Use bitvector to merge column " + aggregateColStats.getColName() + "'s ndvs of "
+          + aggregateData.getNumDVs() + " and " + newData.getNumDVs() + " to be " + ndv);
+      aggregateData.setNumDVs(ndv);
+      aggregateData.setBitVectors(ndvEstimator.serialize().toString());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index f613c30..f53944f 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -872,7 +872,7 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable {
   }
 
   @Override
-  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+  public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
       String tableName) throws MetaException, NoSuchObjectException {
     // TODO Auto-generated method stub
     return null;

http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index 1720e37..e0f5cdb 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -888,7 +888,7 @@ public class DummyRawStoreForJdoConnection implements RawStore {
   }
 
   @Override
-  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+  public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
       String tableName) throws MetaException, NoSuchObjectException {
     // TODO Auto-generated method stub
     return null;

http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
index 0ab20d6..7a3ec09 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
@@ -25,11 +25,22 @@ import java.util.Map;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ObjectStore;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.TestObjectStore.MockPartitionExpressionProxy;
+import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.junit.Assert;
 import org.junit.Before;
@@ -37,18 +48,24 @@ import org.junit.Test;
 
 public class TestCachedStore {
 
-  private CachedStore cachedStore = new CachedStore();
+  private ObjectStore objectStore;
+  private CachedStore cachedStore;
 
   @Before
   public void setUp() throws Exception {
     HiveConf conf = new HiveConf();
-    conf.setVar(HiveConf.ConfVars.METASTORE_EXPRESSION_PROXY_CLASS, MockPartitionExpressionProxy.class.getName());
-
-    ObjectStore objectStore = new ObjectStore();
+    conf.setBoolean(HiveConf.ConfVars.HIVE_IN_TEST.varname, true);
+    conf.setVar(HiveConf.ConfVars.METASTORE_EXPRESSION_PROXY_CLASS,
+        MockPartitionExpressionProxy.class.getName());
+    objectStore = new ObjectStore();
     objectStore.setConf(conf);
+    cachedStore = new CachedStore();
+    cachedStore.setConf(conf);
+    // Stop the CachedStore cache update service. We'll start it explicitly to control the test
+    cachedStore.stopCacheUpdateService(1);
 
-    cachedStore.setRawStore(objectStore);
-
+    // NOTE(review): repeats the stopCacheUpdateService(1) call made just above; looks redundant
+    cachedStore.stopCacheUpdateService(1);
     SharedCache.getDatabaseCache().clear();
     SharedCache.getTableCache().clear();
     SharedCache.getPartitionCache().clear();
@@ -56,6 +73,426 @@ public class TestCachedStore {
     SharedCache.getPartitionColStatsCache().clear();
   }
 
+  /**********************************************************************************************
+   * Methods that test CachedStore
+   *********************************************************************************************/
+
+  @Test
+  public void testDatabaseOps() throws Exception {
+    // Add a db via ObjectStore
+    String dbName = "testDatabaseOps";
+    String dbDescription = "testDatabaseOps";
+    String dbLocation = "file:/tmp";
+    Map<String, String> dbParams = new HashMap<String, String>();
+    String dbOwner = "user1";
+    Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
+    db.setOwnerName(dbOwner);
+    db.setOwnerType(PrincipalType.USER);
+    objectStore.createDatabase(db);
+    db = objectStore.getDatabase(dbName);
+    // Prewarm CachedStore
+    cachedStore.prewarm();
+
+    // Read database via CachedStore
+    Database dbNew = cachedStore.getDatabase(dbName);
+    Assert.assertEquals(db, dbNew);
+
+    // Add another db via CachedStore
+    final String dbName1 = "testDatabaseOps1";
+    final String dbDescription1 = "testDatabaseOps1";
+    Database db1 = new Database(dbName1, dbDescription1, dbLocation, dbParams);
+    db1.setOwnerName(dbOwner);
+    db1.setOwnerType(PrincipalType.USER);
+    cachedStore.createDatabase(db1);
+    db1 = cachedStore.getDatabase(dbName1);
+
+    // Read the same db via ObjectStore; it must match what CachedStore returned
+    dbNew = objectStore.getDatabase(dbName1);
+    Assert.assertEquals(db1, dbNew);
+
+    // Alter the db via CachedStore (can only alter owner or parameters)
+    db = new Database(dbName, dbDescription, dbLocation, dbParams);
+    dbOwner = "user2";
+    db.setOwnerName(dbOwner);
+    db.setOwnerType(PrincipalType.USER);
+    cachedStore.alterDatabase(dbName, db);
+    db = cachedStore.getDatabase(dbName);
+
+    // Read the altered db via ObjectStore; it must match what CachedStore returned
+    dbNew = objectStore.getDatabase(dbName);
+    Assert.assertEquals(db, dbNew);
+
+    // Add another db via ObjectStore
+    final String dbName2 = "testDatabaseOps2";
+    final String dbDescription2 = "testDatabaseOps2";
+    Database db2 = new Database(dbName2, dbDescription2, dbLocation, dbParams);
+    db2.setOwnerName(dbOwner);
+    db2.setOwnerType(PrincipalType.USER);
+    objectStore.createDatabase(db2);
+    db2 = objectStore.getDatabase(dbName2);
+
+    // Alter db "testDatabaseOps" via ObjectStore
+    dbOwner = "user1";
+    db = new Database(dbName, dbDescription, dbLocation, dbParams);
+    db.setOwnerName(dbOwner);
+    db.setOwnerType(PrincipalType.USER);
+    objectStore.alterDatabase(dbName, db);
+    db = objectStore.getDatabase(dbName);
+
+    // Drop db "testDatabaseOps1" via ObjectStore
+    objectStore.dropDatabase(dbName1);
+
+    // We update twice to accurately detect if cache is dirty or not
+    updateCache(cachedStore, 100, 500, 100);
+    updateCache(cachedStore, 100, 500, 100);
+
+    // Read the newly added db via CachedStore
+    dbNew = cachedStore.getDatabase(dbName2);
+    Assert.assertEquals(db2, dbNew);
+
+    // Read the altered db via CachedStore (altered user from "user2" to "user1")
+    dbNew = cachedStore.getDatabase(dbName);
+    Assert.assertEquals(db, dbNew);
+
+    // Try to read the dropped db after cache update
+    try {
+      dbNew = cachedStore.getDatabase(dbName1);
+      Assert.fail("The database: " + dbName1
+          + " should have been removed from the cache after running the update service");
+    } catch (NoSuchObjectException e) {
+      // Expected: the db was dropped via ObjectStore before the cache updates ran
+    }
+
+    // Clean up
+    objectStore.dropDatabase(dbName);
+    objectStore.dropDatabase(dbName2);
+  }
+
+  @Test
+  public void testTableOps() throws Exception {
+    // Add a db via ObjectStore
+    String dbName = "testTableOps";
+    String dbDescription = "testTableOps";
+    String dbLocation = "file:/tmp";
+    Map<String, String> dbParams = new HashMap<String, String>();
+    String dbOwner = "user1";
+    Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
+    db.setOwnerName(dbOwner);
+    db.setOwnerType(PrincipalType.USER);
+    objectStore.createDatabase(db);
+    db = objectStore.getDatabase(dbName);
+
+    // Add a table via ObjectStore
+    String tblName = "tbl";
+    String tblOwner = "user1";
+    String serdeLocation = "file:/tmp";
+    FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
+    FieldSchema col2 = new FieldSchema("col2", "string", "string column");
+    List<FieldSchema> cols = new ArrayList<FieldSchema>();
+    cols.add(col1);
+    cols.add(col2);
+    Map<String, String> serdeParams = new HashMap<String, String>();
+    Map<String, String> tblParams = new HashMap<String, String>();
+    SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", new HashMap<String, String>());
+    StorageDescriptor sd =
+        new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null,
+            null, serdeParams);
+    sd.setStoredAsSubDirectories(false);
+    Table tbl =
+        new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams,
+            null, null, TableType.MANAGED_TABLE.toString());
+    objectStore.createTable(tbl);
+    tbl = objectStore.getTable(dbName, tblName);
+
+    // Prewarm CachedStore
+    cachedStore.prewarm();
+
+    // Read database, table via CachedStore
+    Database dbNew = cachedStore.getDatabase(dbName);
+    Assert.assertEquals(db, dbNew);
+    Table tblNew = cachedStore.getTable(dbName, tblName);
+    Assert.assertEquals(tbl, tblNew);
+
+    // Add a new table via CachedStore
+    String tblName1 = "tbl1";
+    Table tbl1 =
+        new Table(tblName1, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams,
+            null, null, TableType.MANAGED_TABLE.toString());
+    cachedStore.createTable(tbl1);
+    tbl1 = cachedStore.getTable(dbName, tblName1);
+
+    // Read the same table via ObjectStore; it must match what CachedStore returned
+    tblNew = objectStore.getTable(dbName, tblName1);
+    Assert.assertEquals(tbl1, tblNew);
+
+    // Add a new table via ObjectStore
+    String tblName2 = "tbl2";
+    Table tbl2 =
+        new Table(tblName2, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams,
+            null, null, TableType.MANAGED_TABLE.toString());
+    objectStore.createTable(tbl2);
+    tbl2 = objectStore.getTable(dbName, tblName2);
+
+    // Alter table "tbl" via ObjectStore
+    tblOwner = "user2";
+    tbl =
+        new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams,
+            null, null, TableType.MANAGED_TABLE.toString());
+    objectStore.alterTable(dbName, tblName, tbl);
+    tbl = objectStore.getTable(dbName, tblName);
+
+    // Drop table "tbl1" via ObjectStore
+    objectStore.dropTable(dbName, tblName1);
+
+    // We update twice to accurately detect if cache is dirty or not
+    updateCache(cachedStore, 100, 500, 100);
+    updateCache(cachedStore, 100, 500, 100);
+
+    // Read "tbl2" via CachedStore
+    tblNew = cachedStore.getTable(dbName, tblName2);
+    Assert.assertEquals(tbl2, tblNew);
+
+    // Read the altered "tbl" via CachedStore
+    tblNew = cachedStore.getTable(dbName, tblName);
+    Assert.assertEquals(tbl, tblNew);
+
+    // Try to read the dropped "tbl1" via CachedStore (getTable is expected to return null)
+    tblNew = cachedStore.getTable(dbName, tblName1);
+    Assert.assertNull(tblNew);
+
+    // Should return "tbl" and "tbl2"
+    List<String> tblNames = cachedStore.getTables(dbName, "*");
+    Assert.assertTrue(tblNames.contains(tblName));
+    Assert.assertFalse(tblNames.contains(tblName1));
+    Assert.assertTrue(tblNames.contains(tblName2));
+
+    // Clean up
+    objectStore.dropTable(dbName, tblName);
+    objectStore.dropTable(dbName, tblName2);
+    objectStore.dropDatabase(dbName);
+  }
+
+  @Test
+  public void testPartitionOps() throws Exception {
+    // Add a db via ObjectStore
+    String dbName = "testPartitionOps";
+    String dbDescription = "testPartitionOps";
+    String dbLocation = "file:/tmp";
+    Map<String, String> dbParams = new HashMap<String, String>();
+    String dbOwner = "user1";
+    Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
+    db.setOwnerName(dbOwner);
+    db.setOwnerType(PrincipalType.USER);
+    objectStore.createDatabase(db);
+    db = objectStore.getDatabase(dbName);
+
+    // Add a table via ObjectStore
+    String tblName = "tbl";
+    String tblOwner = "user1";
+    String serdeLocation = "file:/tmp";
+    FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
+    FieldSchema col2 = new FieldSchema("col2", "string", "string column");
+    List<FieldSchema> cols = new ArrayList<FieldSchema>();
+    cols.add(col1);
+    cols.add(col2);
+    Map<String, String> serdeParams = new HashMap<String, String>();
+    Map<String, String> tblParams = new HashMap<String, String>();
+    SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd =
+        new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null,
+            null, serdeParams);
+    FieldSchema ptnCol1 = new FieldSchema("part1", "string", "string partition column");
+    List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
+    ptnCols.add(ptnCol1);
+    Table tbl =
+        new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams, null, null,
+            TableType.MANAGED_TABLE.toString());
+    objectStore.createTable(tbl);
+    tbl = objectStore.getTable(dbName, tblName);
+    final String ptnColVal1 = "aaa";
+    Map<String, String> partParams = new HashMap<String, String>();
+    Partition ptn1 =
+        new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0, 0, sd, partParams);
+    objectStore.addPartition(ptn1);
+    ptn1 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1));
+    final String ptnColVal2 = "bbb";
+    Partition ptn2 =
+        new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0, 0, sd, partParams);
+    objectStore.addPartition(ptn2);
+    ptn2 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2));
+
+    // Prewarm CachedStore
+    cachedStore.prewarm();
+
+    // Read database, table, partition via CachedStore
+    Database dbNew = cachedStore.getDatabase(dbName);
+    Assert.assertEquals(db, dbNew);
+    Table tblNew = cachedStore.getTable(dbName, tblName);
+    Assert.assertEquals(tbl, tblNew);
+    Partition newPtn1 = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1));
+    Assert.assertEquals(ptn1, newPtn1);
+    Partition newPtn2 = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2));
+    Assert.assertEquals(ptn2, newPtn2);
+
+    // Add a new partition via ObjectStore
+    final String ptnColVal3 = "ccc";
+    Partition ptn3 =
+        new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0, 0, sd, partParams);
+    objectStore.addPartition(ptn3);
+    ptn3 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3));
+
+    // Alter an existing partition ("aaa") via ObjectStore, changing its value to "aaaAlt"
+    final String ptnColVal1Alt = "aaaAlt";
+    Partition ptn1Alt =
+        new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0, 0, sd, partParams);
+    objectStore.alterPartition(dbName, tblName, Arrays.asList(ptnColVal1), ptn1Alt);
+    ptn1Alt = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt));
+
+    // Drop an existing partition ("bbb") via ObjectStore
+    objectStore.dropPartition(dbName, tblName, Arrays.asList(ptnColVal2));
+
+    // We update twice to accurately detect if cache is dirty or not
+    updateCache(cachedStore, 100, 500, 100);
+    updateCache(cachedStore, 100, 500, 100);
+
+    // Read the newly added partition via CachedStore
+    Partition newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3));
+    Assert.assertEquals(ptn3, newPtn);
+
+    // Read the altered partition via CachedStore
+    newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt));
+    Assert.assertEquals(ptn1Alt, newPtn);
+
+    // Try to read the dropped partition via CachedStore
+    try {
+      newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2));
+      Assert.fail("The partition: " + ptnColVal2
+          + " should have been removed from the cache after running the update service");
+    } catch (NoSuchObjectException e) {
+      // Expected: the partition was dropped via ObjectStore before the cache updates ran
+    }
+  }
+
+  // @Test -- NOTE(review): annotation is commented out, so JUnit skips this test; reason not stated
+  public void testTableColStatsOps() throws Exception {
+    // Add a db via ObjectStore
+    String dbName = "testTableColStatsOps";
+    String dbDescription = "testTableColStatsOps";
+    String dbLocation = "file:/tmp";
+    Map<String, String> dbParams = new HashMap<String, String>();
+    String dbOwner = "user1";
+    Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
+    db.setOwnerName(dbOwner);
+    db.setOwnerType(PrincipalType.USER);
+    objectStore.createDatabase(db);
+    db = objectStore.getDatabase(dbName);
+
+    // Add a table via ObjectStore
+    final String tblName = "tbl";
+    final String tblOwner = "user1";
+    final String serdeLocation = "file:/tmp";
+    final FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
+    // Stats values for col1
+    long col1LowVal = 5;
+    long col1HighVal = 500;
+    long col1Nulls = 10;
+    long col1DV = 20;
+    final  FieldSchema col2 = new FieldSchema("col2", "string", "string column");
+    // Stats values for col2
+    long col2MaxColLen = 100;
+    double col2AvgColLen = 45.5;
+    long col2Nulls = 5;
+    long col2DV = 40;
+    final FieldSchema col3 = new FieldSchema("col3", "boolean", "boolean column");
+    // Stats values for col3
+    long col3NumTrues = 100;
+    long col3NumFalses = 30;
+    long col3Nulls = 10;
+    final List<FieldSchema> cols = new ArrayList<FieldSchema>();
+    cols.add(col1);
+    cols.add(col2);
+    cols.add(col3);
+    Map<String, String> serdeParams = new HashMap<String, String>();
+    Map<String, String> tblParams = new HashMap<String, String>();
+    final SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd =
+        new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null,
+            null, serdeParams);
+    Table tbl =
+        new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams,
+            null, null, TableType.MANAGED_TABLE.toString());
+    objectStore.createTable(tbl);
+    tbl = objectStore.getTable(dbName, tblName);
+
+    // Add ColumnStatistics for tbl to metastore DB via ObjectStore
+    ColumnStatistics stats = new ColumnStatistics();
+    ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, tblName);
+    List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>();
+
+    // Col1
+    ColumnStatisticsData data1 = new ColumnStatisticsData();
+    ColumnStatisticsObj col1Stats = new ColumnStatisticsObj(col1.getName(), col1.getType(), data1);
+    LongColumnStatsData longStats = new LongColumnStatsData();
+    longStats.setLowValue(col1LowVal);
+    longStats.setHighValue(col1HighVal);
+    longStats.setNumNulls(col1Nulls);
+    longStats.setNumDVs(col1DV);
+    data1.setLongStats(longStats);
+    colStatObjs.add(col1Stats);
+
+    // Col2
+    ColumnStatisticsData data2 = new ColumnStatisticsData();
+    ColumnStatisticsObj col2Stats = new ColumnStatisticsObj(col2.getName(), col2.getType(), data2);
+    StringColumnStatsData stringStats = new StringColumnStatsData();
+    stringStats.setMaxColLen(col2MaxColLen);
+    stringStats.setAvgColLen(col2AvgColLen);
+    stringStats.setNumNulls(col2Nulls);
+    stringStats.setNumDVs(col2DV);
+    data2.setStringStats(stringStats);
+    colStatObjs.add(col2Stats);
+
+    // Col3
+    ColumnStatisticsData data3 = new ColumnStatisticsData();
+    ColumnStatisticsObj col3Stats = new ColumnStatisticsObj(col3.getName(), col3.getType(), data3);
+    BooleanColumnStatsData boolStats = new BooleanColumnStatsData();
+    boolStats.setNumTrues(col3NumTrues);
+    boolStats.setNumFalses(col3NumFalses);
+    boolStats.setNumNulls(col3Nulls);
+    data3.setBooleanStats(boolStats);
+    colStatObjs.add(col3Stats);
+
+    stats.setStatsDesc(statsDesc);
+    stats.setStatsObj(colStatObjs);
+
+    // Save the table-level column stats to the metastore DB via ObjectStore
+    objectStore.updateTableColumnStatistics(stats);
+
+    // Prewarm CachedStore
+    cachedStore.prewarm();
+
+    // Read table stats via CachedStore
+    ColumnStatistics newStats =
+        cachedStore.getTableColumnStatistics(dbName, tblName,
+            Arrays.asList(col1.getName(), col2.getName(), col3.getName()));
+    Assert.assertEquals(stats, newStats);
+  }
+
+  private void updateCache(CachedStore cachedStore, long frequency, long sleepTime,
+      long shutdownTimeout) throws InterruptedException {
+    // Set the cache refresh period (milliseconds) to the caller-supplied frequency
+    cachedStore.setCacheRefreshPeriod(frequency);
+    // Start the CachedStore update service
+    cachedStore.startCacheUpdateService();
+    // Sleep for sleepTime ms so that the background cache update can complete
+    Thread.sleep(sleepTime);
+    // Stop the cache update service, passing on the caller-supplied shutdown timeout
+    cachedStore.stopCacheUpdateService(shutdownTimeout);
+  }
+
+  /**********************************************************************************************
+   * Methods that test SharedCache
+   *********************************************************************************************/
+
   @Test
   public void testSharedStoreDb() {
     Database db1 = new Database();
@@ -160,6 +597,7 @@ public class TestCachedStore {
     Assert.assertEquals(SharedCache.getSdCache().size(), 2);
   }
 
+
   @Test
   public void testSharedStorePartition() {
     Partition part1 = new Partition();