You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ma...@apache.org on 2018/12/18 08:18:09 UTC

[2/3] hive git commit: HIVE-18661 : CachedStore: Use metastore notification log events to update cache. (Mahesh Kumar Behera, reviewed by Daniel Dai)

http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index cb899d7..9e4f3c2 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -1135,6 +1135,7 @@ struct CreationMetadata {
 struct NotificationEventRequest {
     1: required i64 lastEvent,
     2: optional i32 maxEvents,
+    3: optional list<string> eventTypeSkipList,
 }
 
 struct NotificationEvent {

http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
index 0ea46f8..617c7bc 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
@@ -274,7 +274,7 @@ public class HiveAlterHandler implements AlterHandler {
             part.setDbName(newDbName);
             part.setTableName(newTblName);
             ColumnStatistics colStats = updateOrGetPartitionColumnStats(msdb, catName, dbname, name,
-                part.getValues(), part.getSd().getCols(), oldt, part, null);
+                part.getValues(), part.getSd().getCols(), oldt, part, null, null);
             if (colStats != null) {
               columnStatsNeedUpdated.put(part, colStats);
             }
@@ -312,7 +312,7 @@ public class HiveAlterHandler implements AlterHandler {
           }
         } else {
           alterTableUpdateTableColumnStats(
-              msdb, oldt, newt, environmentContext, writeIdList);
+              msdb, oldt, newt, environmentContext, writeIdList, conf, null);
         }
       } else {
         // operations other than table rename
@@ -332,7 +332,7 @@ public class HiveAlterHandler implements AlterHandler {
               List<FieldSchema> oldCols = part.getSd().getCols();
               part.getSd().setCols(newt.getSd().getCols());
               ColumnStatistics colStats = updateOrGetPartitionColumnStats(msdb, catName, dbname, name,
-                  part.getValues(), oldCols, oldt, part, null);
+                  part.getValues(), oldCols, oldt, part, null, null);
               assert(colStats == null);
               if (cascade) {
                 msdb.alterPartition(
@@ -349,11 +349,11 @@ public class HiveAlterHandler implements AlterHandler {
           } else {
             LOG.warn("Alter table not cascaded to partitions.");
             alterTableUpdateTableColumnStats(
-                msdb, oldt, newt, environmentContext, writeIdList);
+                msdb, oldt, newt, environmentContext, writeIdList, conf, null);
           }
         } else {
           alterTableUpdateTableColumnStats(
-              msdb, oldt, newt, environmentContext, writeIdList);
+              msdb, oldt, newt, environmentContext, writeIdList, conf, null);
         }
       }
 
@@ -481,7 +481,7 @@ public class HiveAlterHandler implements AlterHandler {
         // PartitionView does not have SD. We do not need update its column stats
         if (oldPart.getSd() != null) {
           updateOrGetPartitionColumnStats(msdb, catName, dbname, name, new_part.getValues(),
-              oldPart.getSd().getCols(), tbl, new_part, null);
+              oldPart.getSd().getCols(), tbl, new_part, null, null);
         }
         msdb.alterPartition(
             catName, dbname, name, new_part.getValues(), new_part, validWriteIds);
@@ -620,7 +620,7 @@ public class HiveAlterHandler implements AlterHandler {
 
       String newPartName = Warehouse.makePartName(tbl.getPartitionKeys(), new_part.getValues());
       ColumnStatistics cs = updateOrGetPartitionColumnStats(msdb, catName, dbname, name, oldPart.getValues(),
-          oldPart.getSd().getCols(), tbl, new_part, null);
+          oldPart.getSd().getCols(), tbl, new_part, null, null);
       msdb.alterPartition(catName, dbname, name, part_vals, new_part, validWriteIds);
       if (cs != null) {
         cs.getStatsDesc().setPartName(newPartName);
@@ -727,7 +727,7 @@ public class HiveAlterHandler implements AlterHandler {
         // PartitionView does not have SD and we do not need to update its column stats
         if (oldTmpPart.getSd() != null) {
           updateOrGetPartitionColumnStats(msdb, catName, dbname, name, oldTmpPart.getValues(),
-              oldTmpPart.getSd().getCols(), tbl, tmpPart, null);
+              oldTmpPart.getSd().getCols(), tbl, tmpPart, null, null);
         }
       }
 
@@ -799,8 +799,8 @@ public class HiveAlterHandler implements AlterHandler {
   }
 
   @VisibleForTesting
-  void alterTableUpdateTableColumnStats(RawStore msdb, Table oldTable, Table newTable,
-      EnvironmentContext ec, String validWriteIds)
+  public static List<ColumnStatisticsObj> alterTableUpdateTableColumnStats(RawStore msdb, Table oldTable, Table newTable,
+      EnvironmentContext ec, String validWriteIds, Configuration conf, List<String> deletedCols)
       throws MetaException, InvalidObjectException {
     String catName = normalizeIdentifier(oldTable.isSetCatName() ? oldTable.getCatName() :
         getDefaultCatalog(conf));
@@ -808,11 +808,13 @@ public class HiveAlterHandler implements AlterHandler {
     String tableName = normalizeIdentifier(oldTable.getTableName());
     String newDbName = newTable.getDbName().toLowerCase();
     String newTableName = normalizeIdentifier(newTable.getTableName());
+    List<ColumnStatisticsObj> newStatsObjs = new ArrayList<>();
+    // If this is not called from the cached store, then update the table.
+    boolean doAlterTable = deletedCols == null;
 
     try {
       List<FieldSchema> oldCols = oldTable.getSd().getCols();
       List<FieldSchema> newCols = newTable.getSd().getCols();
-      List<ColumnStatisticsObj> newStatsObjs = new ArrayList<>();
       ColumnStatistics colStats = null;
       boolean updateColumnStats = !newDbName.equals(dbName) || !newTableName.equals(tableName)
           || !MetaStoreServerUtils.columnsIncludedByNameType(oldCols, newCols);
@@ -834,7 +836,10 @@ public class HiveAlterHandler implements AlterHandler {
         } else {
           List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
           if (statsObjs != null) {
-            List<String> deletedCols = new ArrayList<>();
+            // When used as an out parameter, deletedCols is initialized by the caller.
+            if (deletedCols == null) {
+              deletedCols = new ArrayList<>();
+            }
             for (ColumnStatisticsObj statsObj : statsObjs) {
               boolean found = false;
               for (FieldSchema newCol : newCols) {
@@ -847,28 +852,36 @@ public class HiveAlterHandler implements AlterHandler {
 
               if (found) {
                 if (!newDbName.equals(dbName) || !newTableName.equals(tableName)) {
-                  msdb.deleteTableColumnStatistics(catName, dbName, tableName, statsObj.getColName());
+                  if (doAlterTable) {
+                    msdb.deleteTableColumnStatistics(catName, dbName, tableName, statsObj.getColName());
+                  }
                   newStatsObjs.add(statsObj);
                   deletedCols.add(statsObj.getColName());
                 }
               } else {
-                msdb.deleteTableColumnStatistics(catName, dbName, tableName, statsObj.getColName());
+                if (doAlterTable) {
+                  msdb.deleteTableColumnStatistics(catName, dbName, tableName, statsObj.getColName());
+                }
                 deletedCols.add(statsObj.getColName());
               }
             }
-            StatsSetupConst.removeColumnStatsState(newTable.getParameters(), deletedCols);
+            if (doAlterTable) {
+              StatsSetupConst.removeColumnStatsState(newTable.getParameters(), deletedCols);
+            }
           }
         }
       }
 
-      // Change to new table and append stats for the new table
-      msdb.alterTable(catName, dbName, tableName, newTable, validWriteIds);
-      if (updateColumnStats && !newStatsObjs.isEmpty()) {
-        ColumnStatisticsDesc statsDesc = colStats.getStatsDesc();
-        statsDesc.setDbName(newDbName);
-        statsDesc.setTableName(newTableName);
-        colStats.setStatsObj(newStatsObjs);
-        msdb.updateTableColumnStatistics(colStats, validWriteIds, newTable.getWriteId());
+      if (doAlterTable) {
+        // Change to new table and append stats for the new table
+        msdb.alterTable(catName, dbName, tableName, newTable, validWriteIds);
+        if (updateColumnStats && !newStatsObjs.isEmpty()) {
+          ColumnStatisticsDesc statsDesc = colStats.getStatsDesc();
+          statsDesc.setDbName(newDbName);
+          statsDesc.setTableName(newTableName);
+          colStats.setStatsObj(newStatsObjs);
+          msdb.updateTableColumnStatistics(colStats, validWriteIds, newTable.getWriteId());
+        }
       }
     } catch (NoSuchObjectException nsoe) {
       LOG.debug("Could not find db entry." + nsoe);
@@ -876,13 +889,15 @@ public class HiveAlterHandler implements AlterHandler {
       //should not happen since the input were verified before passed in
       throw new InvalidObjectException("Invalid inputs to update table column stats: " + e);
     }
+    return newStatsObjs;
   }
 
-  private ColumnStatistics updateOrGetPartitionColumnStats(
+  public static ColumnStatistics updateOrGetPartitionColumnStats(
       RawStore msdb, String catName, String dbname, String tblname, List<String> partVals,
-      List<FieldSchema> oldCols, Table table, Partition part, List<FieldSchema> newCols)
+      List<FieldSchema> oldCols, Table table, Partition part, List<FieldSchema> newCols, List<String> deletedCols)
           throws MetaException, InvalidObjectException {
     ColumnStatistics newPartsColStats = null;
+    boolean updateColumnStats = true;
     try {
       // if newCols are not specified, use default ones.
       if (newCols == null) {
@@ -906,10 +921,17 @@ public class HiveAlterHandler implements AlterHandler {
       List<ColumnStatistics> partsColStats = msdb.getPartitionColumnStatistics(catName, dbname, tblname,
           oldPartNames, oldColNames);
       assert (partsColStats.size() <= 1);
+
+      // When used as an out parameter, deletedCols is initialized by the caller.
+      if (deletedCols == null) {
+        deletedCols = new ArrayList<>();
+      } else {
+        // If the caller provides deletedCols, the stats will be updated by the caller.
+        updateColumnStats = false;
+      }
       for (ColumnStatistics partColStats : partsColStats) { //actually only at most one loop
         List<ColumnStatisticsObj> newStatsObjs = new ArrayList<>();
         List<ColumnStatisticsObj> statsObjs = partColStats.getStatsObj();
-        List<String> deletedCols = new ArrayList<>();
         for (ColumnStatisticsObj statsObj : statsObjs) {
           boolean found =false;
           for (FieldSchema newCol : newCols) {
@@ -921,17 +943,25 @@ public class HiveAlterHandler implements AlterHandler {
           }
           if (found) {
             if (rename) {
-              msdb.deletePartitionColumnStatistics(catName, dbname, tblname, partColStats.getStatsDesc().getPartName(),
-                  partVals, statsObj.getColName());
+              if (updateColumnStats) {
+                msdb.deletePartitionColumnStatistics(catName, dbname, tblname,
+                        partColStats.getStatsDesc().getPartName(), partVals, statsObj.getColName());
+              } else {
+                deletedCols.add(statsObj.getColName());
+              }
               newStatsObjs.add(statsObj);
             }
           } else {
-            msdb.deletePartitionColumnStatistics(catName, dbname, tblname, partColStats.getStatsDesc().getPartName(),
-                partVals, statsObj.getColName());
+            if (updateColumnStats) {
+              msdb.deletePartitionColumnStatistics(catName, dbname, tblname, partColStats.getStatsDesc().getPartName(),
+                      partVals, statsObj.getColName());
+            }
             deletedCols.add(statsObj.getColName());
           }
         }
-        StatsSetupConst.removeColumnStatsState(part.getParameters(), deletedCols);
+        if (updateColumnStats) {
+          StatsSetupConst.removeColumnStatsState(part.getParameters(), deletedCols);
+        }
         if (!newStatsObjs.isEmpty()) {
           partColStats.setStatsObj(newStatsObjs);
           newPartsColStats = partColStats;

http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 598847d..0a1b96d 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -114,6 +114,7 @@ import org.apache.hadoop.hive.metastore.events.DropTableEvent;
 import org.apache.hadoop.hive.metastore.events.InsertEvent;
 import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
 import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
+import org.apache.hadoop.hive.metastore.events.UpdateTableColumnStatEvent;
 import org.apache.hadoop.hive.metastore.events.PreAddPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.PreAlterCatalogEvent;
 import org.apache.hadoop.hive.metastore.events.PreAlterDatabaseEvent;
@@ -140,6 +141,9 @@ import org.apache.hadoop.hive.metastore.events.PreReadDatabaseEvent;
 import org.apache.hadoop.hive.metastore.events.PreReadISchemaEvent;
 import org.apache.hadoop.hive.metastore.events.PreReadTableEvent;
 import org.apache.hadoop.hive.metastore.events.PreReadhSchemaVersionEvent;
+import org.apache.hadoop.hive.metastore.events.DeletePartitionColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.DeleteTableColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
 import org.apache.hadoop.hive.metastore.metrics.JvmPauseMonitor;
 import org.apache.hadoop.hive.metastore.metrics.Metrics;
@@ -568,6 +572,19 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         listeners.add(new HMSMetricsListener(conf));
       }
 
+      boolean canCachedStoreCanUseEvent = false;
+      for (MetaStoreEventListener listener : transactionalListeners) {
+        if (listener.doesAddEventsToNotificationLogTable()) {
+          canCachedStoreCanUseEvent = true;
+          break;
+        }
+      }
+      if (conf.getBoolean(ConfVars.METASTORE_CACHE_CAN_USE_EVENT.getVarname(), false) &&
+              !canCachedStoreCanUseEvent) {
+        throw new MetaException("CahcedStore can not use events for invalidation as there is no " +
+                " TransactionalMetaStoreEventListener to add events to notification table");
+      }
+
       endFunctionListeners = MetaStoreServerUtils.getMetaStoreListeners(
           MetaStoreEndFunctionListener.class, conf, MetastoreConf.getVar(conf, ConfVars.END_FUNCTION_LISTENERS));
 
@@ -5786,14 +5803,33 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           colStats.getStatsDesc().getCatName(), colStats.getStatsDesc().getDbName(),
           colStats.getStatsDesc().getTableName()));
 
-      boolean ret = false;
+      Map<String, String> parameters = null;
+      getMS().openTransaction();
+      boolean committed = false;
       try {
-        ret = getMS().updateTableColumnStatistics(colStats, validWriteIds, writeId) != null;
+        parameters = getMS().updateTableColumnStatistics(colStats, validWriteIds, writeId);
+        if (parameters != null) {
+          if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                    EventType.UPDATE_TABLE_COLUMN_STAT,
+                    new UpdateTableColumnStatEvent(colStats, parameters, validWriteIds, writeId, this));
+          }
+          if (!listeners.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(listeners,
+                    EventType.UPDATE_TABLE_COLUMN_STAT,
+                    new UpdateTableColumnStatEvent(colStats, parameters, validWriteIds, writeId, this));
+          }
+        }
+        committed = getMS().commitTransaction();
       } finally {
-        endFunction("write_column_statistics", ret != false, null,
+        if (!committed) {
+          getMS().rollbackTransaction();
+        }
+        endFunction("write_column_statistics", parameters != null, null,
             colStats.getStatsDesc().getTableName());
       }
-      return ret;
+
+      return parameters != null;
     }
 
     private void normalizeColStatsInput(ColumnStatistics colStats) throws MetaException {
@@ -5826,16 +5862,37 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
       boolean ret = false;
 
+      Map<String, String> parameters;
+      List<String> partVals;
+      boolean committed = false;
+      getMS().openTransaction();
       try {
         if (tbl == null) {
           tbl = getTable(catName, dbName, tableName);
         }
-        List<String> partVals = getPartValsFromName(tbl, csd.getPartName());
-        return getMS().updatePartitionColumnStatistics(
-            colStats, partVals, validWriteIds, writeId) != null;
+        partVals = getPartValsFromName(tbl, csd.getPartName());
+        parameters = getMS().updatePartitionColumnStatistics(colStats, partVals, validWriteIds, writeId);
+        if (parameters != null) {
+          if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+              EventType.UPDATE_PARTITION_COLUMN_STAT,
+              new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, validWriteIds, writeId, this));
+          }
+          if (!listeners.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(listeners,
+              EventType.UPDATE_PARTITION_COLUMN_STAT,
+              new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, validWriteIds, writeId, this));
+          }
+        }
+        committed = getMS().commitTransaction();
       } finally {
+        if (!committed) {
+          getMS().rollbackTransaction();
+        }
         endFunction("write_partition_column_statistics", ret != false, null, tableName);
       }
+
+      return parameters != null;
     }
 
     @Override
@@ -5889,6 +5946,20 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         ret = getMS().deletePartitionColumnStatistics(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName,
                                                       convertedPartName, partVals, colName);
+        if (ret) {
+          if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                    EventType.DELETE_PARTITION_COLUMN_STAT,
+                    new DeletePartitionColumnStatEvent(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName,
+                            convertedPartName, partVals, colName, this));
+          }
+          if (!listeners.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(listeners,
+                    EventType.DELETE_PARTITION_COLUMN_STAT,
+                    new DeletePartitionColumnStatEvent(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName,
+                            convertedPartName, partVals, colName, this));
+          }
+        }
         committed = getMS().commitTransaction();
       } finally {
         if (!committed) {
@@ -5926,6 +5997,20 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         ret = getMS().deleteTableColumnStatistics(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName, colName);
+        if (ret) {
+          if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                    EventType.DELETE_TABLE_COLUMN_STAT,
+                    new DeleteTableColumnStatEvent(parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
+                            tableName, colName, this));
+          }
+          if (!listeners.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(listeners,
+                    EventType.DELETE_TABLE_COLUMN_STAT,
+                    new DeleteTableColumnStatEvent(parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
+                            tableName, colName, this));
+          }
+        }
         committed = getMS().commitTransaction();
       } finally {
         if (!committed) {

http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
index de226bf..0d6d10f 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
@@ -56,6 +56,10 @@ import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
 import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
 import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
 import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
+import org.apache.hadoop.hive.metastore.events.UpdateTableColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.DeleteTableColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.DeletePartitionColumnStatEvent;
 import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
 import java.sql.Connection;
 
@@ -294,6 +298,49 @@ public abstract class MetaStoreEventListener implements Configurable {
           throws MetaException {
   }
 
+  /**
+   * This will be called when table column stats are updated.
+   * @param updateTableColumnStatEvent event to be processed
+   * @throws MetaException
+   */
+  public void onUpdateTableColumnStat(UpdateTableColumnStatEvent updateTableColumnStatEvent)
+          throws MetaException {
+  }
+
+  /**
+   * This will be called when table column stats are deleted.
+   * @param deleteTableColumnStatEvent event to be processed
+   * @throws MetaException
+   */
+  public void onDeleteTableColumnStat(DeleteTableColumnStatEvent deleteTableColumnStatEvent)
+          throws MetaException {
+  }
+
+  /**
+   * This will be called when partition column stats are updated.
+   * @param updatePartColStatEvent event to be processed
+   * @throws MetaException
+   */
+  public void onUpdatePartitionColumnStat(UpdatePartitionColumnStatEvent updatePartColStatEvent)
+          throws MetaException {
+  }
+
+  /**
+   * This will be called when partition column stats are deleted.
+   * @param deletePartColStatEvent event to be processed
+   * @throws MetaException
+   */
+  public void onDeletePartitionColumnStat(DeletePartitionColumnStatEvent deletePartColStatEvent)
+          throws MetaException {
+  }
+
+  /**
+   * This is to check if the listener adds the event info to notification log table.
+   */
+  public boolean doesAddEventsToNotificationLogTable() {
+    return false;
+  }
+
   @Override
   public Configuration getConf() {
     return this.conf;

http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
index c296f57..dd82c4b 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
@@ -55,6 +55,10 @@ import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
 import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
 import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
 import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
+import org.apache.hadoop.hive.metastore.events.UpdateTableColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.DeleteTableColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.DeletePartitionColumnStatEvent;
 import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
 import java.sql.Connection;
 import java.util.List;
@@ -224,6 +228,14 @@ public class MetaStoreListenerNotifier {
               (listener, event) -> listener.onAllocWriteId((AllocWriteIdEvent) event, null, null))
           .put(EventType.ACID_WRITE,
                   (listener, event) -> listener.onAcidWrite((AcidWriteEvent) event, null, null))
+          .put(EventType.UPDATE_TABLE_COLUMN_STAT,
+                      (listener, event) -> listener.onUpdateTableColumnStat((UpdateTableColumnStatEvent) event))
+          .put(EventType.DELETE_TABLE_COLUMN_STAT,
+                  (listener, event) -> listener.onDeleteTableColumnStat((DeleteTableColumnStatEvent) event))
+          .put(EventType.UPDATE_PARTITION_COLUMN_STAT,
+                  (listener, event) -> listener.onUpdatePartitionColumnStat((UpdatePartitionColumnStatEvent) event))
+          .put(EventType.DELETE_PARTITION_COLUMN_STAT,
+                  (listener, event) -> listener.onDeletePartitionColumnStat((DeletePartitionColumnStatEvent) event))
           .build()
   );
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 0324a19..d43c0c1 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -4236,7 +4236,7 @@ public class ObjectStore implements RawStore, Configurable {
    * Verifies that the stats JSON string is unchanged for alter table (txn stats).
    * @return Error message with the details of the change, or null if the value has not changed.
    */
-  private static String verifyStatsChangeCtx(Map<String, String> oldP, Map<String, String> newP,
+  public static String verifyStatsChangeCtx(Map<String, String> oldP, Map<String, String> newP,
       long writeId, String validWriteIds, boolean isColStatsChange) {
     if (validWriteIds != null && writeId > 0) return null; // We have txn context.
     String oldVal = oldP == null ? null : oldP.get(StatsSetupConst.COLUMN_STATS_ACCURATE);
@@ -9859,13 +9859,25 @@ public class ObjectStore implements RawStore, Configurable {
     try {
       openTransaction();
       long lastEvent = rqst.getLastEvent();
-      query = pm.newQuery(MNotificationLog.class, "eventId > lastEvent");
-      query.declareParameters("java.lang.Long lastEvent");
+      List<Object> parameterVals = new ArrayList<>();
+      parameterVals.add(lastEvent);
+      StringBuilder filterBuilder = new StringBuilder("eventId > para" + parameterVals.size());
+      StringBuilder parameterBuilder = new StringBuilder("java.lang.Long para" + parameterVals.size());
+      if (rqst.isSetEventTypeSkipList()) {
+        for (String eventType : rqst.getEventTypeSkipList()) {
+          parameterVals.add(eventType);
+          parameterBuilder.append(", java.lang.String para" + parameterVals.size());
+          filterBuilder.append(" && eventType != para" + parameterVals.size());
+        }
+      }
+      query = pm.newQuery(MNotificationLog.class, filterBuilder.toString());
+      query.declareParameters(parameterBuilder.toString());
       query.setOrdering("eventId ascending");
       int maxEventResponse = MetastoreConf.getIntVar(conf, ConfVars.METASTORE_MAX_EVENT_RESPONSE);
       int maxEvents = (rqst.getMaxEvents() < maxEventResponse && rqst.getMaxEvents() > 0) ? rqst.getMaxEvents() : maxEventResponse;
       query.setRange(0, maxEvents);
-      Collection<MNotificationLog> events = (Collection) query.execute(lastEvent);
+      Collection<MNotificationLog> events =
+              (Collection) query.executeWithArray(parameterVals.toArray(new Object[parameterVals.size()]));
       commited = commitTransaction();
       if (events == null) {
         return result;

http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index e4ef46f..bb504b0 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -38,6 +38,7 @@ import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.DatabaseName;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.TableName;
@@ -49,13 +50,29 @@ import org.apache.hadoop.hive.metastore.PartitionExpressionProxy;
 import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.HiveAlterHandler;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.cache.SharedCache.StatsType;
 import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregator;
 import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregatorFactory;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.UpdateTableColumnStatMessage;
+import org.apache.hadoop.hive.metastore.messaging.DeleteTableColumnStatMessage;
+import org.apache.hadoop.hive.metastore.messaging.UpdatePartitionColumnStatMessage;
+import org.apache.hadoop.hive.metastore.messaging.DeletePartitionColumnStatMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.FileUtils;
 import org.apache.hadoop.hive.metastore.utils.JavaUtils;
@@ -94,9 +111,11 @@ public class CachedStore implements RawStore, Configurable {
   private static TablesPendingPrewarm tblsPendingPrewarm = new TablesPendingPrewarm();
   private RawStore rawStore = null;
   private Configuration conf;
-  private boolean areTxnStatsSupported;
+  private static boolean areTxnStatsSupported;
   private PartitionExpressionProxy expressionProxy = null;
   private static final SharedCache sharedCache = new SharedCache();
+  private static  boolean canUseEvents = false;
+  private static long lastEventId;
 
   static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName());
 
@@ -119,7 +138,38 @@ public class CachedStore implements RawStore, Configurable {
     initSharedCache(conf);
   }
 
+  /**
+   * Brings the shared cache up to date by replaying the notification events that follow {@code lastEventId}.
+   * On success {@code lastEventId} is advanced to the id returned by
+   * {@link #updateUsingNotificationEvents(RawStore, long)}; if the replay throws, {@code lastEventId} is left
+   * unchanged. The elapsed time is logged in either case.
+   *
+   * @param rawStore store the notification events are read from
+   * @throws RuntimeException if the cache has not been prewarmed yet, or if replaying the events fails
+   *         (the original exception is attached as the cause)
+   */
+  synchronized private static void triggerUpdateUsingEvent(RawStore rawStore) {
+    if (!isCachePrewarmed.get()) {
+      LOG.error("cache update should be done only after prewarm");
+      throw new RuntimeException("cache update should be done only after prewarm");
+    }
+    long startTime = System.nanoTime();
+    long preEventId = lastEventId;
+    try {
+      lastEventId = updateUsingNotificationEvents(rawStore, lastEventId);
+    } catch (Exception e) {
+      LOG.error(" cache update failed for start event id " + lastEventId + " with error ", e);
+      // Keep the original exception as the cause so the stack trace is not lost to callers.
+      throw new RuntimeException(e.getMessage(), e);
+    } finally {
+      long endTime = System.nanoTime();
+      LOG.info("Time taken in updateUsingNotificationEvents for num events : " +  (lastEventId - preEventId) + " = " +
+              (endTime - startTime) / 1000000 + "ms");
+    }
+  }
+
+  /**
+   * Records the store's current notification event id in {@code lastEventId} and then prewarms the cache.
+   * The id is captured before prewarm starts, presumably so that events raised while prewarm is running are
+   * replayed by the next event-based update.
+   *
+   * @param rawStore store used both to read the current event id and to prewarm from
+   */
+  synchronized private static void triggerPreWarm(RawStore rawStore) {
+    long currentEventId = rawStore.getCurrentNotificationEventId().getEventId();
+    lastEventId = currentEventId;
+    prewarm(rawStore);
+  }
+
   private void setConfInternal(Configuration conf) {
+    if (MetastoreConf.getBoolVar(conf, ConfVars.METASTORE_CACHE_CAN_USE_EVENT)) {
+      canUseEvents = true;
+    } else {
+      canUseEvents = false;
+    }
+    LOG.info("canUseEvents is set to " + canUseEvents + " in cached Store");
+
     String rawStoreClassName =
         MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName());
     if (rawStore == null) {
@@ -151,6 +201,201 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @VisibleForTesting
+  /** Returns the static {@link SharedCache} instance used by this class. */
+  public static SharedCache getSharedCache() {
+    return sharedCache;
+  }
+
+  /**
+   * Refreshes the cached column statistics of a single partition.
+   * Columns that {@link HiveAlterHandler#updateOrGetPartitionColumnStats} reports as deleted are removed from the
+   * cache; if it returns statistics, they are written to the cache.
+   *
+   * @param rawStore store handed to the alter handler
+   * @param before table object handed to the alter handler
+   * @param part partition whose statistics are refreshed
+   * @return the statistics returned by the alter handler, or {@code null} if it returned none
+   * @throws Exception if the alter handler or the cache update fails
+   */
+  static private ColumnStatistics updateStatsForPart(RawStore rawStore, Table before, String catalogName,
+                                          String dbName, String tableName, Partition part) throws Exception {
+    List<String> droppedColumns = new ArrayList<>();
+    ColumnStatistics partColStats = HiveAlterHandler.updateOrGetPartitionColumnStats(rawStore, catalogName, dbName,
+            tableName, part.getValues(), part.getSd().getCols(), before, part, null, droppedColumns);
+
+    // Evict statistics of the columns the handler reported as deleted.
+    for (String droppedColumn : droppedColumns) {
+      sharedCache.removePartitionColStatsFromCache(catalogName, dbName, tableName, part.getValues(), droppedColumn);
+    }
+
+    if (partColStats == null) {
+      return null;
+    }
+    sharedCache.updatePartitionColStatsInCache(catalogName, dbName, tableName, part.getValues(),
+            partColStats.getStatsObj());
+    return partColStats;
+  }
+
+  /**
+   * Refreshes cached column statistics after a table has been altered.
+   * If {@code before} has partition keys set, every cached partition of the table is refreshed through
+   * {@link #updateStatsForPart}. The table-level statistics returned by
+   * {@link HiveAlterHandler#alterTableUpdateTableColumnStats} are then written to the cache, and the columns it
+   * reports as deleted are removed from the cache.
+   *
+   * @param rawStore store handed to the alter handler
+   * @param before table definition before the alter
+   * @param after table definition after the alter
+   * @throws Exception if the alter handler or a cache update fails
+   */
+  static private void updateStatsForTable(RawStore rawStore, Table before, Table after, String catalogName,
+                                          String dbName, String tableName) throws Exception {
+    List<String> deletedCols = new ArrayList<>();
+    if (before.isSetPartitionKeys()) {
+      List<Partition> parts = sharedCache.listCachedPartitions(catalogName, dbName, tableName, -1);
+      for (Partition part : parts) {
+        updateStatsForPart(rawStore, before, catalogName, dbName, tableName, part);
+      }
+    }
+
+    List<ColumnStatisticsObj> statisticsObjs = HiveAlterHandler.alterTableUpdateTableColumnStats(rawStore, before,
+            after, null, null, rawStore.getConf(), deletedCols);
+    // Decide on the table-level result itself. The previous guard looked at the statistics of the last
+    // partition processed above, which is unrelated to the table-level statistics and is always null for a
+    // table without partition keys.
+    if (statisticsObjs != null) {
+      sharedCache.updateTableColStatsInCache(catalogName, dbName, tableName, statisticsObjs);
+    }
+    for (String column : deletedCols) {
+      sharedCache.removeTableColStatsFromCache(catalogName, dbName, tableName, column);
+    }
+  }
+
+  @VisibleForTesting
+  public static long updateUsingNotificationEvents(RawStore rawStore, long lastEventId) throws Exception {
+    LOG.debug("updating cache using notification events starting from event id " + lastEventId);
+    NotificationEventRequest rqst = new NotificationEventRequest(lastEventId);
+
+    //Add the events which are not related to metadata update
+    rqst.addToEventTypeSkipList(MessageBuilder.INSERT_EVENT);
+    rqst.addToEventTypeSkipList(MessageBuilder.OPEN_TXN_EVENT);
+    rqst.addToEventTypeSkipList(MessageBuilder.COMMIT_TXN_EVENT);
+    rqst.addToEventTypeSkipList(MessageBuilder.ABORT_TXN_EVENT);
+    rqst.addToEventTypeSkipList(MessageBuilder.ALLOC_WRITE_ID_EVENT);
+    rqst.addToEventTypeSkipList(MessageBuilder.ACID_WRITE_EVENT);
+    rqst.addToEventTypeSkipList(MessageBuilder.CREATE_FUNCTION_EVENT);
+    rqst.addToEventTypeSkipList(MessageBuilder.DROP_FUNCTION_EVENT);
+    rqst.addToEventTypeSkipList(MessageBuilder.ADD_PRIMARYKEY_EVENT);
+    rqst.addToEventTypeSkipList(MessageBuilder.ADD_FOREIGNKEY_EVENT);
+    rqst.addToEventTypeSkipList(MessageBuilder.ADD_UNIQUECONSTRAINT_EVENT);
+    rqst.addToEventTypeSkipList(MessageBuilder.ADD_NOTNULLCONSTRAINT_EVENT);
+    rqst.addToEventTypeSkipList(MessageBuilder.DROP_CONSTRAINT_EVENT);
+    rqst.addToEventTypeSkipList(MessageBuilder.CREATE_ISCHEMA_EVENT);
+    rqst.addToEventTypeSkipList(MessageBuilder.ALTER_ISCHEMA_EVENT);
+    rqst.addToEventTypeSkipList(MessageBuilder.DROP_ISCHEMA_EVENT);
+    rqst.addToEventTypeSkipList(MessageBuilder.ADD_SCHEMA_VERSION_EVENT);
+    rqst.addToEventTypeSkipList(MessageBuilder.ALTER_SCHEMA_VERSION_EVENT);
+    rqst.addToEventTypeSkipList(MessageBuilder.DROP_SCHEMA_VERSION_EVENT);
+
+    Deadline.startTimer("getNextNotification");
+    NotificationEventResponse resp = rawStore.getNextNotification(rqst);
+    Deadline.stopTimer();
+
+    if (resp == null || resp.getEvents() == null) {
+      LOG.debug("no events to process");
+      return lastEventId;
+    }
+
+    List<NotificationEvent> eventList = resp.getEvents();
+    LOG.debug("num events to process" + eventList.size());
+
+    for (NotificationEvent event : eventList) {
+      long eventId = event.getEventId();
+      if (eventId <= lastEventId) {
+        LOG.error("Event id is not valid " + lastEventId + " : " + eventId);
+        throw new RuntimeException(" event id is not valid " + lastEventId + " : " + eventId);
+      }
+      lastEventId = eventId;
+      String message = event.getMessage();
+      LOG.debug("Event to process " + event);
+      MessageDeserializer deserializer = MessageFactory.getInstance(event.getMessageFormat()).getDeserializer();
+      String catalogName = event.getCatName() == null ? "" : event.getCatName().toLowerCase();
+      String dbName = event.getDbName() == null ? "" : event.getDbName().toLowerCase();
+      String tableName = event.getTableName() == null ? "" : event.getTableName().toLowerCase();
+      if (!shouldCacheTable(catalogName, dbName, tableName)) {
+        continue;
+      }
+      switch (event.getEventType()) {
+        case MessageBuilder.ADD_PARTITION_EVENT:
+          AddPartitionMessage addPartMessage = deserializer.getAddPartitionMessage(message);
+          sharedCache.addPartitionsToCache(catalogName,
+                  dbName, tableName, addPartMessage.getPartitionObjs());
+          break;
+        case MessageBuilder.ALTER_PARTITION_EVENT:
+          AlterPartitionMessage alterPartitionMessage = deserializer.getAlterPartitionMessage(message);
+          sharedCache.alterPartitionInCache(catalogName, dbName, tableName,
+                  alterPartitionMessage.getPtnObjBefore().getValues(), alterPartitionMessage.getPtnObjAfter());
+          //TODO : Use the stat object stored in the alter table message to update the stats in cache.
+          if (updateStatsForPart(rawStore, alterPartitionMessage.getTableObj(),
+                  catalogName, dbName, tableName, alterPartitionMessage.getPtnObjAfter()) != null) {
+            CacheUpdateMasterWork.updateTableAggregatePartitionColStats(rawStore, catalogName, dbName, tableName);
+          }
+          break;
+        case MessageBuilder.DROP_PARTITION_EVENT:
+          DropPartitionMessage dropPartitionMessage = deserializer.getDropPartitionMessage(message);
+          for (Map<String, String> partMap : dropPartitionMessage.getPartitions()) {
+            sharedCache.removePartitionFromCache(catalogName, dbName, tableName, new ArrayList<>(partMap.values()));
+          }
+          break;
+        case MessageBuilder.CREATE_TABLE_EVENT:
+          CreateTableMessage createTableMessage = deserializer.getCreateTableMessage(message);
+          sharedCache.addTableToCache(catalogName, dbName,
+                  tableName, createTableMessage.getTableObj());
+          break;
+        case MessageBuilder.ALTER_TABLE_EVENT:
+          AlterTableMessage alterTableMessage = deserializer.getAlterTableMessage(message);
+          sharedCache.alterTableInCache(catalogName, dbName, tableName, alterTableMessage.getTableObjAfter());
+          //TODO : Use the stat object stored in the alter table message to update the stats in cache.
+          updateStatsForTable(rawStore, alterTableMessage.getTableObjBefore(), alterTableMessage.getTableObjAfter(),
+                  catalogName, dbName, tableName);
+          break;
+        case MessageBuilder.DROP_TABLE_EVENT:
+          DropTableMessage dropTableMessage = deserializer.getDropTableMessage(message);
+          int batchSize = MetastoreConf.getIntVar(rawStore.getConf(), ConfVars.BATCH_RETRIEVE_OBJECTS_MAX);
+          String tableDnsPath = null;
+          Path tablePath = new Path(dropTableMessage.getTableObj().getSd().getLocation());
+          if (tablePath != null) {
+            tableDnsPath = new Warehouse(rawStore.getConf()).getDnsPath(tablePath).toString();
+          }
+
+          while (true) {
+            Map<String, String> partitionLocations = rawStore.getPartitionLocations(catalogName, dbName, tableName,
+                    tableDnsPath, batchSize);
+            if (partitionLocations == null || partitionLocations.isEmpty()) {
+              break;
+            }
+            sharedCache.removePartitionFromCache(catalogName, dbName, tableName,
+                    new ArrayList<>(partitionLocations.values()));
+          }
+          sharedCache.removeTableFromCache(catalogName, dbName, tableName);
+          break;
+        case MessageBuilder.CREATE_DATABASE_EVENT:
+          CreateDatabaseMessage createDatabaseMessage = deserializer.getCreateDatabaseMessage(message);
+          sharedCache.addDatabaseToCache(createDatabaseMessage.getDatabaseObject());
+          break;
+        case MessageBuilder.ALTER_DATABASE_EVENT:
+          AlterDatabaseMessage alterDatabaseMessage = deserializer.getAlterDatabaseMessage(message);
+          sharedCache.alterDatabaseInCache(catalogName, dbName, alterDatabaseMessage.getDbObjAfter());
+          break;
+        case MessageBuilder.DROP_DATABASE_EVENT:
+          sharedCache.removeDatabaseFromCache(catalogName, dbName);
+          break;
+        case MessageBuilder.CREATE_CATALOG_EVENT:
+        case MessageBuilder.DROP_CATALOG_EVENT:
+        case MessageBuilder.ALTER_CATALOG_EVENT:
+          // TODO : Need to add cache invalidation for catalog events
+          LOG.error("catalog Events are not supported for cache invalidation : " + event.getEventType());
+          break;
+        case MessageBuilder.UPDATE_TBL_COL_STAT_EVENT:
+          UpdateTableColumnStatMessage msg = deserializer.getUpdateTableColumnStatMessage(message);
+          updateTableColumnsStatsInternal(rawStore.getConf(), msg.getColumnStatistics(), msg.getParameters(),
+                  msg.getValidWriteIds(), msg.getWriteId());
+          break;
+        case MessageBuilder.DELETE_TBL_COL_STAT_EVENT:
+          DeleteTableColumnStatMessage msgDel = deserializer.getDeleteTableColumnStatMessage(message);
+          sharedCache.removeTableColStatsFromCache(catalogName, dbName, tableName, msgDel.getColName());
+          break;
+        case MessageBuilder.UPDATE_PART_COL_STAT_EVENT:
+          UpdatePartitionColumnStatMessage msgPartUpdate = deserializer.getUpdatePartitionColumnStatMessage(message);
+          sharedCache.updatePartitionColStatsInCache(catalogName, dbName, tableName, msgPartUpdate.getPartVals(),
+                  msgPartUpdate.getColumnStatistics().getStatsObj());
+          break;
+        case MessageBuilder.DELETE_PART_COL_STAT_EVENT:
+          DeletePartitionColumnStatMessage msgPart = deserializer.getDeletePartitionColumnStatMessage(message);
+          sharedCache.removePartitionColStatsFromCache(catalogName, dbName, tableName,
+                  msgPart.getPartValues(), msgPart.getColName());
+          break;
+        default:
+          LOG.error("Event is not supported for cache invalidation : " + event.getEventType());
+      }
+    }
+    return lastEventId;
+  }
+
+  @VisibleForTesting
   /**
    * This initializes the caches in SharedCache by getting the objects from Metastore DB via
    * ObjectStore and populating the respective caches
@@ -161,6 +406,7 @@ public class CachedStore implements RawStore, Configurable {
     }
     long startTime = System.nanoTime();
     LOG.info("Prewarming CachedStore");
+    long sleepTime = 100;
     while (!isCachePrewarmed.get()) {
       // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy
       Deadline.registerIfNot(1000000);
@@ -176,6 +422,12 @@ public class CachedStore implements RawStore, Configurable {
         sharedCache.populateCatalogsInCache(catalogs);
       } catch (MetaException | NoSuchObjectException e) {
         LOG.warn("Failed to populate catalogs in cache, going to try again", e);
+        try {
+          Thread.sleep(sleepTime);
+          sleepTime = sleepTime * 2;
+        } catch (InterruptedException timerEx) {
+          LOG.info("sleep interrupted", timerEx.getMessage());
+        }
         // try again
         continue;
       }
@@ -407,8 +659,7 @@ public class CachedStore implements RawStore, Configurable {
     }
     if (runOnlyOnce) {
       // Some tests control the execution of the background update thread
-      cacheUpdateMaster.schedule(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0,
-          TimeUnit.MILLISECONDS);
+      cacheUpdateMaster.schedule(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0, TimeUnit.MILLISECONDS);
     }
   }
 
@@ -439,6 +690,7 @@ public class CachedStore implements RawStore, Configurable {
     private boolean shouldRunPrewarm = true;
     private final RawStore rawStore;
 
+
     CacheUpdateMasterWork(Configuration conf, boolean shouldRunPrewarm) {
       this.shouldRunPrewarm = shouldRunPrewarm;
       String rawStoreClassName =
@@ -456,11 +708,19 @@ public class CachedStore implements RawStore, Configurable {
     @Override
     public void run() {
       if (!shouldRunPrewarm) {
-        // TODO: prewarm and update can probably be merged.
-        update();
+        if (canUseEvents) {
+          try {
+            triggerUpdateUsingEvent(rawStore);
+          } catch (Exception e) {
+            LOG.error("failed to update cache using events ", e);
+          }
+        } else {
+          // TODO: prewarm and update can probably be merged.
+          update();
+        }
       } else {
         try {
-          prewarm(rawStore);
+          triggerPreWarm(rawStore);
         } catch (Exception e) {
           LOG.error("Prewarm failure", e);
           return;
@@ -619,7 +879,7 @@ public class CachedStore implements RawStore, Configurable {
 
     // Update cached aggregate stats for all partitions of a table and for all
     // but default partition
-    private void updateTableAggregatePartitionColStats(RawStore rawStore, String catName, String dbName,
+    private static void updateTableAggregatePartitionColStats(RawStore rawStore, String catName, String dbName,
                                                        String tblName) {
       try {
         Table table = rawStore.getTable(catName, dbName, tblName);
@@ -675,7 +935,22 @@ public class CachedStore implements RawStore, Configurable {
 
   @Override
   public boolean commitTransaction() {
-    return rawStore.commitTransaction();
+    if (!rawStore.commitTransaction()) {
+      return false;
+    }
+
+    // With event-based updates the shared cache is never modified directly by raw store operations, to avoid
+    // inconsistency. Example: metastore B adds a partition and metastore A later drops it. Metastore A handles
+    // its own drop request first and only afterwards sees B's add-partition notification. Without a tombstone
+    // entry recording that the drop happened after the creation, A would apply the creation and resurrect the
+    // partition in its cache until the drop-partition notification is eventually processed.
+    // Instead, the cache is brought up to date here, at commit time, by replaying the notification log, so the
+    // events of the current transaction are reflected in the cache; this gives strong consistency when there
+    // is only a single metastore.
+    if (canUseEvents) {
+      try {
+        triggerUpdateUsingEvent(rawStore);
+      } catch (RuntimeException e) {
+        // The raw store commit has already succeeded, so it must not be reported as failed just because the
+        // cache could not be refreshed (for example while prewarm is still running).
+        LOG.error("Failed to update cache using events after commit", e);
+      }
+    }
+    return true;
   }
 
   @Override
@@ -691,19 +966,26 @@ public class CachedStore implements RawStore, Configurable {
   @Override
   public void createCatalog(Catalog cat) throws MetaException {
     rawStore.createCatalog(cat);
-    sharedCache.addCatalogToCache(cat);
+    // With event-based cache updates enabled the catalog cache is not maintained; only update it otherwise.
+    if (!canUseEvents) {
+      sharedCache.addCatalogToCache(cat);
+    }
   }
 
   @Override
   public void alterCatalog(String catName, Catalog cat) throws MetaException,
       InvalidOperationException {
     rawStore.alterCatalog(catName, cat);
-    sharedCache.alterCatalogInCache(StringUtils.normalizeIdentifier(catName), cat);
+    // With event-based cache updates enabled the catalog cache is not maintained; only update it otherwise.
+    if (!canUseEvents) {
+      sharedCache.alterCatalogInCache(StringUtils.normalizeIdentifier(catName), cat);
+    }
   }
 
   @Override
   public Catalog getCatalog(String catalogName) throws NoSuchObjectException, MetaException {
-    if (!sharedCache.isCatalogCachePrewarmed()) {
+    // in case of event based cache update, cache will not be updated for catalog.
+    if (!sharedCache.isCatalogCachePrewarmed() || canUseEvents) {
       return rawStore.getCatalog(catalogName);
     }
     Catalog cat = sharedCache.getCatalogFromCache(normalizeIdentifier(catalogName));
@@ -715,7 +997,8 @@ public class CachedStore implements RawStore, Configurable {
 
   @Override
   public List<String> getCatalogs() throws MetaException {
-    if (!sharedCache.isCatalogCachePrewarmed()) {
+    // in case of event based cache update, cache will not be updated for catalog.
+    if (!sharedCache.isCatalogCachePrewarmed() || canUseEvents) {
       return rawStore.getCatalogs();
     }
     return sharedCache.listCachedCatalogs();
@@ -724,19 +1007,29 @@ public class CachedStore implements RawStore, Configurable {
   @Override
   public void dropCatalog(String catalogName) throws NoSuchObjectException, MetaException {
     rawStore.dropCatalog(catalogName);
-    catalogName = catalogName.toLowerCase();
-    sharedCache.removeCatalogFromCache(catalogName);
+
+    // With event-based cache updates enabled the catalog cache is not maintained; only update it otherwise.
+    if (!canUseEvents) {
+      catalogName = catalogName.toLowerCase();
+      sharedCache.removeCatalogFromCache(catalogName);
+    }
   }
 
   @Override
   public void createDatabase(Database db) throws InvalidObjectException, MetaException {
     rawStore.createDatabase(db);
-    sharedCache.addDatabaseToCache(db);
+    // With event-based cache updates enabled, the cache is refreshed from notification events at commit time
+    // (see commitTransaction), so it is not updated directly here.
+    if (!canUseEvents) {
+      sharedCache.addDatabaseToCache(db);
+    }
   }
 
   @Override
   public Database getDatabase(String catName, String dbName) throws NoSuchObjectException {
-    if (!sharedCache.isDatabaseCachePrewarmed()) {
+    // in case of  event based cache update, cache will be updated during commit. So within active transaction, read
+    // directly from rawStore to avoid reading stale data as the data updated during same transaction will not be
+    // updated in the cache.
+    if (!sharedCache.isDatabaseCachePrewarmed() || (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.getDatabase(catName, dbName);
     }
     dbName = dbName.toLowerCase();
@@ -751,7 +1044,8 @@ public class CachedStore implements RawStore, Configurable {
   @Override
   public boolean dropDatabase(String catName, String dbName) throws NoSuchObjectException, MetaException {
     boolean succ = rawStore.dropDatabase(catName, dbName);
-    if (succ) {
+    if (succ && !canUseEvents) {
+      // in case of event based cache update, cache will be updated during commit.
       sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(catName),
           StringUtils.normalizeIdentifier(dbName));
     }
@@ -762,7 +1056,8 @@ public class CachedStore implements RawStore, Configurable {
   public boolean alterDatabase(String catName, String dbName, Database db)
       throws NoSuchObjectException, MetaException {
     boolean succ = rawStore.alterDatabase(catName, dbName, db);
-    if (succ) {
+    if (succ && !canUseEvents) {
+      // in case of event based cache update, cache will be updated during commit.
       sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(catName),
           StringUtils.normalizeIdentifier(dbName), db);
     }
@@ -771,7 +1066,7 @@ public class CachedStore implements RawStore, Configurable {
 
   @Override
   public List<String> getDatabases(String catName, String pattern) throws MetaException {
-    if (!sharedCache.isDatabaseCachePrewarmed()) {
+    if (!sharedCache.isDatabaseCachePrewarmed() || (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.getDatabases(catName, pattern);
     }
     return sharedCache.listCachedDatabases(catName, pattern);
@@ -779,7 +1074,7 @@ public class CachedStore implements RawStore, Configurable {
 
   @Override
   public List<String> getAllDatabases(String catName) throws MetaException {
-    if (!sharedCache.isDatabaseCachePrewarmed()) {
+    if (!sharedCache.isDatabaseCachePrewarmed() || (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.getAllDatabases(catName);
     }
     return sharedCache.listCachedDatabases(catName);
@@ -821,6 +1116,10 @@ public class CachedStore implements RawStore, Configurable {
   @Override
   public void createTable(Table tbl) throws InvalidObjectException, MetaException {
     rawStore.createTable(tbl);
+    // in case of event based cache update, cache will be updated during commit.
+    if (canUseEvents) {
+      return;
+    }
     String catName = normalizeIdentifier(tbl.getCatName());
     String dbName = normalizeIdentifier(tbl.getDbName());
     String tblName = normalizeIdentifier(tbl.getTableName());
@@ -835,7 +1134,8 @@ public class CachedStore implements RawStore, Configurable {
   public boolean dropTable(String catName, String dbName, String tblName)
       throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
     boolean succ = rawStore.dropTable(catName, dbName, tblName);
-    if (succ) {
+    // in case of event based cache update, cache will be updated during commit.
+    if (succ && !canUseEvents) {
       catName = normalizeIdentifier(catName);
       dbName = normalizeIdentifier(dbName);
       tblName = normalizeIdentifier(tblName);
@@ -857,7 +1157,7 @@ public class CachedStore implements RawStore, Configurable {
     catName = normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!shouldCacheTable(catName, dbName, tblName)) {
+    if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.getTable(catName, dbName, tblName, validWriteIds);
     }
     Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -898,7 +1198,8 @@ public class CachedStore implements RawStore, Configurable {
   @Override
   public boolean addPartition(Partition part) throws InvalidObjectException, MetaException {
     boolean succ = rawStore.addPartition(part);
-    if (succ) {
+    // in case of event based cache update, cache will be updated during commit.
+    if (succ && !canUseEvents) {
       String dbName = normalizeIdentifier(part.getDbName());
       String tblName = normalizeIdentifier(part.getTableName());
       String catName = part.isSetCatName() ? normalizeIdentifier(part.getCatName()) : DEFAULT_CATALOG_NAME;
@@ -914,7 +1215,8 @@ public class CachedStore implements RawStore, Configurable {
   public boolean addPartitions(String catName, String dbName, String tblName, List<Partition> parts)
       throws InvalidObjectException, MetaException {
     boolean succ = rawStore.addPartitions(catName, dbName, tblName, parts);
-    if (succ) {
+    // in case of event based cache update, cache will be updated during commit.
+    if (succ && !canUseEvents) {
       catName = normalizeIdentifier(catName);
       dbName = normalizeIdentifier(dbName);
       tblName = normalizeIdentifier(tblName);
@@ -930,7 +1232,8 @@ public class CachedStore implements RawStore, Configurable {
   public boolean addPartitions(String catName, String dbName, String tblName, PartitionSpecProxy partitionSpec,
       boolean ifNotExists) throws InvalidObjectException, MetaException {
     boolean succ = rawStore.addPartitions(catName, dbName, tblName, partitionSpec, ifNotExists);
-    if (succ) {
+    // in case of event based cache update, cache will be updated during commit.
+    if (succ && !canUseEvents) {
       catName = normalizeIdentifier(catName);
       dbName = normalizeIdentifier(dbName);
       tblName = normalizeIdentifier(tblName);
@@ -959,7 +1262,7 @@ public class CachedStore implements RawStore, Configurable {
     catName = normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!shouldCacheTable(catName, dbName, tblName)) {
+    if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.getPartition(
           catName, dbName, tblName, part_vals, validWriteIds);
     }
@@ -990,7 +1293,7 @@ public class CachedStore implements RawStore, Configurable {
     catName = normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!shouldCacheTable(catName, dbName, tblName)) {
+    if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.doesPartitionExist(catName, dbName, tblName, partKeys, part_vals);
     }
     Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -1005,7 +1308,8 @@ public class CachedStore implements RawStore, Configurable {
   public boolean dropPartition(String catName, String dbName, String tblName, List<String> part_vals)
       throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
     boolean succ = rawStore.dropPartition(catName, dbName, tblName, part_vals);
-    if (succ) {
+    // in case of event based cache update, cache will be updated during commit.
+    if (succ && !canUseEvents) {
       catName = normalizeIdentifier(catName);
       dbName = normalizeIdentifier(dbName);
       tblName = normalizeIdentifier(tblName);
@@ -1021,6 +1325,10 @@ public class CachedStore implements RawStore, Configurable {
   public void dropPartitions(String catName, String dbName, String tblName, List<String> partNames)
       throws MetaException, NoSuchObjectException {
     rawStore.dropPartitions(catName, dbName, tblName, partNames);
+    // in case of event based cache update, cache will be updated during commit.
+    if (canUseEvents) {
+      return;
+    }
     catName = normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
@@ -1040,7 +1348,7 @@ public class CachedStore implements RawStore, Configurable {
     catName = normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!shouldCacheTable(catName, dbName, tblName)) {
+    if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.getPartitions(catName, dbName, tblName, max);
     }
     Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -1062,6 +1370,10 @@ public class CachedStore implements RawStore, Configurable {
   public Table alterTable(String catName, String dbName, String tblName, Table newTable,
       String validWriteIds) throws InvalidObjectException, MetaException {
     newTable = rawStore.alterTable(catName, dbName, tblName, newTable, validWriteIds);
+    // in case of event based cache update, cache will be updated during commit.
+    if (canUseEvents) {
+      return  newTable;
+    }
     catName = normalizeIdentifier(catName);
     dbName = normalizeIdentifier(dbName);
     tblName = normalizeIdentifier(tblName);
@@ -1096,7 +1408,8 @@ public class CachedStore implements RawStore, Configurable {
 
   @Override
   public List<String> getTables(String catName, String dbName, String pattern) throws MetaException {
-    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
+    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() ||
+            (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.getTables(catName, dbName, pattern);
     }
     return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName),
@@ -1106,7 +1419,8 @@ public class CachedStore implements RawStore, Configurable {
   @Override
   public List<String> getTables(String catName, String dbName, String pattern, TableType tableType)
       throws MetaException {
-    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
+    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()
+            || (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.getTables(catName, dbName, pattern, tableType);
     }
     return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName),
@@ -1123,7 +1437,8 @@ public class CachedStore implements RawStore, Configurable {
   public List<TableMeta> getTableMeta(String catName, String dbNames, String tableNames,
                                       List<String> tableTypes) throws MetaException {
     // TODO Check if all required tables are allowed, if so, get it from cache
-    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
+    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() ||
+            (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.getTableMeta(catName, dbNames, tableNames, tableTypes);
     }
     return sharedCache.getTableMeta(StringUtils.normalizeIdentifier(catName),
@@ -1134,6 +1449,9 @@ public class CachedStore implements RawStore, Configurable {
   @Override
   public List<Table> getTableObjectsByName(String catName, String dbName, List<String> tblNames)
       throws MetaException, UnknownDBException {
+    if (canUseEvents && rawStore.isActiveTransaction()) {
+      return rawStore.getTableObjectsByName(catName, dbName, tblNames);
+    }
     dbName = normalizeIdentifier(dbName);
     catName = normalizeIdentifier(catName);
     boolean missSomeInCache = false;
@@ -1168,7 +1486,8 @@ public class CachedStore implements RawStore, Configurable {
 
   @Override
   public List<String> getAllTables(String catName, String dbName) throws MetaException {
-    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
+    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() ||
+            (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.getAllTables(catName, dbName);
     }
     return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName),
@@ -1188,7 +1507,7 @@ public class CachedStore implements RawStore, Configurable {
     catName = StringUtils.normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!shouldCacheTable(catName, dbName, tblName)) {
+    if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.listPartitionNames(catName, dbName, tblName, max_parts);
     }
     Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -1218,6 +1537,10 @@ public class CachedStore implements RawStore, Configurable {
       List<String> partVals, Partition newPart, String validWriteIds)
           throws InvalidObjectException, MetaException {
     newPart = rawStore.alterPartition(catName, dbName, tblName, partVals, newPart, validWriteIds);
+    // in case of event based cache update, cache will be updated during commit.
+    if (canUseEvents) {
+      return newPart;
+    }
     catName = normalizeIdentifier(catName);
     dbName = normalizeIdentifier(dbName);
     tblName = normalizeIdentifier(tblName);
@@ -1235,6 +1558,10 @@ public class CachedStore implements RawStore, Configurable {
       throws InvalidObjectException, MetaException {
     newParts = rawStore.alterPartitions(
         catName, dbName, tblName, partValsList, newParts, writeId, validWriteIds);
+    // in case of event based cache update, cache will be updated during commit.
+    if (canUseEvents) {
+      return newParts;
+    }
     catName = normalizeIdentifier(catName);
     dbName = normalizeIdentifier(dbName);
     tblName = normalizeIdentifier(tblName);
@@ -1286,7 +1613,7 @@ public class CachedStore implements RawStore, Configurable {
     catName = StringUtils.normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!shouldCacheTable(catName, dbName, tblName)) {
+    if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.getPartitionsByExpr(catName, dbName, tblName, expr, defaultPartitionName, maxParts, result);
     }
     List<String> partNames = new LinkedList<>();
@@ -1317,7 +1644,7 @@ public class CachedStore implements RawStore, Configurable {
     catName = normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!shouldCacheTable(catName, dbName, tblName)) {
+    if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.getNumPartitionsByExpr(catName, dbName, tblName, expr);
     }
     String defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME);
@@ -1332,7 +1659,8 @@ public class CachedStore implements RawStore, Configurable {
     return partNames.size();
   }
 
-  private static List<String> partNameToVals(String name) {
+  @VisibleForTesting
+  public static List<String> partNameToVals(String name) {
     if (name == null) {
       return null;
     }
@@ -1350,7 +1678,7 @@ public class CachedStore implements RawStore, Configurable {
     catName = StringUtils.normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!shouldCacheTable(catName, dbName, tblName)) {
+    if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.getPartitionsByNames(catName, dbName, tblName, partNames);
     }
     Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -1537,7 +1865,7 @@ public class CachedStore implements RawStore, Configurable {
     catName = StringUtils.normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!shouldCacheTable(catName, dbName, tblName)) {
+    if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.getPartitionWithAuth(catName, dbName, tblName, partVals, userName, groupNames);
     }
     Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -1562,7 +1890,7 @@ public class CachedStore implements RawStore, Configurable {
     catName = StringUtils.normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!shouldCacheTable(catName, dbName, tblName)) {
+    if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.getPartitionsWithAuth(catName, dbName, tblName, maxParts, userName, groupNames);
     }
     Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -1591,7 +1919,7 @@ public class CachedStore implements RawStore, Configurable {
     catName = StringUtils.normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!shouldCacheTable(catName, dbName, tblName)) {
+    if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.listPartitionNamesPs(catName, dbName, tblName, partSpecs, maxParts);
     }
     Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -1620,7 +1948,7 @@ public class CachedStore implements RawStore, Configurable {
     catName = StringUtils.normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!shouldCacheTable(catName, dbName, tblName)) {
+    if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.listPartitionsPsWithAuth(catName, dbName, tblName, partSpecs, maxParts, userName, groupNames);
     }
     Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -1702,29 +2030,58 @@ public class CachedStore implements RawStore, Configurable {
     return colStat;
   }
 
+  private static void updateTableColumnsStatsInternal(Configuration conf, ColumnStatistics colStats,
+                                                      Map<String, String> newParams, String validWriteIds,
+                                                      long writeId) throws MetaException {
+    String catName = colStats.getStatsDesc().isSetCatName() ?
+            normalizeIdentifier(colStats.getStatsDesc().getCatName()) :
+            getDefaultCatalog(conf);
+    String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName());
+    String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName());
+    if (!shouldCacheTable(catName, dbName, tblName)) {
+      return;
+    }
+    Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
+    if (table == null) {
+      // The table is not in the cache yet, so there is nothing to update.
+      return;
+    }
+
+    boolean isTxn = TxnUtils.isTransactionalTable(table.getParameters());
+    if (isTxn && validWriteIds != null) {
+      if (!areTxnStatsSupported) {
+        StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
+      } else {
+        String errorMsg = ObjectStore.verifyStatsChangeCtx(
+                table.getParameters(), newParams, writeId, validWriteIds, true);
+        if (errorMsg != null) {
+          throw new MetaException(errorMsg);
+        }
+        if (!ObjectStore.isCurrentStatsValidForTheQuery(conf, newParams, table.getWriteId(),
+                validWriteIds, true)) {
+          // Stats are not valid for this query: set the basic stats state to FALSE regardless of its current value.
+          StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
+          LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table "
+                  + table.getDbName() + "." + table.getTableName());
+        }
+      }
+    }
+
+    table.setWriteId(writeId);
+    table.setParameters(newParams);
+    sharedCache.alterTableInCache(catName, dbName, tblName, table);
+    sharedCache.updateTableColStatsInCache(catName, dbName, tblName, colStats.getStatsObj());
+  }
+
   @Override
   public Map<String, String> updateTableColumnStatistics(ColumnStatistics colStats,
       String validWriteIds, long writeId)
       throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
     Map<String, String> newParams = rawStore.updateTableColumnStatistics(
         colStats, validWriteIds, writeId);
-    if (newParams != null) {
-      String catName = colStats.getStatsDesc().isSetCatName() ?
-          normalizeIdentifier(colStats.getStatsDesc().getCatName()) :
-          getDefaultCatalog(conf);
-      String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName());
-      String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName());
-      if (!shouldCacheTable(catName, dbName, tblName)) {
-        return newParams;
-      }
-      Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
-      if (table == null) {
-        // The table is not yet loaded in cache
-        return newParams;
-      }
-      table.setParameters(newParams);
-      sharedCache.alterTableInCache(catName, dbName, tblName, table);
-      sharedCache.updateTableColStatsInCache(catName, dbName, tblName, colStats.getStatsObj());
+    // in case of event based cache update, cache will be updated during commit.
+    if (newParams != null && !canUseEvents) {
+      updateTableColumnsStatsInternal(conf, colStats, newParams, null, writeId);
     }
     return newParams;
   }
@@ -1765,7 +2122,8 @@ public class CachedStore implements RawStore, Configurable {
                                              String colName)
       throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
     boolean succ = rawStore.deleteTableColumnStatistics(catName, dbName, tblName, colName);
-    if (succ) {
+    // in case of event based cache update, cache is updated during commit txn
+    if (succ && !canUseEvents) {
       catName = normalizeIdentifier(catName);
       dbName = normalizeIdentifier(dbName);
       tblName = normalizeIdentifier(tblName);
@@ -1783,7 +2141,8 @@ public class CachedStore implements RawStore, Configurable {
       throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
     Map<String, String> newParams = rawStore.updatePartitionColumnStatistics(
         colStats, partVals, validWriteIds, writeId);
-    if (newParams != null) {
+    // in case of event based cache update, cache is updated during commit txn
+    if (newParams != null && !canUseEvents) {
       String catName = colStats.getStatsDesc().isSetCatName() ?
           normalizeIdentifier(colStats.getStatsDesc().getCatName()) : DEFAULT_CATALOG_NAME;
       String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName());
@@ -1822,7 +2181,8 @@ public class CachedStore implements RawStore, Configurable {
       throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
     boolean succ =
         rawStore.deletePartitionColumnStatistics(catName, dbName, tblName, partName, partVals, colName);
-    if (succ) {
+    // in case of event based cache update, cache is updated during commit txn.
+    if (succ && !canUseEvents) {
       catName = normalizeIdentifier(catName);
       dbName = normalizeIdentifier(dbName);
       tblName = normalizeIdentifier(tblName);
@@ -1851,8 +2211,9 @@ public class CachedStore implements RawStore, Configurable {
     tblName = StringUtils.normalizeIdentifier(tblName);
     // TODO: we currently cannot do transactional checks for stats here
     //       (incl. due to lack of sync w.r.t. the below rawStore call).
-    if (!shouldCacheTable(catName, dbName, tblName) || writeIdList != null) {
-      rawStore.get_aggr_stats_for(
+    // TODO: compute aggregates locally in CachedStore; for now event-based mode always delegates to rawStore.
+    if (!shouldCacheTable(catName, dbName, tblName) || writeIdList != null || canUseEvents) {
+      return rawStore.get_aggr_stats_for(
           catName, dbName, tblName, partNames, colNames, writeIdList);
     }
     Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -2246,6 +2607,10 @@ public class CachedStore implements RawStore, Configurable {
     // TODO constraintCache
     List<String> constraintNames = rawStore.createTableWithConstraints(tbl, primaryKeys,
         foreignKeys, uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints);
+    // in case of event based cache update, cache is updated during commit.
+    if (canUseEvents) {
+      return constraintNames;
+    }
     String dbName = normalizeIdentifier(tbl.getDbName());
     String tblName = normalizeIdentifier(tbl.getTableName());
     String catName = tbl.isSetCatName() ? normalizeIdentifier(tbl.getCatName()) :

http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
index c24e716..ce9e383 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
@@ -210,7 +210,7 @@ public class SharedCache {
       }
     }
 
-    boolean cachePartitions(List<Partition> parts, SharedCache sharedCache) {
+    boolean cachePartitions(Iterable<Partition> parts, SharedCache sharedCache) {
       try {
         tableLock.writeLock().lock();
         for (Partition part : parts) {
@@ -292,6 +292,9 @@ public class SharedCache {
         tableLock.writeLock().lock();
         PartitionWrapper wrapper =
             partitionCache.remove(CacheUtils.buildPartitionCacheKey(partVal));
+        if (wrapper == null) {
+          return null;
+        }
         isPartitionCacheDirty.set(true);
         part = CacheUtils.assemble(wrapper, sharedCache);
         if (wrapper.getSdHash() != null) {
@@ -1171,6 +1174,10 @@ public class SharedCache {
       }
       TableWrapper tblWrapper =
           tableCache.remove(CacheUtils.buildTableKey(catName, dbName, tblName));
+      if (tblWrapper == null) {
+        // Table is not in the cache (e.g. already removed by an earlier attempt of a retried call); nothing to do.
+        return;
+      }
       byte[] sdHash = tblWrapper.getSdHash();
       if (sdHash != null) {
         decrSd(sdHash);
@@ -1408,7 +1415,7 @@ public class SharedCache {
   }
 
   public void addPartitionsToCache(String catName, String dbName, String tblName,
-      List<Partition> parts) {
+      Iterable<Partition> parts) {
     try {
       cacheLock.readLock().lock();
       TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));

http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeletePartitionColumnStatEvent.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeletePartitionColumnStatEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeletePartitionColumnStatEvent.java
new file mode 100644
index 0000000..d64b57d
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeletePartitionColumnStatEvent.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.events;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+
+import java.util.List;
+
+/**
+ * DeletePartitionColumnStatEvent
+ * Event fired when the statistics of a column are deleted for a partition of a table.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class DeletePartitionColumnStatEvent extends ListenerEvent {
+  private String catName, dbName, tableName, colName, partName;
+
+  private List<String> partVals;
+
+  /**
+   * @param catName catalog name
+   * @param dbName database name
+   * @param tableName table name
+   * @param partName partition name
+   * @param partVals partition values
+   * @param colName column name
+   * @param handler handler that is firing the event
+   */
+  public DeletePartitionColumnStatEvent(String catName, String dbName, String tableName, String partName,
+                                        List<String> partVals, String colName, IHMSHandler handler) {
+    super(true, handler);
+    this.catName = catName;
+    this.dbName = dbName;
+    this.tableName = tableName;
+    this.colName = colName;
+    this.partName = partName;
+    this.partVals = partVals;
+  }
+
+  public String getCatName() {
+    return catName;
+  }
+
+  public String getDBName() {
+    return dbName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public String getColName() {
+    return colName;
+  }
+
+  public String getPartName() {
+    return partName;
+  }
+
+  public List<String> getPartVals() {
+    return partVals;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeleteTableColumnStatEvent.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeleteTableColumnStatEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeleteTableColumnStatEvent.java
new file mode 100644
index 0000000..7638744
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeleteTableColumnStatEvent.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.events;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+
+/**
+ * DeleteTableColumnStatEvent
+ * Event fired when the statistics of a table column are deleted.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class DeleteTableColumnStatEvent extends ListenerEvent {
+  private String catName, dbName, tableName, colName;
+
+  /**
+   * @param catName catalog name
+   * @param dbName database name
+   * @param tableName table name
+   * @param colName column name
+   * @param handler handler that is firing the event
+   */
+  public DeleteTableColumnStatEvent(String catName, String dbName, String tableName, String colName, IHMSHandler handler) {
+    super(true, handler);
+    this.catName = catName;
+    this.dbName = dbName;
+    this.tableName = tableName;
+    this.colName = colName;
+  }
+
+  public String getCatName() {
+    return catName;
+  }
+
+  public String getDBName() {
+    return dbName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public String getColName() {
+    return colName;
+  }
+}