You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/01/30 13:20:30 UTC

[impala] 04/05: IMPALA-7970 : Add support for metastore event based automatic invalidate

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 389fb169d2782729aa48b820f3cd5bbfcb5f8dd8
Author: Vihang Karajgaonkar <vi...@cloudera.com>
AuthorDate: Thu Dec 20 15:55:04 2018 -0800

    IMPALA-7970 : Add support for metastore event based automatic invalidate
    
    This change adds support to CatalogD to poll metastore events to issue
    invalidate on tables automatically. It adds basic infrastructure to poll
    HMS notification events at a configurable frequency using a backend
    flag called hms_event_polling_interval_s. Currently, it issues an
    invalidate on tables when it receives alter events on tables and
    partitions. It also adds tables/databases and removes tables from
    catalogD when it receives create_table/create_database and
    drop_table/drop_database events. The default value of
    hms_event_polling_interval_s is 0 which disables the feature. A
    non-zero value in seconds of this configuration can be used to enable
    the feature and set the polling frequency.
    
    In order to process each event atomically, this feature relies on
    version lock in CatalogServiceCatalog. It adds new methods in
    CatalogServiceCatalog which takes a write lock on version so that
    readers are blocked until the catalog state is updated based on the
    events. In case of processing events, the metastore operation is already
    completed and only catalog state needs to be updated. Hence we do not
    need to make new metastore calls while processing the events and only
    version lock is sufficient to serialize updates to the catalog objects
    based on events. This locking protocol is similar to what is done in
    case of DDL processing in CatalogOpExecutor except it does not need to
    take metastoreDdlLock since no metastore operations are needed during
    event processing.
    
    The change also adds a new test class to test the basic functionality
    for each of the supported event types.
    
    Note that this feature is still a work in progress and additional
    improvements will be done in subsequent patches. By default the feature
    is turned off.
    
    Change-Id: Ic70b27780560b7ac9b33418d132b36cd0ca4abf7
    Reviewed-on: http://gerrit.cloudera.org:8080/12118
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/global-flags.cc                      |  14 +
 be/src/util/backend-gflag-util.cc                  |   2 +
 common/thrift/BackendGflags.thrift                 |   2 +
 .../impala/catalog/CatalogServiceCatalog.java      | 208 ++++-
 .../catalog/events/ExternalEventsProcessor.java    |  54 ++
 .../impala/catalog/events/MetastoreEventUtils.java | 697 +++++++++++++++
 .../catalog/events/MetastoreEventsProcessor.java   | 419 +++++++++
 .../events/MetastoreNotificationException.java     |  35 +
 .../MetastoreNotificationFetchException.java       |  35 +
 .../impala/catalog/events/NoOpEventProcessor.java  |  62 ++
 .../org/apache/impala/service/BackendConfig.java   |   4 +
 .../events/MetastoreEventsProcessorTest.java       | 977 +++++++++++++++++++++
 .../SynchronousHMSEventProcessorForTests.java      |  36 +
 .../resources/postgresql-hive-site.xml.template    |  11 +
 14 files changed, 2553 insertions(+), 3 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 0b83acd..6620bdc 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -242,6 +242,20 @@ DEFINE_bool(invalidate_tables_on_memory_pressure, false, "Configure catalogd to
     "invalidate_table_timeout_s. To enable this feature, a true flag must be applied to "
     "both catalogd and impalad.");
 
+DEFINE_int32(hms_event_polling_interval_s, 0,
+    "Configure catalogd to invalidate cached table metadata based on metastore events. "
+    "These metastore events could be generated by external systems like Apache Hive or "
+    "a different Impala cluster using the same Hive metastore server as this one. "
+    "A non-zero value of this flag sets the polling interval of catalogd in seconds to "
+    "fetch new metastore events. A value of zero disables this feature. When enabled, "
+    "this flag has the same effect as \"INVALIDATE METADATA\" statement on the table "
+    "for certain metastore event types. Additionally, in case of events which detect "
+    "creation or removal of objects from metastore, catalogd adds or removes such "
+    "objects from its cached metadata. This feature is independent of time and memory "
+    "based automatic invalidation of tables. Note that this is still an experimental "
+    "feature and not recommended to be deployed on production systems until it is "
+    "made generally available.");
+
 DEFINE_double_hidden(invalidate_tables_gc_old_gen_full_threshold, 0.6, "The threshold "
     "above which CatalogdTableInvalidator would consider the old generation to be almost "
     "full and trigger an invalidation on recently unused tables");
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index f78b317..b02c70a 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -70,6 +70,7 @@ DECLARE_int64(catalog_partial_fetch_rpc_queue_timeout_s);
 DECLARE_int64(exchg_node_buffer_size_bytes);
 DECLARE_int32(kudu_mutation_buffer_size);
 DECLARE_int32(kudu_error_buffer_size);
+DECLARE_int32(hms_event_polling_interval_s);
 
 namespace impala {
 
@@ -139,6 +140,7 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
       FLAGS_exchg_node_buffer_size_bytes);
   cfg.__set_kudu_mutation_buffer_size(FLAGS_kudu_mutation_buffer_size);
   cfg.__set_kudu_error_buffer_size(FLAGS_kudu_error_buffer_size);
+  cfg.__set_hms_event_polling_interval_s(FLAGS_hms_event_polling_interval_s);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index a56b260..fe724c2 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -113,4 +113,6 @@ struct TBackendGflags {
   43: required i32 kudu_mutation_buffer_size
 
   44: required i32 kudu_error_buffer_size
+
+  45: required i32 hms_event_polling_interval_s
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index d9aa2a9..d6d375f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -38,10 +38,14 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.impala.authorization.SentryConfig;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.catalog.events.ExternalEventsProcessor;
+import org.apache.impala.catalog.events.MetastoreEventsProcessor;
+import org.apache.impala.catalog.events.NoOpEventProcessor;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
@@ -225,6 +229,9 @@ public class CatalogServiceCatalog extends Catalog {
 
   private CatalogdTableInvalidator catalogdTableInvalidator_;
 
+  // Manages the event processing from metastore for issuing invalidates on tables
+  private ExternalEventsProcessor metastoreEventProcessor_;
+
   /**
    * See the gflag definition in be/.../catalog-server.cc for details on these modes.
    */
@@ -281,7 +288,39 @@ public class CatalogServiceCatalog extends Catalog {
         BackendConfig.INSTANCE.getBackendCfg().catalog_topic_mode.toUpperCase());
     catalogdTableInvalidator_ = CatalogdTableInvalidator.create(this,
         BackendConfig.INSTANCE);
+    metastoreEventProcessor_ = getEventsProcessor();
     Preconditions.checkState(PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S > 0);
+    // start polling for metastore events
+    metastoreEventProcessor_.start();
+  }
+
+  /**
+   * Returns a Metastore event processor object if
+   * <code>BackendConfig#getHMSPollingIntervalInSeconds</code> returns a positive
+   * value of polling interval. Otherwise, returns a no-op events processor. It is
+   * important to fetch the current notification event id at the Catalog service
+   * initialization time so that event processor starts to sync at the event id
+   * corresponding to the catalog start time.
+   */
+  private ExternalEventsProcessor getEventsProcessor() throws ImpalaException {
+    long eventPollingInterval = BackendConfig.INSTANCE.getHMSPollingIntervalInSeconds();
+    if (eventPollingInterval <= 0) {
+      LOG.info(String
+          .format("Metastore event processing is disabled. Event polling interval is %d",
+              eventPollingInterval));
+      return NoOpEventProcessor.getInstance();
+    }
+    try (MetaStoreClient metaStoreClient = getMetaStoreClient()) {
+      CurrentNotificationEventId currentNotificationId =
+          metaStoreClient.getHiveClient().getCurrentNotificationEventId();
+      return MetastoreEventsProcessor.getInstance(
+          this, currentNotificationId.getEventId(), eventPollingInterval);
+    } catch (TException e) {
+      LOG.error("Unable to fetch the current notification event id from metastore. "
+          + "Metastore event processor cannot be initialized.", e);
+      throw new CatalogException(
+          "Fatal error while initializing metastore event processor", e);
+    }
   }
 
   // Timeout for acquiring a table lock
@@ -1162,10 +1201,22 @@ public class CatalogServiceCatalog extends Catalog {
     refreshAuthorization(true, /*catalog objects added*/ new ArrayList<>(),
         /*catalog objects removed*/ new ArrayList<>());
 
+    // Even though we get the current notification event id before stopping the event
+    // processing here there is a small window of time where we could re-process some of
+    // the event ids, if there is external DDL activity on metastore during reset.
+    // Unfortunately, there is no good way to avoid this since HMS does not provide
+    // APIs which can fetch all the tables/databases at a given id. It is OKAY to
+    // re-process some of these events since event processor relies on creationTime of
+    // the objects to uniquely identify tables from create and drop events. In case of
+    // alter events, however it is likely that some tables would be unnecessarily
+    // invalidated. That would happen when during reset, there were external alter events
+    // and by the time we processed them, Catalog had already loaded them.
+    long currentEventId = metastoreEventProcessor_.getCurrentEventId();
+    // stop the event processing since the cache is anyways being cleared
+    metastoreEventProcessor_.stop();
     // Update the HDFS cache pools
     CachePoolReader reader = new CachePoolReader(true);
     reader.run();
-
     versionLock_.writeLock().lock();
     // In case of an empty new catalog, the version should still change to reflect the
     // reset operation itself and to unblock impalads by making the catalog version >
@@ -1217,6 +1268,8 @@ public class CatalogServiceCatalog extends Catalog {
       throw new CatalogException("Error initializing Catalog. Catalog may be empty.", e);
     } finally {
       versionLock_.writeLock().unlock();
+      // restart the event processing for id just before the reset
+      metastoreEventProcessor_.start(currentEventId);
     }
     LOG.info("Invalidated all metadata.");
     return currentCatalogVersion;
@@ -1239,6 +1292,25 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * Adds a database with the given name to the metadata cache if it does not exist
+   * already and returns true if a new Db object was added. Used by
+   * MetastoreEventProcessor to handle CREATE_DATABASE events
+   */
+  public boolean addDbIfNotExists(
+      String dbName, org.apache.hadoop.hive.metastore.api.Database msDb) {
+    versionLock_.writeLock().lock();
+    try {
+      // a database which is already cached is left untouched
+      if (getDb(dbName) != null) return false;
+      return addDb(dbName, msDb) != null;
+    } finally {
+      versionLock_.writeLock().unlock();
+    }
+  }
+
+  /**
    * Removes a database from the metadata cache and returns the removed database,
    * or null if the database did not exist in the cache.
    * Used by DROP DATABASE statements.
@@ -1278,6 +1350,31 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * Adds a table with the given db and table name to the catalog if it does not exist.
+   * The table is added as an uninitialized IncompleteTable with a new catalog version.
+   * @return true if the table was successfully added and false if the table already
+   * exists
+   * @throws CatalogException if the db is not found
+   */
+  public boolean addTableIfNotExists(String dbName, String tblName)
+      throws CatalogException {
+    versionLock_.writeLock().lock();
+    try {
+      Db parentDb = getDb(dbName);
+      if (parentDb == null) {
+        throw new CatalogException(String.format("Db %s does not exist", dbName));
+      }
+      // nothing to do if a table with this name is already present
+      if (parentDb.getTable(tblName) != null) return false;
+      Table placeholder = IncompleteTable.createUninitializedTable(parentDb, tblName);
+      placeholder.setCatalogVersion(incrementAndGetCatalogVersion());
+      parentDb.addTable(placeholder);
+      return true;
+    } finally {
+      versionLock_.writeLock().unlock();
+    }
+  }
+
+  /**
    * Adds a table with the given name to the catalog and returns the new table,
    * loading the metadata if needed.
    */
@@ -1359,6 +1456,50 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * Removes the catalog table corresponding to the given metastore table if it exists
+   * and either its createTime matches that of the metastore table or the cached table
+   * is an IncompleteTable
+   *
+   * @param msTable Metastore table used to look up the table to be removed
+   * @param tblWasfound is set to true if the table was found in the catalog
+   * @param tblMatched is set to true if the table was found and it is either an
+   * IncompleteTable or its cached metastore table has the same createTime as msTable
+   * @return Removed table object. Returns null if the table was not removed
+   */
+  public Table removeTableIfExists(org.apache.hadoop.hive.metastore.api.Table msTable,
+      Reference<Boolean> tblWasfound, Reference<Boolean> tblMatched) {
+    tblWasfound.setRef(false);
+    tblMatched.setRef(false);
+    // the createTime of the input table must be valid since it is used for matching
+    Preconditions.checkState(msTable.getCreateTime() > 0);
+    versionLock_.writeLock().lock();
+    try {
+      Db parentDb = getDb(msTable.getDbName());
+      Table cachedTbl =
+          parentDb == null ? null : parentDb.getTable(msTable.getTableName());
+      if (cachedTbl == null) return null;
+      tblWasfound.setRef(true);
+
+      // only remove the table if it is the same instance as the one in the event, which
+      // is determined by comparing the createTime. Incomplete tables are always removed
+      boolean sameInstance = cachedTbl instanceof IncompleteTable
+          || msTable.getCreateTime() == cachedTbl.getMetaStoreTable().getCreateTime();
+      if (!sameInstance) return null;
+
+      tblMatched.setRef(true);
+      Table removedTbl = parentDb.removeTable(cachedTbl.getName());
+      removedTbl.setCatalogVersion(incrementAndGetCatalogVersion());
+      deleteLog_.addRemovedObject(removedTbl.toMinimalTCatalogObject());
+      return removedTbl;
+    } finally {
+      versionLock_.writeLock().unlock();
+    }
+  }
+
+  /**
    * Removes a table from the catalog and increments the catalog version.
    * Returns the removed Table, or null if the table or db does not exist.
    */
@@ -1462,8 +1603,8 @@ public class CatalogServiceCatalog extends Catalog {
    * 3. T_old, null: Old table was removed but new table was not added.
    * 4. T_old, T_new: Old table was removed and new table was added.
    */
-  public Pair<Table, Table> renameTable(TTableName oldTableName, TTableName newTableName)
-      throws CatalogException {
+  public Pair<Table, Table> renameTable(
+      TTableName oldTableName, TTableName newTableName) {
     // Remove the old table name from the cache and add the new table.
     Db db = getDb(oldTableName.getDb_name());
     if (db == null) return null;
@@ -1480,6 +1621,36 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * Renames the table by removing the old table and adding the new table while holding
+   * the version write lock. If the old table is not found this operation becomes an
+   * add-new-table-if-not-exists operation.
+   *
+   * @return a pair of booleans. The first of the pair is set if the oldTableName was
+   *     found and removed. The second boolean is set if the new table didn't exist before
+   *     and hence was added.
+   */
+  public Pair<Boolean, Boolean> renameOrAddTableIfNotExists(TTableName oldTableName,
+      TTableName newTableName)
+      throws CatalogException {
+    versionLock_.writeLock().lock();
+    try {
+      // the old table can only be removed if its database is still cached
+      boolean oldTableRemoved = getDb(oldTableName.db_name) != null
+          && removeTable(oldTableName.db_name, oldTableName.table_name) != null;
+      // add the new tbl if it doesn't exist
+      boolean newTableAdded =
+          addTableIfNotExists(newTableName.db_name, newTableName.table_name);
+      return new Pair<>(oldTableRemoved, newTableAdded);
+    } finally {
+      versionLock_.writeLock().unlock();
+    }
+  }
+
+  /**
    * Reloads metadata for table 'tbl' which must not be an IncompleteTable. Updates the
    * table metadata in-place by calling load() on the given table. Returns the
    * TCatalogObject representing 'tbl'. Applies proper synchronization to protect the
@@ -1650,6 +1821,31 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * Invalidates the table if it exists by overwriting the existing entry with an
+   * IncompleteTable. Schedules a background load when loadInBackground_ is set.
+   * @return null if the table does not exist else return the invalidated table
+   */
+  public Table invalidateTableIfExists(String dbName, String tblName) {
+    Table incompleteTable;
+    versionLock_.writeLock().lock();
+    try {
+      Db db = getDb(dbName);
+      if (db == null || !db.containsTable(tblName)) return null;
+      incompleteTable = IncompleteTable.createUninitializedTable(db, tblName);
+      incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
+      db.addTable(incompleteTable);
+    } finally {
+      versionLock_.writeLock().unlock();
+    }
+    // the background load is requested after the version lock has been released
+    if (loadInBackground_) {
+      TTableName tableToLoad =
+          new TTableName(dbName.toLowerCase(), tblName.toLowerCase());
+      tableLoadingMgr_.backgroundLoad(tableToLoad);
+    }
+    return incompleteTable;
+  }
+
+  /**
    * Adds a new role with the given name and grant groups to the AuthorizationPolicy.
    * If a role with the same name already exists it will be overwritten.
    */
@@ -2295,4 +2491,10 @@ public class CatalogServiceCatalog extends Catalog {
   void setCatalogdTableInvalidator(CatalogdTableInvalidator cleaner) {
     catalogdTableInvalidator_ = cleaner;
   }
+
+  // Replaces the events processor used by this catalog. Intended for tests only.
+  @VisibleForTesting
+  public void setMetastoreEventProcessor(
+      ExternalEventsProcessor metastoreEventProcessor) {
+    metastoreEventProcessor_ = metastoreEventProcessor;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java
new file mode 100644
index 0000000..eda52b9
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.events;
+
+import org.apache.impala.catalog.CatalogException;
+
+/**
+ * Interface to process external events. Declares the lifecycle operations (start,
+ * stop, restart from a given event id) and the event processing entry point.
+ */
+public interface ExternalEventsProcessor {
+  /**
+   * Start the event processing. This could also be used to initialize the configuration
+   * like polling interval of the event processor
+   */
+  void start();
+
+  /**
+   * Get the current event id on metastore. Useful for restarting the event processing
+   * from a given event id
+   *
+   * @throws CatalogException presumably when the event id cannot be fetched; confirm
+   *     against the implementations
+   */
+  long getCurrentEventId() throws CatalogException;
+
+  /**
+   * Stop the event processing
+   */
+  void stop();
+
+  /**
+   * Starts the event processing from the given eventId. This method can be used to jump
+   * ahead in the event processing under certain cases where it is okay to skip certain
+   * events
+   *
+   * @param fromEventId event id from which the processing should start
+   */
+  void start(long fromEventId);
+
+  /**
+   * Implements the core logic of processing external events
+   */
+  void processEvents();
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventUtils.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventUtils.java
new file mode 100644
index 0000000..aba698e
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventUtils.java
@@ -0,0 +1,697 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.events;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONCreateDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONDropTableMessage;
+import org.apache.impala.analysis.TableName;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.Db;
+import org.apache.impala.catalog.Table;
+import org.apache.impala.common.Pair;
+import org.apache.impala.common.Reference;
+import org.apache.impala.thrift.TTableName;
+import org.apache.log4j.Logger;
+
+/**
+ * Util class which provides Metastore event objects for various event types. Also
+ * provides a MetastoreEventFactory to get or create the event instances for a given event
+ * type
+ */
+public class MetastoreEventUtils {
+
+  /**
+   * Metastore event types known to the event processor. Each constant carries the
+   * string value of the corresponding NotificationEvent.eventType. OTHER is used for
+   * every event type which has no dedicated constant.
+   */
+  public enum MetastoreEventType {
+    CREATE_TABLE("CREATE_TABLE"),
+    DROP_TABLE("DROP_TABLE"),
+    ALTER_TABLE("ALTER_TABLE"),
+    CREATE_DATABASE("CREATE_DATABASE"),
+    DROP_DATABASE("DROP_DATABASE"),
+    ALTER_DATABASE("ALTER_DATABASE"),
+    ADD_PARTITION("ADD_PARTITION"),
+    ALTER_PARTITION("ALTER_PARTITION"),
+    DROP_PARTITION("DROP_PARTITION"),
+    OTHER("OTHER");
+
+    // string value of the event type as given by metastore
+    private final String eventType_;
+
+    MetastoreEventType(String eventTypeName) {
+      eventType_ = eventTypeName;
+    }
+
+    @Override
+    public String toString() {
+      return eventType_;
+    }
+
+    /**
+     * Maps the string value of Metastore's NotificationEvent.eventType to a
+     * MetastoreEventType. The comparison ignores case. Returns OTHER when no
+     * MetastoreEventType matches
+     *
+     * @param eventType EventType value from the <code>NotificationEvent</code>
+     */
+    public static MetastoreEventType from(String eventType) {
+      for (MetastoreEventType candidate : values()) {
+        if (candidate.eventType_.equalsIgnoreCase(eventType)) return candidate;
+      }
+      return OTHER;
+    }
+  }
+
+  /**
+   * Factory class to create various MetastoreEvents.
+   */
+  public static class MetastoreEventFactory {
+    private static final Logger LOG = Logger.getLogger(MetastoreEventFactory.class);
+
+    // catalog service instance to be used for creating eventHandlers
+    private final CatalogServiceCatalog catalog_;
+
+    public MetastoreEventFactory(CatalogServiceCatalog catalog) {
+      this.catalog_ = Preconditions.checkNotNull(catalog);
+    }
+
+    /**
+     * Creates an instance of <code>MetastoreEvent</code> used to process a given
+     * event type. If the event type is unknown, returns an IgnoredEvent
+     */
+    private MetastoreEvent get(NotificationEvent event)
+        throws MetastoreNotificationException {
+      Preconditions.checkNotNull(event.getEventType());
+      MetastoreEventType metastoreEventType =
+          MetastoreEventType.from(event.getEventType());
+      switch (metastoreEventType) {
+        case CREATE_TABLE:
+          return new CreateTableEvent(catalog_, event);
+        case DROP_TABLE:
+          return new DropTableEvent(catalog_, event);
+        case ALTER_TABLE:
+          return new AlterTableEvent(catalog_, event);
+        case CREATE_DATABASE:
+          return new CreateDatabaseEvent(catalog_, event);
+        case DROP_DATABASE:
+          return new DropDatabaseEvent(catalog_, event);
+        case ADD_PARTITION:
+        case DROP_PARTITION:
+        case ALTER_PARTITION:
+          // all the partition events trigger invalidate table currently
+          return new TableInvalidatingEvent(catalog_, event);
+        case ALTER_DATABASE:
+          // alter database events are currently ignored
+        default:
+          // ignore all the unknown events by creating a IgnoredEvent
+          return new IgnoredEvent(catalog_, event);
+      }
+    }
+
+    /**
+     * Given a list of notification events, returns a list of <code>MetastoreEvent</code>
+     * In case there are create events which are followed by drop events for the same
+     * object, the create events are filtered out. The drop events do not need to be
+     * filtered out
+     *
+     * This is needed to avoid the replay problem. For example, if catalog created and
+     * removed a table, the create event received will try to add the object again.
+     * This table will be visible until the drop table event is processed. This can be
+     * avoided by "looking ahead" in the event stream to see if the table with the same
+     * name was dropped. In such a case, the create event can be ignored
+     *
+     * @param events NotificationEvents fetched from metastore
+     * @return A list of MetastoreEvents corresponding to the given NotificationEvents
+     * @throws MetastoreNotificationException if a NotificationEvent could not be
+     * parsed into MetastoreEvent
+     */
+    List<MetastoreEvent> getFilteredEvents(List<NotificationEvent> events)
+        throws MetastoreNotificationException {
+      Preconditions.checkNotNull(events);
+      List<MetastoreEvent> metastoreEvents = new ArrayList<>(events.size());
+      for (NotificationEvent event : events) {
+        metastoreEvents.add(get(event));
+      }
+      Iterator<MetastoreEvent> it = metastoreEvents.iterator();
+      // filter out the create events which has a corresponding drop event later.
+      // currentIndex is the position of 'current' in the (possibly shrunk) list
+      int currentIndex = 0;
+      int numFilteredEvents = 0;
+      int inputSize = metastoreEvents.size();
+      while (it.hasNext()) {
+        MetastoreEvent current = it.next();
+        if (current.isRemovedAfter(
+            metastoreEvents.subList(currentIndex + 1, metastoreEvents.size()))) {
+          LOG.info(current.debugString("Filtering out this event since the object is "
+              + "either removed or renamed later in the event stream"));
+          it.remove();
+          numFilteredEvents++;
+        } else {
+          // Only advance when the event is kept. After a removal the next event
+          // slides into currentIndex; advancing would make its look-ahead window
+          // skip the event which immediately follows it
+          currentIndex++;
+        }
+      }
+      LOG.info(String.format("Total number of events received: %d Total number of events "
+          + "filtered out: %d", inputSize, numFilteredEvents));
+      return metastoreEvents;
+    }
+  }
+
+  /**
+   * Abstract base class for all MetastoreEvents. A MetastoreEvent is an object used to
+   * process a NotificationEvent received from metastore. It holds the catalog instance
+   * and the NotificationEvent needed to take action on catalog for that event
+   */
+  public static abstract class MetastoreEvent {
+
+    // CatalogServiceCatalog instance on which the event needs to be acted upon
+    protected final CatalogServiceCatalog catalog_;
+
+    // the notification received from metastore which is processed by this
+    protected final NotificationEvent event_;
+
+    // Logger available for all the sub-classes
+    protected final Logger LOG = Logger.getLogger(this.getClass());
+
+    // dbName from the event
+    protected final String dbName_;
+
+    // eventId of the event. Used instead of calling getter on event_ everytime
+    protected final long eventId_;
+
+    // eventType from the NotificationEvent
+    protected final MetastoreEventType eventType_;
+
+    // refers to the same NotificationEvent object as event_
+    protected final NotificationEvent metastoreNotificationEvent_;
+
+    MetastoreEvent(CatalogServiceCatalog catalogServiceCatalog, NotificationEvent event) {
+      catalog_ = catalogServiceCatalog;
+      event_ = event;
+      metastoreNotificationEvent_ = event;
+      eventId_ = event.getEventId();
+      eventType_ = MetastoreEventType.from(event.getEventType());
+      String creationMsg = String.format("Creating event %d of type %s on table %s",
+          event.getEventId(), event.getEventType(), event.getTableName());
+      LOG.debug(creationMsg);
+      dbName_ = Preconditions.checkNotNull(event.getDbName());
+    }
+
+    /**
+     * Process the information available in the NotificationEvent to take appropriate
+     * action on Catalog
+     *
+     * @throws MetastoreNotificationException in case of event parsing errors
+     * @throws CatalogException in case catalog operations could not be performed
+     */
+    abstract void process() throws MetastoreNotificationException, CatalogException;
+
+    /**
+     * Helper method to get a debug string with the event id and event type prepended to
+     * the message
+     *
+     * @param msgFormatString String value to be used in String.format() for the given
+     *     message
+     * @param args args to the <code>String.format()</code> for the given
+     *     msgFormatString
+     */
+    protected String debugString(String msgFormatString, Object... args) {
+      // the first two format arguments are the event id and type of the prefix
+      Object[] formatArgs = new Object[args.length + 2];
+      formatArgs[0] = eventId_;
+      formatArgs[1] = eventType_;
+      System.arraycopy(args, 0, formatArgs, 2, args.length);
+      return String.format("EventId: %d EventType: %s " + msgFormatString, formatArgs);
+    }
+
+    /**
+     * Search for an inverse event (for example drop_table is an inverse event for
+     * create_table) for this event in the given list of events which follow this
+     * event. This is useful for skipping certain events from processing. The base
+     * implementation always returns false
+     *
+     * @param events List of MetastoreEvents to be searched
+     * @return true if the object is removed after this event, else false
+     */
+    protected boolean isRemovedAfter(List<MetastoreEvent> events) {
+      return false;
+    }
+  }
+
+  /**
+   * Base class for all the table events
+   */
+  public static abstract class MetastoreTableEvent extends MetastoreEvent {
+
+    // tblName from the event
+    protected final String tblName_;
+
+    private MetastoreTableEvent(CatalogServiceCatalog catalogServiceCatalog,
+        NotificationEvent event) {
+      super(catalogServiceCatalog, event);
+      tblName_ = Preconditions.checkNotNull(event.getTableName());
+    }
+
+
+    /**
+     * Util method to return the fully qualified table name which is of the format
+     * dbName.tblName for this event
+     */
+    protected String getFullyQualifiedTblName() {
+      return new TableName(dbName_, tblName_).toString();
+    }
+
+    /**
+     * Util method to issue invalidate on a given table on the catalog. This method
+     * atomically invalidates the table if it exists in the catalog. No-op if the table
+     * does not exist
+     */
+    protected boolean invalidateCatalogTable() {
+      return catalog_.invalidateTableIfExists(dbName_, tblName_) != null;
+    }
+  }
+
+  /**
+   * Common parent of all events which target a database. It does not add any state of
+   * its own; the database name is already kept by MetastoreEvent.
+   */
+  public static abstract class MetastoreDatabaseEvent extends MetastoreEvent {
+
+    MetastoreDatabaseEvent(CatalogServiceCatalog catalog, NotificationEvent dbEvent) {
+      super(catalog, dbEvent);
+    }
+  }
+
+  /**
+   * MetastoreEvent for CREATE_TABLE event type
+   */
+  private static class CreateTableEvent extends MetastoreTableEvent {
+
+    /**
+     * Prevent instantiation from outside should use MetastoreEventFactory instead
+     */
+    private CreateTableEvent(CatalogServiceCatalog catalog, NotificationEvent event) {
+      super(catalog, event);
+      Preconditions.checkArgument(MetastoreEventType.CREATE_TABLE.equals(eventType_));
+    }
+
+    /**
+     * Adds the table of this event to the catalog unless the catalog already has a
+     * table with that name, in which case the event is ignored. The table can already
+     * exist when it was dropped and recreated with the same name, or when this event
+     * is a self-event (see description of self-event in the class documentation of
+     * MetastoreEventsProcessor).
+     *
+     * @throws MetastoreNotificationException if the catalog fails to add the table
+     */
+    @Override
+    public void process() throws MetastoreNotificationException {
+      final boolean added;
+      try {
+        added = catalog_.addTableIfNotExists(dbName_, tblName_);
+      } catch (CatalogException e) {
+        // a failure here is attributed to the database missing from the catalog
+        // cache, which could only happen if the previous create_database event for
+        // this table errored out
+        throw new MetastoreNotificationException(debugString(
+            "Unable to add table while processing for table %s because the "
+                + "database doesn't exist. This could be due to a previous error while "
+                + "processing CREATE_DATABASE event for the database %s",
+            getFullyQualifiedTblName(), dbName_), e);
+      }
+      if (added) {
+        LOG.info(debugString("Added a table %s", getFullyQualifiedTblName()));
+        return;
+      }
+      LOG.debug(
+          debugString("Not adding the table %s since it already exists in catalog",
+              tblName_));
+    }
+
+    /**
+     * Reports whether one of the given events drops the table created by this event
+     * or renames it to something else.
+     *
+     * @param events events to be searched, must not be null
+     * @return true on the first matching DROP_TABLE or rename event, else false
+     */
+    @Override
+    public boolean isRemovedAfter(List<MetastoreEvent> events) {
+      Preconditions.checkNotNull(events);
+      for (MetastoreEvent laterEvent : events) {
+        if (laterEvent.eventType_.equals(MetastoreEventType.DROP_TABLE)) {
+          DropTableEvent drop = (DropTableEvent) laterEvent;
+          if (isSameTable(drop.dbName_, drop.tblName_)) {
+            LOG.info(debugString("Found table %s is removed later in event %d type %s",
+                tblName_, drop.eventId_, drop.eventType_));
+            return true;
+          }
+          continue;
+        }
+        if (!laterEvent.eventType_.equals(MetastoreEventType.ALTER_TABLE)) continue;
+        // renames are implemented as a atomic (drop+create) so rename events can
+        // also be treated as a inverse event of the create_table event. Consider a
+        // DDL op sequence like create table, alter table rename from impala. Since
+        // the rename operation is internally implemented as drop+add, processing a
+        // create table event on this cluster will show up the table for small window
+        // of time, until the actual rename event is processed. If however, we ignore
+        // the create table event, the alter rename event just becomes a addIfNotExists
+        // event which is valid for both a self-event and external event cases
+        AlterTableEvent alter = (AlterTableEvent) laterEvent;
+        if (alter.isRename_ && isSameTable(alter.tableBefore_.getDbName(),
+            alter.tableBefore_.getTableName())) {
+          LOG.info(debugString("Found table %s is renamed later in event %d type %s",
+              tblName_, alter.eventId_, alter.eventType_));
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /**
+     * Case-insensitive comparison of the given names with the table of this event
+     */
+    private boolean isSameTable(String otherDbName, String otherTblName) {
+      return dbName_.equalsIgnoreCase(otherDbName)
+          && tblName_.equalsIgnoreCase(otherTblName);
+    }
+  }
+
+  /**
+   * MetastoreEvent for ALTER_TABLE event type
+   */
+  private static class AlterTableEvent extends MetastoreTableEvent {
+
+    // the table object before alter operation, as parsed from the NotificationEvent
+    private final org.apache.hadoop.hive.metastore.api.Table tableBefore_;
+    // the table object after alter operation, as parsed from the NotificationEvent
+    private final org.apache.hadoop.hive.metastore.api.Table tableAfter_;
+    // true if this alter event was due to a rename operation
+    private final boolean isRename_;
+
+    /**
+     * Prevent instantiation from outside should use MetastoreEventFactory instead
+     *
+     * @throws MetastoreNotificationException if the event message cannot be
+     *     deserialized or does not carry both the before and the after table object
+     */
+    private AlterTableEvent(CatalogServiceCatalog catalog, NotificationEvent event)
+        throws MetastoreNotificationException {
+      super(catalog, event);
+      Preconditions.checkArgument(MetastoreEventType.ALTER_TABLE.equals(eventType_));
+      try {
+        // the deserialization is done inside the try block so that a malformed
+        // message is reported as a MetastoreNotificationException with event details
+        // instead of escaping as a raw runtime exception
+        JSONAlterTableMessage alterTableMessage =
+            (JSONAlterTableMessage) MetastoreEventsProcessor.getMessageFactory()
+                .getDeserializer().getAlterTableMessage(event.getMessage());
+        tableBefore_ = Preconditions.checkNotNull(alterTableMessage.getTableObjBefore());
+        tableAfter_ = Preconditions.checkNotNull(alterTableMessage.getTableObjAfter());
+      } catch (Exception e) {
+        throw new MetastoreNotificationException(
+            debugString("Unable to parse the alter table message"), e);
+      }
+      // this is a rename event if either dbName or tblName of before and after object
+      // changed
+      isRename_ = !tableBefore_.getDbName().equalsIgnoreCase(tableAfter_.getDbName())
+          || !tableBefore_.getTableName().equalsIgnoreCase(tableAfter_.getTableName());
+    }
+
+    /**
+     * If the ALTER_TABLE event is due a table rename, this method removes the old table
+     * and creates a new table with the new name. Else, this just issues a invalidate
+     * table on the tblName from the event.
+     * TODO Check if we can rename the existing table in-place
+     *
+     * @throws MetastoreNotificationException if the catalog update fails. The
+     *     exception message carries the event id, event type and table name and the
+     *     original failure is kept as the cause
+     */
+    @Override
+    public void process() throws MetastoreNotificationException {
+      // in case of table level alters from external systems it is better to do a full
+      // invalidate  eg. this could be due to as simple as adding a new parameter or a
+      // full blown adding  or changing column type
+      // detect the special where a table is renamed
+      try {
+        if (!isRename_) {
+          // table is not renamed, need to invalidate
+          if (!invalidateCatalogTable()) {
+            LOG.debug(debugString("Table %s does not need to be "
+                    + "invalidated since it does not exist anymore",
+                getFullyQualifiedTblName()));
+          } else {
+            LOG.info(debugString("Table %s is invalidated", getFullyQualifiedTblName()));
+          }
+          return;
+        }
+        // table was renamed, remove the old table
+        LOG.info(debugString("Found that %s table was renamed. Renaming it by "
+                + "remove and adding a new table", new TableName(tableBefore_.getDbName(),
+            tableBefore_.getTableName())));
+        TTableName oldTTableName =
+            new TTableName(tableBefore_.getDbName(), tableBefore_.getTableName());
+        TTableName newTTableName =
+            new TTableName(tableAfter_.getDbName(), tableAfter_.getTableName());
+
+        // atomically rename the old table to new table
+        Pair<Boolean, Boolean> result =
+            catalog_.renameOrAddTableIfNotExists(oldTTableName, newTTableName);
+
+        // old table was not found. This could be because catalogD is stale and didn't
+        // have any entry for the oldTable
+        if (!result.first) {
+          LOG.debug(debugString("Did not remove old table to rename table %s to %s since "
+                  + "it does not exist anymore", qualify(oldTTableName),
+              qualify(newTTableName)));
+        }
+        // the new table from the event was not added since it was already present
+        if (!result.second) {
+          LOG.debug(
+              debugString("Did not add new table name while renaming table %s to %s",
+                  qualify(oldTTableName), qualify(newTTableName)));
+        }
+      } catch (Exception e) {
+        // keep the event context in the message; a bare wrapper makes it impossible
+        // to tell from the logs which event failed
+        throw new MetastoreNotificationException(debugString(
+            "Unable to process the alter table event for table %s",
+            getFullyQualifiedTblName()), e);
+      }
+    }
+
+    /**
+     * Returns the string form of the TableName built from the given thrift table name
+     */
+    private String qualify(TTableName tTableName) {
+      return new TableName(tTableName.db_name, tTableName.table_name).toString();
+    }
+  }
+
+  /**
+   * MetastoreEvent for the DROP_TABLE event type
+   */
+  private static class DropTableEvent extends MetastoreTableEvent {
+
+    // the metastore table object as parsed from the drop table event
+    private final org.apache.hadoop.hive.metastore.api.Table droppedTable_;
+
+    /**
+     * Prevent instantiation from outside should use MetastoreEventFactory instead
+     */
+    private DropTableEvent(CatalogServiceCatalog catalog, NotificationEvent event)
+        throws MetastoreNotificationException {
+      super(catalog, event);
+      Preconditions.checkArgument(MetastoreEventType.DROP_TABLE.equals(eventType_));
+      JSONDropTableMessage dropTableMessage =
+          (JSONDropTableMessage) MetastoreEventsProcessor.getMessageFactory()
+              .getDeserializer().getDropTableMessage(event.getMessage());
+      try {
+        droppedTable_ = Preconditions.checkNotNull(dropTableMessage.getTableObj());
+      } catch (Exception e) {
+        throw new MetastoreNotificationException(debugString(
+            "Could not parse event message. "
+                + "Check if %s is set to true in metastore configuration",
+            MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY), e);
+      }
+    }
+
+    /**
+     * Process the drop table event type. If the table from the event doesn't exist in the
+     * catalog, ignore the event. If the table exists in the catalog, compares the
+     * createTime of the table in catalog with the createTime of the table from the event
+     * and remove the catalog table if there is a match. If the catalog table is a
+     * incomplete table it is removed as well.
+     */
+    @Override
+    public void process() {
+      Reference<Boolean> tblWasFound = new Reference<>();
+      Reference<Boolean> tblMatched = new Reference<>();
+      Table removedTable =
+          catalog_.removeTableIfExists(droppedTable_, tblWasFound, tblMatched);
+      if (removedTable != null) {
+        LOG.info(debugString("Removed table %s ", getFullyQualifiedTblName()));
+      } else if (!tblMatched.getRef()) {
+        LOG.warn(debugString("Table %s was not removed from "
+            + "catalog since the creation time of the table did not match", tblName_));
+      } else if (!tblWasFound.getRef()) {
+        LOG.debug(
+            debugString("Table %s was not removed since it did not exist in catalog.",
+                tblName_));
+      }
+    }
+  }
+
+  /**
+   * MetastoreEvent for CREATE_DATABASE event type
+   */
+  private static class CreateDatabaseEvent extends MetastoreDatabaseEvent {
+
+    // metastore database object as parsed from NotificationEvent message
+    private final Database createdDatabase_;
+
+    /**
+     * Prevent instantiation from outside should use MetastoreEventFactory instead
+     */
+    private CreateDatabaseEvent(CatalogServiceCatalog catalog, NotificationEvent event)
+        throws MetastoreNotificationException {
+      super(catalog, event);
+      Preconditions.checkArgument(MetastoreEventType.CREATE_DATABASE.equals(eventType_));
+      JSONCreateDatabaseMessage createDatabaseMessage =
+          (JSONCreateDatabaseMessage) MetastoreEventsProcessor.getMessageFactory()
+              .getDeserializer().getCreateDatabaseMessage(event.getMessage());
+      try {
+        createdDatabase_ =
+            Preconditions.checkNotNull(createDatabaseMessage.getDatabaseObject());
+      } catch (Exception e) {
+        throw new MetastoreNotificationException(debugString(
+            "Database object is null in the event. "
+                + "This could be a metastore configuration problem. "
+                + "Check if %s is set to true in metastore configuration",
+            MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY), e);
+      }
+    }
+
+    /**
+     * Processes the create database event by adding the Db object from the event if it
+     * does not exist in the catalog already. TODO we should compare the creationTime of
+     * the Database in catalog with the Database in the event to make sure we are ignoring
+     * only catalog has the latest Database object. This will be added after HIVE-21077 is
+     * fixed and available
+     */
+    @Override
+    public void process() {
+      // if the database already exists in catalog, by definition, it is a later version
+      // of the database since metastore will not allow it be created if it was already
+      // existing at the time of creation. In such case, it is safe to assume that the
+      // already existing database in catalog is a later version with the same name and
+      // this event can be ignored
+      if (catalog_.addDbIfNotExists(dbName_, createdDatabase_)) {
+        LOG.info(debugString("Successfully added database %s", dbName_));
+      } else {
+        LOG.info(debugString("Database %s already exists", dbName_));
+      }
+    }
+
+    @Override
+    public boolean isRemovedAfter(List<MetastoreEvent> events) {
+      Preconditions.checkNotNull(events);
+      for (MetastoreEvent event : events) {
+        if (event.eventType_.equals(MetastoreEventType.DROP_DATABASE)) {
+          DropDatabaseEvent dropDatabaseEvent = (DropDatabaseEvent) event;
+          if (dbName_.equalsIgnoreCase(dropDatabaseEvent.dbName_)) {
+            LOG.info(debugString(
+                "Found database %s is removed later in event %d of " + "type %s ",
+                dbName_, dropDatabaseEvent.eventId_, dropDatabaseEvent.eventType_));
+            return true;
+          }
+        }
+      }
+      return false;
+    }
+  }
+
+  /**
+   * MetastoreEvent for the DROP_DATABASE event
+   */
+  private static class DropDatabaseEvent extends MetastoreDatabaseEvent {
+
+    /**
+     * Prevent instantiation from outside should use MetastoreEventFactory instead
+     */
+    private DropDatabaseEvent(CatalogServiceCatalog catalog, NotificationEvent dbEvent) {
+      super(catalog, dbEvent);
+    }
+
+    /**
+     * Removes the database named in the event from the catalog and logs the removal.
+     * Nothing is logged when the catalog had no such database. TODO Once we have
+     * HIVE-21077 we should compare creationTime to make sure that catalog's Db matches
+     * with the database object in the event
+     */
+    @Override
+    public void process() {
+      // TODO this does not currently handle the case where the was a new instance
+      // of database with the same name created in catalog after this database instance
+      // was removed. For instance, user does a CREATE db, drop db and create db again
+      // with the same dbName. In this case, the drop database event will remove the
+      // database instance which is created after this create event. We should add a
+      // check to compare the creation time of the database with the creation time in
+      // the event to make sure we are removing the right databases object. Unfortunately,
+      // database do not have creation time currently. This would be fixed in HIVE-21077
+      Db removed = catalog_.removeDb(dbName_);
+      // if database did not exist in the cache there was nothing to do
+      if (removed == null) return;
+      LOG.info(debugString("Successfully removed database %s", dbName_));
+    }
+  }
+
+  /**
+   * MetastoreEvent for which issues invalidate on a table from the event
+   */
+  private static class TableInvalidatingEvent extends MetastoreTableEvent {
+
+    /**
+     * Prevent instantiation from outside should use MetastoreEventFactory instead
+     */
+    private TableInvalidatingEvent(CatalogServiceCatalog catalog,
+        NotificationEvent event) {
+      super(catalog, event);
+    }
+
+    /**
+     * Issues a invalidate table on the catalog on the table from the event. This
+     * invalidate does not fetch information from metastore unlike the invalidate metadata
+     * command since event is triggered post-metastore activity. This handler invalidates
+     * by atomically removing existing loaded table and replacing it with a
+     * IncompleteTable. If the table doesn't exist in catalog this operation is a no-op
+     */
+    @Override
+    public void process() {
+      if (invalidateCatalogTable()) {
+        LOG.info(debugString("Table %s is invalidated", getFullyQualifiedTblName()));
+      } else {
+        LOG.debug(debugString("Table %s does not need to be invalidated since "
+            + "it does not exist anymore", getFullyQualifiedTblName()));
+      }
+    }
+  }
+
+  /**
+   * An event type which is ignored. Useful for unsupported metastore event types
+   */
+  private static class IgnoredEvent extends MetastoreEvent {
+
+    /**
+     * Prevent instantiation from outside should use MetastoreEventFactory instead
+     */
+    private IgnoredEvent(CatalogServiceCatalog catalog, NotificationEvent event) {
+      super(catalog, event);
+    }
+
+    @Override
+    public void process() {
+      LOG.debug(debugString("Ignored"));
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
new file mode 100644
index 0000000..cca29bb
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.events;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.json.ExtendedJSONMessageFactory;
+import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec.META;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.catalog.events.MetastoreEventUtils.MetastoreEvent;
+import org.apache.impala.catalog.events.MetastoreEventUtils.MetastoreEventFactory;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+
+/**
+ * A metastore event is a instance of the class
+ * <code>org.apache.hadoop.hive.metastore.api.NotificationEvent</code>. Metastore can be
+ * configured, to work with Listeners which are called on various DDL operations like
+ * create/alter/drop operations on database, table, partition etc. Each event has a unique
+ * incremental id and the generated events can be fetched from Metastore to get
+ * incremental updates to the metadata stored in Hive metastore using the public API
+ * <code>get_next_notification</code>. These events could be generated by external
+ * Metastore clients like Apache Hive or Apache Spark as well as other Impala clusters
+ * configured to talk with the same metastore.
+ *
+ * This class is used to poll metastore for such events at a given frequency. By observing
+ * such events, we can take appropriate action on the catalogD (invalidate/add/remove) so
+ * that catalog represents the latest information available in metastore. We keep track of
+ * the last synced event id in each polling iteration so the next batch can be requested
+ * appropriately. The current batch size is constant and set to
+ * EVENTS_BATCH_SIZE_PER_RPC.
+ *
+ * <pre>
+ *      +---------------+   +----------------+        +--------------+
+ *      |Catalog state  |   |Catalog         |        |              |
+ *      |stale          |   |State up-to-date|        |Catalog state |
+ *      |(apply event)  |   |(ignore)        |        |is newer than |
+ *      |               |   |                |        |event         |
+ *      |               |   |                |        |(ignore)      |
+ *      +------+--------+   +-----+----------+        +-+------------+
+ *             |                  |                     |
+ *             |                  |                     |
+ *             |                  |                     |
+ *             |                  |                     |
+ *             |                  |                     |
+ * +-----------V------------------V---------------------V----------->  Event Timeline
+ *                                ^
+ *                                |
+ *                                |
+ *                                |
+ *                                |
+ *                                E
+ *
+ * </pre>
+ * Consistency model: Events could be seen as DDLs operations from past either done from
+ * this same cluster or some other external system. For example in the events timeline
+ * given above, consider a Event E at any given time. The catalog state for the
+ * corresponding object of the event could either be stale, exactly-same or at a version
+ * which is higher than one provided by event. Catalog state should only be updated when
+ * it is stale with respect to the event. In order to determine if the catalog object is
+ * stale, we rely on a combination of creationTime and object version. A object in catalog
+ * is stale if and only if its creationTime is < creationTime of the object from event OR
+ * its version < version from event if createTime matches
+ *
+ * If the object has the same createTime and version when compared to event or if the
+ * createTime > createTime from the event, the event can be safely ignored.
+ *
+ * Following table shows the actions to be taken when the catalog state is stale.
+ *
+ * <pre>
+ *               +----------------------------------------+
+ *               |    Catalog object state                |
+ * +----------------------------+------------+------------+
+ * | Event type  | Loaded       | Incomplete | Not present|
+ * |             |              |            |            |
+ * +------------------------------------------------------+
+ * |             |              |            |            |
+ * | CREATE EVENT| removeAndAdd | Ignore     | Add        |
+ * |             |              |            |            |
+ * |             |              |            |            |
+ * | ALTER EVENT | Invalidate   | Ignore     | Ignore     |
+ * |             |              |            |            |
+ * |             |              |            |            |
+ * | DROP EVENT  | Remove       | Remove     | Ignore     |
+ * |             |              |            |            |
+ * +-------------+--------------+------------+------------+
+ * </pre>
+ *
+ * //TODO - Object version support is a work-in-progress in Hive (HIVE-21115). Current
+ * event handlers only rely on createTime on Table and Partition. Database createTime is a
+ * work-in-progress in Hive in (HIVE-20776)
+ *
+ * All the operations which change the state of catalog cache while processing a certain
+ * event type must be atomic in nature. We rely on taking a write lock on version object
+ * in CatalogServiceCatalog to make sure that readers are blocked while the metadata
+ * update operation is being performed. Since the events are generated post-metastore
+ * operations, such catalog updates do not need to update the state in Hive Metastore.
+ *
+ * Error Handling: The event processor could be in ACTIVE, STOPPED, ERROR states. In case
+ * of any errors while processing the events the state of event processor changes to ERROR
+ * and no subsequent events are polled. In such a case a invalidate metadata command
+ * restarts the event polling which updates the lastSyncedEventId to the latest from
+ * metastore.
+ */
+public class MetastoreEventsProcessor implements ExternalEventsProcessor {
+
+  // name of the metastore configuration key which event parsing error messages ask
+  // the user to check is set to true
+  public static final String HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY =
+      "hive.metastore.notifications.add.thrift.objects";
+  private static final Logger LOG = Logger.getLogger(MetastoreEventsProcessor.class);
+  // Use ExtendedJSONMessageFactory to deserialize the event messages.
+  // ExtendedJSONMessageFactory adds additional information over JSONMessageFactory so
+  // that events are compatible with Sentry
+  // TODO this should be moved to JSONMessageFactory when Sentry switches to
+  // JSONMessageFactory
+  private static final MessageFactory messageFactory =
+      ExtendedJSONMessageFactory.getInstance();
+
+  // NOTE(review): presumably a singleton handle for this processor; where it is
+  // assigned is not visible in this part of the file - confirm
+  private static MetastoreEventsProcessor instance;
+
+  // maximum number of events to poll in each RPC
+  private static final int EVENTS_BATCH_SIZE_PER_RPC = 1000;
+
+  // possible status of event processor
+  public enum EventProcessorStatus {
+    STOPPED, // event processor is instantiated but not yet scheduled
+    ACTIVE, // event processor is scheduled at a given frequency
+    ERROR // event processor is in error state and event processing has stopped
+  }
+
+  // current status of this event processor. Starts out as STOPPED and is changed by
+  // start() and stop()
+  private EventProcessorStatus eventProcessorStatus_ = EventProcessorStatus.STOPPED;
+
+  // event factory which is used to get or create MetastoreEvents
+  private final MetastoreEventFactory metastoreEventFactory_;
+
+  // keeps track of the last event id which we have synced to
+  private long lastSyncedEventId_;
+
+  // polling interval in seconds. Note this is a time we wait AFTER each fetch call
+  private final long pollingFrequencyInSec_;
+
+  // catalog service instance to be used while processing events
+  private final CatalogServiceCatalog catalog_;
+
+  // scheduler daemon thread executor for processing events at given frequency. It is
+  // backed by a single daemon thread named "MetastoreEventsProcessor"
+  private final ScheduledExecutorService scheduler_ = Executors
+      .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true)
+          .setNameFormat("MetastoreEventsProcessor").build());
+
+  /**
+   * Creates an event processor in STOPPED state. Nothing is scheduled until start()
+   * is called.
+   *
+   * @param catalog catalog on which the events are applied, must not be null
+   * @param startSyncFromId initial value of the last synced event id
+   * @param pollingFrequencyInSec polling interval in seconds, must be greater than 0
+   */
+  @VisibleForTesting
+  MetastoreEventsProcessor(CatalogServiceCatalog catalog, long startSyncFromId,
+      long pollingFrequencyInSec) {
+    // validate both inputs before any state is captured
+    Preconditions.checkState(pollingFrequencyInSec > 0);
+    this.catalog_ = Preconditions.checkNotNull(catalog);
+    this.pollingFrequencyInSec_ = pollingFrequencyInSec;
+    this.lastSyncedEventId_ = startSyncFromId;
+    this.metastoreEventFactory_ = new MetastoreEventFactory(this.catalog_);
+  }
+
+  /**
+   * Schedules the daemon thread at a given frequency and marks the event processor as
+   * ACTIVE. It is important to note that this method schedules with FixedDelay instead
+   * of FixedRate. The reason it is scheduled at a fixedDelay is to make sure that we
+   * don't pile up the pending tasks in case each polling operation is taking longer
+   * than the given frequency. Because of the fixed delay, the new poll operation is
+   * scheduled at the time when previousPoll operation completes + givenDelayInSec
+   *
+   * @throws IllegalStateException if the event processor is already ACTIVE
+   */
+  @Override
+  public synchronized void start() {
+    Preconditions.checkState(eventProcessorStatus_ != EventProcessorStatus.ACTIVE);
+    startScheduler();
+    eventProcessorStatus_ = EventProcessorStatus.ACTIVE;
+    // the two literals are concatenated, so the first one needs a trailing space to
+    // keep the sentences apart in the log line
+    LOG.info(String.format("Successfully started metastore event processing. "
+        + "Polling interval: %d seconds.", pollingFrequencyInSec_));
+  }
+
+  /**
+   * Gets the current event processor status. The status is read without taking the
+   * lock on this object.
+   * NOTE(review): whether concurrent updates are visible here depends on how
+   * eventProcessorStatus_ is declared, which is not visible from this method - confirm.
+   */
+  @VisibleForTesting
+  EventProcessorStatus getStatus() {
+    return eventProcessorStatus_;
+  }
+
+  /**
+   * Returns the current value of lastSyncedEventId_. The field is read without any
+   * synchronization, so this method is not thread-safe and is only to be used for
+   * testing purposes.
+   */
+  @VisibleForTesting
+  public long getLastSyncedEventId() {
+    return lastSyncedEventId_;
+  }
+
+  @VisibleForTesting
+  void startScheduler() {
+    Preconditions.checkState(pollingFrequencyInSec_ > 0);
+    LOG.info(String.format("Starting metastore event polling with interval %d seconds.",
+        pollingFrequencyInSec_));
+    scheduler_.scheduleWithFixedDelay(this::processEvents, pollingFrequencyInSec_,
+        pollingFrequencyInSec_, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Stops the event processing and changes the status of event processor to
+   * <code>EventProcessorStatus.STOPPED</code>. No new events will be processed as long
+   * the status is stopped. If this event processor is actively processing events when
+   * stop is called, this method blocks until the current processing is complete
+   */
+  @Override
+  public synchronized void stop() {
+    Preconditions.checkState(eventProcessorStatus_ != EventProcessorStatus.STOPPED);
+    eventProcessorStatus_ = EventProcessorStatus.STOPPED;
+    LOG.info(String.format("Event processing is stopped. Last synced event id is %d",
+        lastSyncedEventId_));
+  }
+
+  /**
+   * Get the current notification event id from metastore
+   *
+   * @return the event id returned by the metastore for its current notification event
+   * @throws CatalogException if the metastore call fails with a TException. The
+   *     TException is attached as the cause so that the underlying failure is not lost
+   */
+  @Override
+  public long getCurrentEventId() throws CatalogException {
+    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+      return metaStoreClient.getHiveClient().getCurrentNotificationEventId().getEventId();
+    } catch (TException e) {
+      // chain the original exception; the message alone does not say what went wrong
+      throw new CatalogException("Unable to fetch the current notification event id. "
+          + "Check if metastore service is accessible", e);
+    }
+  }
+
+  /**
+   * Starts the event processor from a given event id
+   */
+  @Override
+  public synchronized void start(long fromEventId) {
+    Preconditions.checkArgument(fromEventId >= 0);
+    Preconditions.checkState(eventProcessorStatus_ != EventProcessorStatus.ACTIVE,
+        "Event processing start called when it is already active");
+    long prevLastSyncedEventId = lastSyncedEventId_;
+    lastSyncedEventId_ = fromEventId;
+    eventProcessorStatus_ = EventProcessorStatus.ACTIVE;
+    LOG.info(String.format(
+        "Metastore event processing restarted. Last synced event id was updated "
+            + "from %d to %d", prevLastSyncedEventId, lastSyncedEventId_));
+  }
+
+  /**
+   * Fetches the next batch of NotificationEvents from metastore. The batch size which
+   * is requested per call is <code>EVENTS_BATCH_SIZE_PER_RPC</code>
+   *
+   * @return the events returned by the metastore for the last synced event id, or an
+   *     empty list if the current event id of the metastore is not greater than the last
+   *     synced event id
+   * @throws MetastoreNotificationFetchException if a metastore call fails
+   */
+  @VisibleForTesting
+  protected List<NotificationEvent> getNextMetastoreEvents()
+      throws MetastoreNotificationFetchException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      // Look at the current notification event id before asking for events. The
+      // polling interval is assumed to be small enough that most polls find nothing
+      // new, and fetching just the id is much faster (and cheaper on HMS side) than
+      // polling for events directly
+      long latestEventId =
+          msClient.getHiveClient().getCurrentNotificationEventId().getEventId();
+      boolean hasNewEvents = latestEventId > lastSyncedEventId_;
+      if (!hasNewEvents) {
+        // nothing was generated since we last polled
+        return Collections.emptyList();
+      }
+
+      NotificationEventResponse response = msClient.getHiveClient()
+          .getNextNotification(lastSyncedEventId_, EVENTS_BATCH_SIZE_PER_RPC, null);
+      String receivedMsg = String.format("Received %d events. Start event id : %d",
+          response.getEvents().size(), lastSyncedEventId_);
+      LOG.info(receivedMsg);
+      return response.getEvents();
+    } catch (TException e) {
+      String errorMsg =
+          "Unable to fetch notifications from metastore. Last synced event id is "
+              + lastSyncedEventId_;
+      throw new MetastoreNotificationFetchException(errorMsg, e);
+    }
+  }
+
+  /**
+   * This method issues a request to Hive Metastore if needed, based on the current event
+   * id in metastore and the last synced event_id. Events are fetched in fixed sized
+   * batches. Each NotificationEvent received is processed by its corresponding
+   * <code>MetastoreEvent</code>
+   *
+   * Nothing is fetched when the status is STOPPED or ERROR. If fetching or processing
+   * fails, the status is changed to ERROR and the failure is logged; the exception is
+   * not propagated to the caller.
+   */
+  @Override
+  public void processEvents() {
+    try {
+      EventProcessorStatus currentStatus = eventProcessorStatus_;
+      if (currentStatus == EventProcessorStatus.STOPPED
+          || currentStatus == EventProcessorStatus.ERROR) {
+        LOG.warn(String.format(
+            "Event processing is skipped since status is %s. Last synced event id is %d",
+            currentStatus, lastSyncedEventId_));
+        return;
+      }
+
+      List<NotificationEvent> events = getNextMetastoreEvents();
+      processEvents(events);
+    } catch (MetastoreNotificationFetchException ex) {
+      updateStatus(EventProcessorStatus.ERROR);
+      LOG.error("Unable to fetch the next batch of metastore events", ex);
+    } catch (MetastoreNotificationException | CatalogException ex) {
+      updateStatus(EventProcessorStatus.ERROR);
+      // The event which failed is not available here: processEvents(List) only hands
+      // back an event when it returns normally, so dereferencing its result in this
+      // handler always failed with a NullPointerException. lastSyncedEventId_ is only
+      // advanced after an event was processed without error, so it identifies the last
+      // good event. The exception is passed to the logger to keep its stack trace.
+      LOG.error(String.format(
+          "Unable to process notification events following event id %d due to %s. "
+              + "Event processing will be stopped", lastSyncedEventId_, ex.getMessage()),
+          ex);
+    }
+  }
+
+  /**
+   * Process the given list of notification events. Useful for tests which provide a list
+   * of events. The events are first filtered by the MetastoreEventFactory. Each of the
+   * remaining events is processed while holding the lock on this object and the last
+   * synced event id is advanced after each event which is processed without error.
+   * Processing ends early when the status is found to be anything other than ACTIVE.
+   *
+   * @return the last Notification event which was processed, or null if no event was
+   *     processed.
+   */
+  @VisibleForTesting
+  protected NotificationEvent processEvents(List<NotificationEvent> events)
+      throws MetastoreNotificationException, CatalogException {
+    List<MetastoreEvent> filteredEvents =
+        metastoreEventFactory_.getFilteredEvents(events);
+    NotificationEvent lastProcessedEvent = null;
+    for (MetastoreEvent event : filteredEvents) {
+      // synchronizing each event processing reduces the scope of the lock so that a
+      // potential stop() or start(long) during event processing is not blocked for
+      // longer than necessary
+      synchronized (this) {
+        // the status is re-checked for every event since it can change between events
+        if (eventProcessorStatus_ != EventProcessorStatus.ACTIVE) {
+          break;
+        }
+        lastProcessedEvent = event.metastoreNotificationEvent_;
+        event.process();
+        // only reached when process() did not throw
+        lastSyncedEventId_ = event.eventId_;
+      }
+    }
+    return lastProcessedEvent;
+  }
+
+  /**
+   * Updates the current status to the given status. The method synchronizes on this
+   * object, which is the same lock that is held while a single event is processed.
+   *
+   * @param toStatus the new status of the event processor
+   */
+  private synchronized void updateStatus(EventProcessorStatus toStatus) {
+    eventProcessorStatus_ = toStatus;
+  }
+
+  private void dumpEventInfoToLog(NotificationEvent event) {
+    StringBuilder msg =
+        new StringBuilder().append("Event id: ").append(event.getEventId()).append("\n")
+            .append("Event Type: ").append(event.getEventType()).append("\n")
+            .append("Event time: ").append(event.getEventTime()).append("\n")
+            .append("Database name: ").append(event.getDbName()).append("\n");
+    if (event.getTableName() != null) {
+      msg.append("Table name: ").append(event.getTableName()).append("\n");
+    }
+    msg.append("Event message: ").append(event.getMessage()).append("\n");
+    LOG.error(msg.toString());
+  }
+
+  /**
+   * Returns the singleton instance of this class, creating it on the first call. The
+   * instance should only be created during catalogD initialization time, so that the
+   * start syncId matches with the catalogD startup time. When the instance already
+   * exists the arguments of the call are not used.
+   *
+   * @param catalog the CatalogServiceCatalog instance to which this event processing
+   *     belongs
+   * @param startSyncFromId Start event id. Events will be polled starting from this
+   *     event id
+   * @param eventPollingInterval HMS polling interval in seconds
+   * @return the existing instance if it is already created, or a newly created one
+   *     otherwise
+   */
+  public static synchronized ExternalEventsProcessor getInstance(
+      CatalogServiceCatalog catalog, long startSyncFromId, long eventPollingInterval) {
+    if (instance == null) {
+      instance =
+          new MetastoreEventsProcessor(catalog, startSyncFromId, eventPollingInterval);
+    }
+    return instance;
+  }
+
+  /**
+   * Returns the MetastoreEventFactory which was created for this event processor in the
+   * constructor.
+   */
+  @VisibleForTesting
+  public MetastoreEventFactory getMetastoreEventFactory() {
+    return metastoreEventFactory_;
+  }
+
+  /**
+   * Returns the static MessageFactory instance held by this class.
+   */
+  public static MessageFactory getMessageFactory() {
+    return messageFactory;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreNotificationException.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreNotificationException.java
new file mode 100644
index 0000000..341e8aa
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreNotificationException.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.events;
+
+import org.apache.impala.common.ImpalaException;
+
+/**
+ * Utility exception class to be thrown for errors during event processing. It is a
+ * checked exception since it extends ImpalaException.
+ */
+public class MetastoreNotificationException extends ImpalaException {
+  private static final long serialVersionUID = -2493154165900437878L;
+
+  /**
+   * @param msg description of the error
+   * @param cause the underlying exception which caused this error
+   */
+  public MetastoreNotificationException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+
+  /**
+   * @param msg description of the error
+   */
+  public MetastoreNotificationException(String msg) { super(msg); }
+
+  /**
+   * @param e the underlying exception which is wrapped by this exception
+   */
+  public MetastoreNotificationException(Exception e) { super(e); }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreNotificationFetchException.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreNotificationFetchException.java
new file mode 100644
index 0000000..d249b20
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreNotificationFetchException.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.events;
+
+/**
+ * Exception to be thrown when notification events cannot be fetched from the
+ * metastore. It is a subclass of MetastoreNotificationException so that a fetch
+ * failure can be caught separately from other event processing errors.
+ */
+public class MetastoreNotificationFetchException extends MetastoreNotificationException {
+
+  private static final long serialVersionUID = -2965835338838695815L;
+
+  /**
+   * @param msg description of the error
+   * @param cause the underlying exception which caused the fetch to fail
+   */
+  public MetastoreNotificationFetchException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+
+  /**
+   * @param msg description of the error
+   */
+  public MetastoreNotificationFetchException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * @param e the underlying exception which is wrapped by this exception
+   */
+  public MetastoreNotificationFetchException(Exception e) {
+    super(e);
+  }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/NoOpEventProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/NoOpEventProcessor.java
new file mode 100644
index 0000000..44150d1
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/events/NoOpEventProcessor.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.events;
+
+/**
+ * A simple no-op events processor which does nothing. Used to plug in to the catalog
+ * when event processing is disabled so that we don't have to do a null check every
+ * time the event processor is called. The class holds no state other than its single
+ * shared instance.
+ */
+public class NoOpEventProcessor implements ExternalEventsProcessor {
+  // the only instance of this class, created when the class is initialized
+  private static final ExternalEventsProcessor INSTANCE = new NoOpEventProcessor();
+
+  /**
+   * Gets the instance of NoOpEventProcessor. Every call returns the same instance.
+   */
+  public static ExternalEventsProcessor getInstance() { return INSTANCE; }
+
+  private NoOpEventProcessor() {
+    // prevents instantiation from outside of this class
+  }
+
+  /**
+   * Does nothing.
+   */
+  @Override
+  public void start() {
+    // no-op
+  }
+
+  /**
+   * Always returns -1 as a dummy event id; the metastore is not contacted.
+   */
+  @Override
+  public long getCurrentEventId() {
+    // dummy event id
+    return -1;
+  }
+
+  /**
+   * Does nothing.
+   */
+  @Override
+  public void stop() {
+    // no-op
+  }
+
+  /**
+   * Does nothing; the given event id is ignored.
+   */
+  @Override
+  public void start(long fromEventId) {
+    // no-op
+  }
+
+  /**
+   * Does nothing.
+   */
+  @Override
+  public void processEvents() {
+    // no-op
+  }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 0e65903..a4a7b72 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -125,6 +125,10 @@ public class BackendConfig {
     return backendCfg_.catalog_partial_fetch_rpc_queue_timeout_s;
   }
 
+  /**
+   * Returns the value of the hms_event_polling_interval_s backend configuration, which
+   * is the interval in seconds at which metastore events are polled.
+   */
+  public long getHMSPollingIntervalInSeconds() {
+    return backendCfg_.hms_event_polling_interval_s;
+  }
+
   public boolean isOrcScannerEnabled() {
     return backendCfg_.enable_orc_scanner;
   }
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
new file mode 100644
index 0000000..bcbcd43
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -0,0 +1,977 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.events;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionFilterMode;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder;
+import org.apache.hadoop.hive.metastore.client.builder.PartitionBuilder;
+import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.DatabaseNotFoundException;
+import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.catalog.IncompleteTable;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.catalog.PrunablePartition;
+import org.apache.impala.catalog.Table;
+import org.apache.impala.catalog.events.MetastoreEventUtils.MetastoreEvent;
+import org.apache.impala.catalog.events.MetastoreEventUtils.MetastoreEventType;
+import org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.service.CatalogOpExecutor;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.thrift.TAlterTableOrViewRenameParams;
+import org.apache.impala.thrift.TAlterTableParams;
+import org.apache.impala.thrift.TAlterTableType;
+import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.TColumnType;
+import org.apache.impala.thrift.TCreateDbParams;
+import org.apache.impala.thrift.TCreateTableParams;
+import org.apache.impala.thrift.TDdlExecRequest;
+import org.apache.impala.thrift.TDdlType;
+import org.apache.impala.thrift.TDropDbParams;
+import org.apache.impala.thrift.TDropTableOrViewParams;
+import org.apache.impala.thrift.THdfsFileFormat;
+import org.apache.impala.thrift.TPrimitiveType;
+import org.apache.impala.thrift.TScalarType;
+import org.apache.impala.thrift.TTableName;
+import org.apache.impala.thrift.TTypeNode;
+import org.apache.impala.thrift.TTypeNodeType;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Main test class to cover the functionality of MetastoreEventProcessor. In order to make
+ * the test deterministic, this test relies on the fact that the default value of
+ * hms_event_polling_interval_s is 0. This means that there is no automatically scheduled
+ * polling for events from metastore. In order to simulate a poll operation this test
+ * invokes the <code>processEvents</code> method manually to process the pending events.
+ * This test relies on an external HMS process running in a minicluster environment such
+ * that events are generated and they have the thrift objects enabled in the event
+ * messages.
+ */
+public class MetastoreEventsProcessorTest {
+  // names of the database and the tables which are created in HMS by the tests
+  private static final String TEST_TABLE_NAME_PARTITIONED = "test_partitioned_tbl";
+  private static final String TEST_DB_NAME = "events_test_db";
+  private static final String TEST_TABLE_NAME_NONPARTITIONED = "test_nonpartitioned_tbl";
+
+  // shared by all the tests; initialized once in setUpTestEnvironment()
+  private static CatalogServiceCatalog catalog_;
+  private static CatalogOpExecutor catalogOpExecutor_;
+  private static MetastoreEventsProcessor eventsProcessor_;
+
+  @BeforeClass
+  public static void setUpTestEnvironment() throws TException {
+    catalog_ = CatalogServiceTestCatalog.create();
+    catalogOpExecutor_ = new CatalogOpExecutor(catalog_);
+    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+      CurrentNotificationEventId currentNotificationId =
+          metaStoreClient.getHiveClient().getCurrentNotificationEventId();
+      eventsProcessor_ = new SynchronousHMSEventProcessorForTests(
+          catalog_, currentNotificationId.getEventId(), 10L);
+      eventsProcessor_.start();
+    }
+    catalog_.setMetastoreEventProcessor(eventsProcessor_);
+  }
+
+  /**
+   * Drops the test database from HMS and removes it from the catalog. This is a
+   * best-effort cleanup: any exception which is thrown while cleaning up is ignored.
+   */
+  @AfterClass
+  public static void tearDownTestSetup() {
+    try {
+      dropDatabaseCascade(TEST_DB_NAME);
+      // the catalog keeps its own copy of the database, clean that up as well
+      catalog_.removeDb(TEST_DB_NAME);
+    } catch (Exception ignored) {
+      // ignored
+    }
+  }
+
+  /**
+   * Drops the test database <code>TEST_DB_NAME</code> from HMS by delegating to
+   * dropDatabaseCascade
+   */
+  private static void dropDatabaseCascadeFromHMS() throws TException {
+    dropDatabaseCascade(TEST_DB_NAME);
+  }
+
+  private static void dropDatabaseCascade(String dbName) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().dropDatabase(dbName, true, true, true);
+    }
+  }
+
+  /**
+   * Cleans up the test database from both metastore and catalog
+   * @throws TException
+   */
+  @Before
+  public void beforeTest() throws TException, CatalogException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().dropDatabase(TEST_DB_NAME, true, true, true);
+    }
+    catalog_.removeDb(TEST_DB_NAME);
+    // reset the event processor to the current eventId
+    eventsProcessor_.stop();
+    eventsProcessor_.start(eventsProcessor_.getCurrentEventId());
+    assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+  }
+
+  /**
+   * Make sure the eventProcessor is in ACTIVE state after processing all the events in
+   * the test. All tests should make sure that the eventprocessor is returned back to
+   * active state so that next test execution starts clean. A test which leaves the
+   * event processor in any other state fails this check.
+   */
+  @After
+  public void afterTest() {
+    assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+  }
+
+  /**
+   * Checks that database exists after processing a CREATE_DATABASE event
+   */
+  @Test
+  public void testCreateDatabaseEvent() throws TException, ImpalaException {
+    createDatabase();
+    // poll manually so that the pending events are processed
+    eventsProcessor_.processEvents();
+    assertNotNull(catalog_.getDb(TEST_DB_NAME));
+  }
+
+  /**
+   * Creates an empty database and drops it again, and checks that the Db object is
+   * added to the catalog when the CREATE_DATABASE event is processed and is gone after
+   * the DROP_DATABASE event is processed.
+   */
+  @Test
+  public void testDropEmptyDatabaseEvent() throws TException, ImpalaException {
+    final String dbName = "database_to_be_dropped";
+    // get rid of a database which may be left over from an earlier run
+    dropDatabaseCascade(dbName);
+    // create empty database
+    createDatabase(dbName);
+    eventsProcessor_.processEvents();
+    assertNotNull(catalog_.getDb(dbName));
+
+    dropDatabaseCascade(dbName);
+    eventsProcessor_.processEvents();
+    assertNull("Database should not be found after processing drop_database event",
+        catalog_.getDb(dbName));
+  }
+
+  /**
+   * Checks that Db object does not exist after processing DROP_DATABASE event when the
+   * dropped database is not empty. This event could be generated by issuing a DROP
+   * DATABASE .. CASCADE command. In this case since the tables in the database are also
+   * dropped, we expect to see a DatabaseNotFoundException when we query for the tables in
+   * the dropped database.
+   */
+  @Test
+  public void testdropDatabaseEvent() throws TException, ImpalaException {
+    createDatabase();
+    String tblToBeDropped = "tbl_to_be_dropped";
+    createTable(tblToBeDropped, true);
+    createTable("tbl_to_be_dropped_unpartitioned", false);
+    // create 2 partitions
+    List<List<String>> partVals = new ArrayList<>(2);
+    partVals.add(Arrays.asList("1"));
+    partVals.add(Arrays.asList("2"));
+    addPartitions(tblToBeDropped, partVals);
+    eventsProcessor_.processEvents();
+    loadTable(tblToBeDropped);
+    // now drop the database with cascade option
+    dropDatabaseCascadeFromHMS();
+    eventsProcessor_.processEvents();
+    // assertNull reports the unexpected Db object on failure
+    assertNull(
+        "Dropped database should not be found after processing drop_database event",
+        catalog_.getDb(TEST_DB_NAME));
+    // throws DatabaseNotFoundException
+    try {
+      catalog_.getTable(TEST_DB_NAME, tblToBeDropped);
+      fail("Expected a DatabaseNotFoundException when getting a table of the dropped "
+          + "database " + TEST_DB_NAME);
+    } catch (DatabaseNotFoundException expectedEx) {
+      // expected exception; ignored
+    }
+  }
+
+  /**
+   * Checks that a change of the parameters, the location and the owner of a database in
+   * HMS shows up in the catalog after the corresponding ALTER_DATABASE events are
+   * processed. The test is currently disabled, see the reason given in the Ignore
+   * annotation.
+   */
+  @Ignore("Disabled until we fix Hive bug to deserialize alter_database event messages")
+  @Test
+  public void testAlterDatabaseEvents() throws TException, ImpalaException {
+    createDatabase();
+    String testDbParamKey = "testKey";
+    String testDbParamVal = "testVal";
+    eventsProcessor_.processEvents();
+    assertFalse("Newly created test database should not have parameter with key "
+            + testDbParamKey,
+        catalog_.getDb(TEST_DB_NAME)
+            .getMetaStoreDb()
+            .getParameters()
+            .containsKey(testDbParamKey));
+    // test change of parameters to the Database. The value which is set here must be
+    // the same value which is asserted below
+    addDatabaseParameters(testDbParamKey, testDbParamVal);
+    eventsProcessor_.processEvents();
+    assertEquals("Altered database should have set the key " + testDbParamKey
+            + " to value " + testDbParamVal + " in parameters",
+        testDbParamVal,
+        catalog_.getDb(TEST_DB_NAME)
+            .getMetaStoreDb()
+            .getParameters()
+            .get(testDbParamKey));
+
+    // test update to the default location
+    String currentLocation =
+        catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getLocationUri();
+    String newLocation = currentLocation + File.separatorChar + "newTestLocation";
+    Database alteredDb = catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().deepCopy();
+    alteredDb.setLocationUri(newLocation);
+    alterDatabase(alteredDb);
+    eventsProcessor_.processEvents();
+    assertEquals("Altered database should have the updated location", newLocation,
+        catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getLocationUri());
+
+    // test change of owner
+    String owner = catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getOwnerName();
+    final String newOwner = "newTestOwner";
+    // sanity check
+    assertFalse(newOwner.equals(owner));
+    alteredDb = catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().deepCopy();
+    alteredDb.setOwnerName(newOwner);
+    alterDatabase(alteredDb);
+    eventsProcessor_.processEvents();
+    assertEquals("Altered database should have the updated owner", newOwner,
+        catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getOwnerName());
+  }
+
+  /**
+   * Test creates two tables (partitioned and non-partitioned) and makes sure that
+   * CatalogD has the two created table objects after the CREATE_TABLE events are
+   * processed. The tables are expected to be added as IncompleteTable instances.
+   */
+  @Test
+  public void testCreateTableEvent() throws TException, ImpalaException {
+    createDatabase();
+    eventsProcessor_.processEvents();
+    // sanity check: the table must not be known to the catalog before it is created
+    assertNull(TEST_TABLE_NAME_NONPARTITIONED + " is not expected to exist",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED));
+    // create a non-partitioned table
+    createTable(TEST_TABLE_NAME_NONPARTITIONED, false);
+    eventsProcessor_.processEvents();
+    assertNotNull("Catalog should have a incomplete instance of table after CREATE_TABLE "
+            + "event is received",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED));
+    assertTrue("Newly created table from events should be a IncompleteTable",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED)
+                instanceof IncompleteTable);
+    // test partitioned table case
+    createTable(TEST_TABLE_NAME_PARTITIONED, true);
+    eventsProcessor_.processEvents();
+    assertNotNull("Catalog should have create a incomplete table after receiving "
+            + "CREATE_TABLE event",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_PARTITIONED));
+    assertTrue("Newly created table should be instance of IncompleteTable",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_PARTITIONED)
+                instanceof IncompleteTable);
+  }
+
+  /**
+   * Adds, drops and alters partitions of an existing table and makes sure that the
+   * table is invalidated after each kind of partition event is processed, and that a
+   * subsequent load of the table fetches the expected number of partitions. It relies
+   * on the fact that the event processor currently just invalidates the table instead
+   * of directly refreshing the partition objects.
+   * TODO: This test can be improved further to check if the table has new partitions
+   * without the load command once IMPALA-7973 is fixed
+   */
+  @Test
+  public void testPartitionEvents() throws TException, ImpalaException {
+    final String tblName = TEST_TABLE_NAME_PARTITIONED;
+    createDatabase();
+    createTable(tblName, true);
+    // sync to latest event id
+    eventsProcessor_.processEvents();
+
+    // simulate the table being loaded by explicitly calling load table
+    loadTable(tblName);
+
+    // create 4 partitions
+    List<List<String>> partVals = new ArrayList<>();
+    for (String partVal : new String[] {"1", "2", "3", "4"}) {
+      partVals.add(Arrays.asList(partVal));
+    }
+    addPartitions(tblName, partVals);
+
+    eventsProcessor_.processEvents();
+    // after ADD_PARTITION event is received currently we just invalidate the table
+    assertTrue("Table should have been invalidated after add partition event",
+        catalog_.getTable(TEST_DB_NAME, tblName) instanceof IncompleteTable);
+
+    loadTable(tblName);
+    HdfsTable loadedTbl = (HdfsTable) catalog_.getTable(TEST_DB_NAME, tblName);
+    assertEquals("Unexpected number of partitions fetched for the loaded table", 4,
+        loadedTbl.getPartitions().size());
+
+    // now remove some partitions to see if catalogD state gets invalidated
+    partVals.clear();
+    for (String partVal : new String[] {"1", "2", "3"}) {
+      partVals.add(Arrays.asList(partVal));
+    }
+    dropPartitions(tblName, partVals);
+    eventsProcessor_.processEvents();
+
+    assertTrue("Table should have been invalidated after drop partition event",
+        catalog_.getTable(TEST_DB_NAME, tblName) instanceof IncompleteTable);
+    loadTable(tblName);
+    loadedTbl = (HdfsTable) catalog_.getTable(TEST_DB_NAME, tblName);
+    assertEquals("Unexpected number of partitions fetched for the loaded table", 1,
+        loadedTbl.getPartitions().size());
+
+    // issue alter partition ops on the only partition which is left
+    partVals.clear();
+    partVals.add(Arrays.asList("4"));
+    Map<String, String> newParams = new HashMap<>(2);
+    newParams.put("alterKey1", "alterVal1");
+    alterPartitions(tblName, partVals, newParams);
+    eventsProcessor_.processEvents();
+    assertTrue("Table should have been invalidated after alter partition event",
+        catalog_.getTable(TEST_DB_NAME, tblName) instanceof IncompleteTable);
+  }
+
+  /**
+   * Test generates ALTER_TABLE events for various cases (table rename, parameter change,
+   * add/remove/change column) and makes sure that the table is invalidated, i.e. is an
+   * IncompleteTable, on the CatalogD side after the ALTER_TABLE event is processed.
+   */
+  @Test
+  public void testAlterTableEvent() throws TException, ImpalaException {
+    createDatabase();
+    createTable("old_name", false);
+    // sync to latest events
+    eventsProcessor_.processEvents();
+    // simulate the table being loaded by explicitly calling load table
+    loadTable("old_name");
+
+    // test renaming a table directly through a metastore client
+    alterTableRename("old_name", TEST_TABLE_NAME_NONPARTITIONED);
+    eventsProcessor_.processEvents();
+    // table with the old name should not be present anymore
+    assertNull(
+        "Old named table still exists", catalog_.getTable(TEST_DB_NAME, "old_name"));
+    // table with the new name should be present in Incomplete state
+    Table newTable = catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED);
+    assertNotNull("Table with the new name is not found", newTable);
+    assertTrue("Table with the new name should be incomplete",
+        newTable instanceof IncompleteTable);
+
+    // check invalidate after alter table add parameter
+    loadTable(TEST_TABLE_NAME_NONPARTITIONED);
+    alterTableAddParameter(TEST_TABLE_NAME_NONPARTITIONED, "somekey", "someval");
+    eventsProcessor_.processEvents();
+    assertTrue("Table should be incomplete after alter table add parameter",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED)
+                instanceof IncompleteTable);
+
+    // check invalidate after alter table add col
+    loadTable(TEST_TABLE_NAME_NONPARTITIONED);
+    alterTableAddCol(TEST_TABLE_NAME_NONPARTITIONED, "newCol", "int", "null");
+    eventsProcessor_.processEvents();
+    assertTrue("Table should have been invalidated after alter table add column",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED)
+                instanceof IncompleteTable);
+
+    // check invalidate after alter table change column type
+    loadTable(TEST_TABLE_NAME_NONPARTITIONED);
+    altertableChangeCol(TEST_TABLE_NAME_NONPARTITIONED, "newCol", "string", null);
+    eventsProcessor_.processEvents();
+    assertTrue("Table should have been invalidated after changing column type",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED)
+                instanceof IncompleteTable);
+
+    // check invalidate after alter table remove column
+    loadTable(TEST_TABLE_NAME_NONPARTITIONED);
+    alterTableRemoveCol(TEST_TABLE_NAME_NONPARTITIONED, "newCol");
+    eventsProcessor_.processEvents();
+    assertTrue("Table should have been invalidated after removing a column",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED)
+                instanceof IncompleteTable);
+  }
+
+  /**
+   * Test drops table using a metastore client and makes sure that the table does not
+   * exist in the catalogD after the DROP_TABLE event is processed. Repeats the same
+   * test for a partitioned table.
+   */
+  @Test
+  public void testDropTableEvent() throws TException, ImpalaException {
+    createDatabase();
+    final String TBL_TO_BE_DROPPED = "tbl_to_be_dropped";
+    createTable(TBL_TO_BE_DROPPED, false);
+    eventsProcessor_.processEvents();
+    loadTable(TBL_TO_BE_DROPPED);
+    // issue drop table and make sure it doesn't exist after processing the events
+    dropTable(TBL_TO_BE_DROPPED);
+    eventsProcessor_.processEvents();
+    assertNull("Table should not be found after processing drop_table event",
+        catalog_.getTable(TEST_DB_NAME, TBL_TO_BE_DROPPED));
+
+    // test partitioned table drop
+    createTable(TBL_TO_BE_DROPPED, true);
+
+    eventsProcessor_.processEvents();
+    loadTable(TBL_TO_BE_DROPPED);
+    // create 2 partitions
+    List<List<String>> partVals = new ArrayList<>(2);
+    partVals.add(Arrays.asList("1"));
+    partVals.add(Arrays.asList("2"));
+    addPartitions(TBL_TO_BE_DROPPED, partVals);
+    dropTable(TBL_TO_BE_DROPPED);
+    eventsProcessor_.processEvents();
+    assertNull("Partitioned table should not be found after processing drop_table event",
+        catalog_.getTable(TEST_DB_NAME, TBL_TO_BE_DROPPED));
+  }
+
+  /**
+   * Test makes sure that the events are not processed when the event processor is in
+   * STOPPED state
+   * @throws TException if creating the test database through the metastore client fails
+   */
+  @Test
+  public void testStopEventProcessing() throws TException {
+    try {
+      assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+      eventsProcessor_.stop();
+      createDatabase();
+      eventsProcessor_.processEvents();
+      assertEquals(EventProcessorStatus.STOPPED, eventsProcessor_.getStatus());
+      assertNull(
+          "Test database should not be in catalog when event processing is stopped",
+          catalog_.getDb(TEST_DB_NAME));
+    } finally {
+      eventsProcessor_.start();
+    }
+  }
+
+  /**
+   * Test makes sure that event processing is restarted after a stop/start(eventId)
+   * call sequence to event processor
+   */
+  @Test
+  public void testEventProcessorRestart() throws TException {
+    try {
+      assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+      long syncedIdBefore = eventsProcessor_.getLastSyncedEventId();
+      eventsProcessor_.stop();
+      createDatabase();
+      eventsProcessor_.processEvents();
+      assertEquals(EventProcessorStatus.STOPPED, eventsProcessor_.getStatus());
+      assertNull(
+          "Test database should not be in catalog when event processing is stopped",
+          catalog_.getDb(TEST_DB_NAME));
+      eventsProcessor_.start(syncedIdBefore);
+      assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+      eventsProcessor_.processEvents();
+      assertNotNull(
+          "Test database should be in catalog when event processing is restarted",
+          catalog_.getDb(TEST_DB_NAME));
+    } finally {
+      if (eventsProcessor_.getStatus() != EventProcessorStatus.ACTIVE) {
+        eventsProcessor_.start();
+      }
+    }
+  }
+
+  /**
+   * Test makes sure that event processor is restarted after reset()
+   */
+  @Test
+  public void testEventProcessingAfterReset() throws ImpalaException {
+    assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+    long syncedIdBefore = eventsProcessor_.getLastSyncedEventId();
+    catalog_.reset();
+    assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+    // nothing changed so event id remains the same
+    assertEquals(syncedIdBefore, eventsProcessor_.getLastSyncedEventId());
+  }
+
+  /**
+   * Test creates, drops and creates a table with the same name from Impala. This would
+   * lead to an interesting sequence of CREATE_TABLE, DROP_TABLE, CREATE_TABLE events
+   * while the catalogD state has the latest version of the table cached. Test makes sure
+   * that the event processor does not modify catalogd state since the catalog table is
+   * already at its latest state
+   */
+  @Test
+  public void testCreateDropCreateTableFromImpala() throws ImpalaException, TException {
+    assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+    createDatabase();
+    eventsProcessor_.processEvents();
+    createTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED, false);
+    assertNotNull("Table should have been found after create table statement",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED));
+    loadTable(TEST_TABLE_NAME_NONPARTITIONED);
+    dropTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED);
+    // now catalogD does not have the table entry, create the table again
+    createTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED, false);
+    assertNotNull("Table should have been found after create table statement",
+        catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED));
+    loadTable(TEST_TABLE_NAME_NONPARTITIONED);
+    List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
+    // expect the CREATE_TABLE, DROP_TABLE, CREATE_TABLE events issued above. The first
+    // create table event should not change catalogd's existing table
+    assertEquals(3, events.size());
+    Table existingTable = catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED);
+    int creationTime = existingTable.getMetaStoreTable().getCreateTime();
+    assertEquals("CREATE_TABLE", events.get(0).getEventType());
+    eventsProcessor_.processEvents(Lists.newArrayList(events.get(0)));
+    // after processing the create_table the original table should still remain the same
+    assertEquals(creationTime, catalog_.getTable(TEST_DB_NAME,
+        TEST_TABLE_NAME_NONPARTITIONED).getMetaStoreTable().getCreateTime());
+    // second event should be drop_table. This event should also be skipped since
+    // catalog state is more recent than the event
+    assertEquals("DROP_TABLE", events.get(1).getEventType());
+    eventsProcessor_.processEvents(Lists.newArrayList(events.get(1)));
+    // even after drop table event, the table should still exist
+    assertNotNull("Table should have existed since catalog state is current and event "
+        + "is stale", catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED));
+    // the final create table event should also be ignored since it's a self-event
+    assertEquals("CREATE_TABLE", events.get(2).getEventType());
+    eventsProcessor_.processEvents(Lists.newArrayList(events.get(2)));
+    assertFalse(
+        "Table should have been loaded since the create_table should be " + "ignored",
+        catalog_.getTable(TEST_DB_NAME,
+            TEST_TABLE_NAME_NONPARTITIONED) instanceof IncompleteTable);
+    // finally make sure the table is still the same
+    assertEquals(creationTime, catalog_.getTable(TEST_DB_NAME,
+        TEST_TABLE_NAME_NONPARTITIONED).getMetaStoreTable().getCreateTime());
+  }
+
+  /**
+   * Test generates DDL events on table and makes sure that event processing does not
+   * modify the catalog state
+   *
+   * @throws ImpalaException if a catalog operation issued by the test fails
+   */
+  @Test
+  public void testTableEventsFromImpala() throws ImpalaException {
+    createDatabaseFromImpala(TEST_DB_NAME, "created from Impala");
+    createTableFromImpala(TEST_TABLE_NAME_PARTITIONED, true);
+    loadTable(TEST_TABLE_NAME_PARTITIONED);
+    List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
+    assertEquals(2, events.size());
+
+    eventsProcessor_.processEvents(events);
+    assertNotNull(catalog_.getDb(TEST_DB_NAME));
+    assertNotNull(catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_PARTITIONED));
+    assertFalse("Table should have been loaded since it was already latest", catalog_
+        .getTable(TEST_DB_NAME, TEST_TABLE_NAME_PARTITIONED) instanceof IncompleteTable);
+
+    dropTableFromImpala(TEST_TABLE_NAME_PARTITIONED);
+    assertNull(catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_PARTITIONED));
+    events = eventsProcessor_.getNextMetastoreEvents();
+    // should have 1 drop_table event
+    assertEquals(1, events.size());
+    eventsProcessor_.processEvents(events);
+    // dropping a non-existent table should not put the event processor into error state
+    assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+    assertNull(catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_PARTITIONED));
+  }
+
+  /**
+   * Creates events like create, drop with the same tblName. In such case the create
+   * table event should be filtered out. Also checks the create, rename table case.
+   */
+  @Test
+  public void testEventFiltering() throws ImpalaException {
+    createDatabaseFromImpala(TEST_DB_NAME, "");
+    createTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED, false);
+    loadTable(TEST_TABLE_NAME_NONPARTITIONED);
+    assertNotNull(catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED));
+    dropTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED);
+    // the create table event should be filtered out
+    List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
+    assertEquals(3, events.size());
+    List<MetastoreEvent> filteredEvents =
+        eventsProcessor_.getMetastoreEventFactory().getFilteredEvents(events);
+    assertEquals(2, filteredEvents.size());
+    assertEquals(MetastoreEventType.CREATE_DATABASE, filteredEvents.get(0).eventType_);
+    assertEquals(MetastoreEventType.DROP_TABLE, filteredEvents.get(1).eventType_);
+    eventsProcessor_.processEvents();
+    assertNull(catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED));
+
+    // test the table rename case: only the alter table event should survive filtering
+    createTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED, false);
+    renameTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED, "new_name");
+    events = eventsProcessor_.getNextMetastoreEvents();
+    assertEquals(2, events.size());
+    filteredEvents =
+        eventsProcessor_.getMetastoreEventFactory().getFilteredEvents(events);
+    assertEquals(1, filteredEvents.size());
+    assertEquals(MetastoreEventType.ALTER_TABLE, filteredEvents.get(0).eventType_);
+  }
+
+  /**
+   * Similar to create,drop,create sequence table as in
+   * <code>testCreateDropCreateTableFromImpala</code> but operates on Database instead
+   * of Table. Makes sure that the database creationTime is checked before processing
+   * create and drop database events
+   */
+  @Ignore("Ignored since database createTime is unavailable until we have HIVE-21077")
+  @Test
+  public void testCreateDropCreateDatabaseFromImpala() throws ImpalaException {
+    assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+    createDatabaseFromImpala(TEST_DB_NAME, "first");
+    assertNotNull("Db should have been found after create database statement",
+        catalog_.getDb(TEST_DB_NAME));
+    dropDatabaseFromImpala(TEST_DB_NAME);
+    assertNull(catalog_.getDb(TEST_DB_NAME));
+    createDatabaseFromImpala(TEST_DB_NAME, "second");
+    assertNotNull(catalog_.getDb(TEST_DB_NAME));
+    List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
+    // should have 3 events for create,drop and create database
+    assertEquals(3, events.size());
+
+    assertEquals("CREATE_DATABASE", events.get(0).getEventType());
+    eventsProcessor_.processEvents(Lists.newArrayList(events.get(0)));
+    // create_database event should have no effect since catalogD has already a later
+    // version of database with the same name.
+    assertNotNull(catalog_.getDb(TEST_DB_NAME));
+    assertEquals("second",
+        catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getDescription());
+
+    // now process drop_database event
+    assertEquals("DROP_DATABASE", events.get(1).getEventType());
+    eventsProcessor_.processEvents(Lists.newArrayList(events.get(1)));
+    // database should not be dropped since catalogD is at the latest state
+    assertNotNull(catalog_.getDb(TEST_DB_NAME));
+    assertEquals("second",
+        catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getDescription());
+
+    // the third create_database event should have no effect too
+    assertEquals("CREATE_DATABASE", events.get(2).getEventType());
+    eventsProcessor_.processEvents(Lists.newArrayList(events.get(2)));
+    assertNotNull(catalog_.getDb(TEST_DB_NAME));
+    assertEquals("second",
+        catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getDescription());
+  }
+
+  private void createDatabase() throws TException { createDatabase(TEST_DB_NAME); }
+
+  private void createDatabase(String dbName) throws TException {
+    Database database =
+        new DatabaseBuilder()
+            .setName(dbName)
+            .setDescription("Notification test database")
+            .addParam("dbparamkey", "dbparamValue")
+            .setOwnerName("NotificationTestOwner")
+            .setOwnerType(PrincipalType.USER).build();
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().createDatabase(database);
+    }
+  }
+
+  private void addDatabaseParameters(String key, String val) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      Database msDb = msClient.getHiveClient().getDatabase(TEST_DB_NAME);
+      assertFalse(key + " already exists in the database parameters",
+          msDb.getParameters().containsKey(key));
+      msDb.putToParameters(key, val);
+      msClient.getHiveClient().alterDatabase(TEST_DB_NAME, msDb);
+    }
+  }
+
+  private void alterDatabase(Database newDatabase) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().alterDatabase(newDatabase.getName(), newDatabase);
+    }
+  }
+
+  private void createTable(String tblName, boolean isPartitioned) throws TException {
+    TableBuilder tblBuilder =
+        new TableBuilder()
+            .setTableName(tblName)
+            .setDbName(TEST_DB_NAME)
+            .addTableParam("tblParamKey", "tblParamValue")
+            .addCol("c1", "string", "c1 description")
+            .addCol("c2", "string", "c2 description")
+            .setSerdeLib(HdfsFileFormat.PARQUET.serializationLib())
+            .setInputFormat(HdfsFileFormat.PARQUET.inputFormat())
+            .setOutputFormat(HdfsFileFormat.PARQUET.outputFormat());
+    if (isPartitioned) {
+      tblBuilder.addPartCol("p1", "string", "partition p1 description");
+    }
+
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().createTable(tblBuilder.build());
+    }
+  }
+
+  /**
+   * Drops table from Impala
+   */
+  private void dropTableFromImpala(String tblName) throws ImpalaException {
+    TDdlExecRequest req = new TDdlExecRequest();
+    req.setDdl_type(TDdlType.DROP_TABLE);
+    TDropTableOrViewParams dropTableParams = new TDropTableOrViewParams();
+    dropTableParams.setTable_name(new TTableName(TEST_DB_NAME, tblName));
+    dropTableParams.setIf_exists(true);
+    req.setDrop_table_or_view_params(dropTableParams);
+    catalogOpExecutor_.execDdlRequest(req);
+  }
+
+  /**
+   * Creates db from Impala
+   */
+  private void createDatabaseFromImpala(String dbName, String desc)
+      throws ImpalaException {
+    TDdlExecRequest req = new TDdlExecRequest();
+    req.setDdl_type(TDdlType.CREATE_DATABASE);
+    TCreateDbParams createDbParams = new TCreateDbParams();
+    createDbParams.setDb(dbName);
+    createDbParams.setComment(desc);
+    req.setCreate_db_params(createDbParams);
+    catalogOpExecutor_.execDdlRequest(req);
+  }
+
+  /**
+   * Drops db from Impala
+   */
+  private void dropDatabaseFromImpala(String dbName) throws ImpalaException {
+    TDdlExecRequest req = new TDdlExecRequest();
+    req.setDdl_type(TDdlType.DROP_DATABASE);
+    TDropDbParams dropDbParams = new TDropDbParams();
+    dropDbParams.setDb(dbName);
+    req.setDrop_db_params(dropDbParams);
+    catalogOpExecutor_.execDdlRequest(req);
+  }
+
+  /**
+   * Creates a table using CatalogOpExecutor to simulate a DDL operation from Impala
+   * client
+   */
+  private void createTableFromImpala(String tblName, boolean isPartitioned)
+      throws ImpalaException {
+    TDdlExecRequest req = new TDdlExecRequest();
+    req.setDdl_type(TDdlType.CREATE_TABLE);
+    TCreateTableParams createTableParams = new TCreateTableParams();
+    createTableParams.setTable_name(new TTableName(TEST_DB_NAME, tblName));
+    createTableParams.setFile_format(THdfsFileFormat.PARQUET);
+    createTableParams.setIs_external(false);
+    createTableParams.setIf_not_exists(false);
+    Map<String, String> properties = new HashMap<>();
+    properties.put("tblParamKey", "tblParamValue");
+    createTableParams.setTable_properties(properties);
+    List<TColumn> columns = new ArrayList<>(2);
+    columns.add(getScalarColumn("c1", TPrimitiveType.STRING));
+    columns.add(getScalarColumn("c2", TPrimitiveType.STRING));
+    createTableParams.setColumns(columns);
+    // create two partition columns if specified
+    if (isPartitioned) {
+      List<TColumn> partitionColumns = new ArrayList<>(2);
+      partitionColumns.add(getScalarColumn("p1", TPrimitiveType.INT));
+      partitionColumns.add(getScalarColumn("p2", TPrimitiveType.STRING));
+      createTableParams.setPartition_columns(partitionColumns);
+    }
+    req.setCreate_table_params(createTableParams);
+    catalogOpExecutor_.execDdlRequest(req);
+  }
+
+  /**
+   * Renames a table from oldTblName to newTblName from Impala
+   */
+  private void renameTableFromImpala(String oldTblName, String newTblName)
+      throws ImpalaException {
+    TDdlExecRequest req = new TDdlExecRequest();
+    req.setDdl_type(TDdlType.ALTER_TABLE);
+    TAlterTableOrViewRenameParams renameParams = new TAlterTableOrViewRenameParams();
+    renameParams.new_table_name = new TTableName(TEST_DB_NAME, newTblName);
+    TAlterTableParams alterTableParams = new TAlterTableParams();
+    alterTableParams.setAlter_type(TAlterTableType.RENAME_TABLE);
+    alterTableParams.setTable_name(new TTableName(TEST_DB_NAME, oldTblName));
+    alterTableParams.setRename_params(renameParams);
+    req.setAlter_table_params(alterTableParams);
+    catalogOpExecutor_.execDdlRequest(req);
+  }
+
+  private TColumn getScalarColumn(String colName, TPrimitiveType type) {
+    TTypeNode tTypeNode = new TTypeNode(TTypeNodeType.SCALAR);
+    tTypeNode.setScalar_type(new TScalarType(type));
+    TColumnType columnType = new TColumnType(Arrays.asList(tTypeNode));
+    return new TColumn(colName, columnType);
+  }
+
+  private void dropTable(String tableName) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().dropTable(TEST_DB_NAME, tableName, true, false);
+    }
+  }
+
+  private void alterTableRename(String tblName, String newTblName) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      org.apache.hadoop.hive.metastore.api.Table msTable =
+          msClient.getHiveClient().getTable(TEST_DB_NAME, tblName);
+      msTable.setTableName(newTblName);
+      msClient.getHiveClient().alter_table_with_environmentContext(
+          TEST_DB_NAME, tblName, msTable, null);
+    }
+  }
+
+  private void alterTableAddParameter(String tblName, String key, String val)
+      throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      org.apache.hadoop.hive.metastore.api.Table msTable =
+          msClient.getHiveClient().getTable(TEST_DB_NAME, tblName);
+      msTable.getParameters().put(key, val);
+      msClient.getHiveClient().alter_table_with_environmentContext(
+          TEST_DB_NAME, tblName, msTable, null);
+    }
+  }
+
+  private void alterTableAddCol(
+      String tblName, String colName, String colType, String comment) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      org.apache.hadoop.hive.metastore.api.Table msTable =
+          msClient.getHiveClient().getTable(TEST_DB_NAME, tblName);
+      msTable.getSd().getCols().add(new FieldSchema(colName, colType, comment));
+      msClient.getHiveClient().alter_table_with_environmentContext(
+          TEST_DB_NAME, tblName, msTable, null);
+    }
+  }
+
+  private void altertableChangeCol(
+      String tblName, String colName, String colType, String comment) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      org.apache.hadoop.hive.metastore.api.Table msTable =
+          msClient.getHiveClient().getTable(TEST_DB_NAME, tblName);
+      FieldSchema targetCol = null;
+      for (FieldSchema col : msTable.getSd().getCols()) {
+        if (col.getName().equalsIgnoreCase(colName)) {
+          targetCol = col;
+          break;
+        }
+      }
+      assertNotNull("Column " + colName + " does not exist", targetCol);
+      targetCol.setName(colName);
+      targetCol.setType(colType);
+      targetCol.setComment(comment);
+      msClient.getHiveClient().alter_table_with_environmentContext(
+          TEST_DB_NAME, tblName, msTable, null);
+    }
+  }
+
+  private void alterTableRemoveCol(String tblName, String colName) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      org.apache.hadoop.hive.metastore.api.Table msTable =
+          msClient.getHiveClient().getTable(TEST_DB_NAME, tblName);
+      FieldSchema targetCol = null;
+      for (FieldSchema col : msTable.getSd().getCols()) {
+        if (col.getName().equalsIgnoreCase(colName)) {
+          targetCol = col;
+          break;
+        }
+      }
+      assertNotNull("Column " + colName + " does not exist", targetCol);
+      msTable.getSd().getCols().remove(targetCol);
+      msClient.getHiveClient().alter_table_with_environmentContext(
+          TEST_DB_NAME, tblName, msTable, null);
+    }
+  }
+
+  /**
+   * Removes the partition by values from HMS
+   * @param tblName name of the table in TEST_DB_NAME whose partitions are dropped
+   * @param partitionValues one list of partition values per partition to be dropped
+   * @throws TException if any of the dropPartition calls to HMS fails
+   */
+  private void dropPartitions(String tblName, List<List<String>> partitionValues)
+      throws TException {
+    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+      for (List<String> partVals : partitionValues) {
+        metaStoreClient.getHiveClient().dropPartition(TEST_DB_NAME, tblName,
+            partVals, true);
+      }
+    }
+  }
+
+  /**
+   * Replaces the parameters of the given partitions of tblName with newParams and
+   * submits all of them to HMS in a single alter_partitions call
+   */
+  private void alterPartitions(String tblName, List<List<String>> partValsList,
+      Map<String, String> newParams) throws TException {
+    List<Partition> partitions = new ArrayList<>(partValsList.size());
+    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+      for (List<String> partVal : partValsList) {
+        Partition partition = metaStoreClient.getHiveClient().getPartition(TEST_DB_NAME,
+            tblName, partVal);
+        partition.setParameters(newParams);
+        partitions.add(partition);
+      }
+
+      metaStoreClient.getHiveClient().alter_partitions(TEST_DB_NAME, tblName, partitions);
+    }
+  }
+
+  /**
+   * Adds one partition per entry of partitionValues to tblName with a single HMS call
+   */
+  private void addPartitions(String tblName, List<List<String>> partitionValues)
+      throws TException {
+    List<Partition> partitions = new ArrayList<>(partitionValues.size());
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      org.apache.hadoop.hive.metastore.api.Table msTable =
+          msClient.getHiveClient().getTable(TEST_DB_NAME, tblName);
+      for (List<String> partVals : partitionValues) {
+        partitions.add(
+            new PartitionBuilder()
+                .fromTable(msTable)
+                .setInputFormat(msTable.getSd().getInputFormat())
+                .setSerdeLib(msTable.getSd().getSerdeInfo().getSerializationLib())
+                .setOutputFormat(msTable.getSd().getOutputFormat())
+                .setValues(partVals)
+                .build());
+      }
+      msClient.getHiveClient().add_partitions(partitions);
+    }
+  }
+
+  private Table loadTable(String tblName) throws CatalogException {
+    Table loadedTable = catalog_.getOrLoadTable(TEST_DB_NAME, tblName);
+    assertFalse("Table should have been loaded after getOrLoadTable call",
+        loadedTable instanceof IncompleteTable);
+    return loadedTable;
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java b/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java
new file mode 100644
index 0000000..2976bbe
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.events;
+
+import org.apache.impala.catalog.CatalogServiceCatalog;
+
+/**
+ * A test MetastoreEventsProcessor which executes in the same thread. Useful for testing
+ * functionality of MetastoreEventsProcessor
+ */
+public class SynchronousHMSEventProcessorForTests extends MetastoreEventsProcessor {
+  SynchronousHMSEventProcessorForTests(
+      CatalogServiceCatalog catalog, long startSyncFromId, long pollingFrequencyInSec) {
+    super(catalog, startSyncFromId, pollingFrequencyInSec);
+  }
+
+  @Override
+  public void startScheduler() {
+    // nothing to do here; there is no background thread for this processor
+  }
+}
diff --git a/fe/src/test/resources/postgresql-hive-site.xml.template b/fe/src/test/resources/postgresql-hive-site.xml.template
index 0847aef..60c047e 100644
--- a/fe/src/test/resources/postgresql-hive-site.xml.template
+++ b/fe/src/test/resources/postgresql-hive-site.xml.template
@@ -226,4 +226,15 @@
   <value>${INTERNAL_LISTEN_HOST}:2181</value>
   <description>The ZooKeeper token store connect string.</description>
 </property>
+
+<!-- This property is required to issue invalidates based on metastore events.
+See IMPALA-7954 for details -->
+<property>
+  <name>hive.metastore.notifications.add.thrift.objects</name>
+  <value>true</value>
+</property>
+<property>
+  <name>hive.metastore.alter.notifications.basic</name>
+  <value>false</value>
+</property>
 </configuration>