You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/04/28 17:24:38 UTC

[impala] 01/03: IMPALA-8149 : Add support for alter_database events

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3ea929af6b685f353b8cc2042f553985a4c0b4c7
Author: xiaomeng <xi...@cloudera.com>
AuthorDate: Fri Apr 5 17:53:34 2019 -0700

    IMPALA-8149 : Add support for alter_database events
    
    This change adds support for alter_database events in two parts:
    The first adds catalogServiceId and catalogVersion to the db parameters
    when a database is altered.
    The second adds an alter database event; while processing it, check
    whether it is a self-event: if true do nothing, if false replace the
    catalog's cached db with the db from the event.
    
    Testing:
    Enabled testAlterDisableFlagFromDb in MetastoreEventsProcessorTest.
    
    Change-Id: Iaf020e85cae04163bf32e31363eb4119d624640b
    Reviewed-on: http://gerrit.cloudera.org:8080/13049
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../java/org/apache/impala/catalog/Catalog.java    |   2 +-
 .../impala/catalog/CatalogServiceCatalog.java      |  31 ++-
 fe/src/main/java/org/apache/impala/catalog/Db.java |  49 ++++-
 .../impala/catalog/events/MetastoreEvents.java     | 236 +++++++++++++--------
 .../apache/impala/service/CatalogOpExecutor.java   |  41 +++-
 .../events/MetastoreEventsProcessorTest.java       |  89 +++++++-
 6 files changed, 337 insertions(+), 111 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index 3b22865..c61e20a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -113,7 +113,7 @@ public abstract class Catalog implements AutoCloseable {
    * Returns null if no matching database is found.
    */
   public Db getDb(String dbName) {
-    Preconditions.checkState(dbName != null && !dbName.isEmpty(),
+    Preconditions.checkArgument(dbName != null && !dbName.isEmpty(),
         "Null or empty database name given as argument to Catalog.getDb");
     return dbCache_.get().get(dbName.toLowerCase());
   }
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 8bc6025..cd64b09 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -736,6 +736,9 @@ public class CatalogServiceCatalog extends Catalog {
         throw new DatabaseNotFoundException(
             String.format("Database %s not found", dbName));
       }
+      if (tblName == null) {
+        return db.getVersionsForInflightEvents();
+      }
       Table tbl = getTable(dbName, tblName);
       if (tbl == null) {
         throw new TableNotFoundException(
@@ -749,8 +752,11 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
-   * Removes a given version number from the catalog table's list of versions for
-   * in-flight events. Applicable only when external event processing is enabled.
+   * Removes a given version number from the catalog database/table's list of versions
+   * for in-flight events.
+   * If tblName is null, removes version number from database.
+   * If tblName not null and table is not incomplete, removes version number from table
+   * Applicable only when external event processing is enabled.
    * @param dbName database name
    * @param tblName table name
    */
@@ -762,6 +768,10 @@ public class CatalogServiceCatalog extends Catalog {
     try {
       Db db = getDb(dbName);
       if (db == null) return;
+      if (tblName == null) {
+        db.removeFromVersionsForInflightEvents(versionNumber);
+        return;
+      }
       Table tbl = getTable(dbName, tblName);
       if (tbl == null) {
         throw new TableNotFoundException(
@@ -793,6 +803,23 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * Adds a given version number to the catalog database's list of versions for
+   * in-flight events. Applicable only when external event processing is enabled.
+   *
+   * @param db Catalog database
+   * @param versionNumber version number to be added
+   */
+  public void addVersionsForInflightEvents(Db db, long versionNumber) {
+    if (!isExternalEventProcessingEnabled()) return;
+    versionLock_.writeLock().lock();
+    try {
+      db.addToVersionsForInflightEvents(versionNumber);
+    } finally {
+      versionLock_.writeLock().unlock();
+    }
+  }
+
+  /**
    * Get a snapshot view of all the catalog objects that were deleted between versions
    * ('fromVersion', 'toVersion'].
    */
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index cc1a569..e36cfef 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -21,10 +21,11 @@ import java.util.ArrayList;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-
 import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.impala.analysis.ColumnDef;
 import org.apache.impala.analysis.KuduPartitionParam;
@@ -95,6 +96,14 @@ public class Db extends CatalogObjectImpl implements FeDb {
   // (e.g. can't drop it, can't add tables to it, etc).
   private boolean isSystemDb_ = false;
 
+  // maximum number of catalog versions to store for in-flight events for this database
+  private static final int MAX_NUMBER_OF_INFLIGHT_EVENTS = 10;
+
+  // FIFO list of versions for all the in-flight metastore events in this database
+  // This queue can only grow up to MAX_NUMBER_OF_INFLIGHT_EVENTS size. Anything which
+  // is attempted to be added to this list when it's at maximum capacity is ignored
+  private final LinkedList<Long> versionsForInflightEvents_ = new LinkedList<>();
+
   public Db(String name, org.apache.hadoop.hive.metastore.api.Database msDb) {
     setMetastoreDb(name, msDb);
     tableCache_ = new CatalogObjectCache<>();
@@ -482,4 +491,42 @@ public class Db extends CatalogObjectImpl implements FeDb {
     tDatabase.setMetastore_db(msDb);
     thriftDb_.set(tDatabase);
   }
+
+  /**
+   * Gets the current list of versions for in-flight events for this database
+   */
+  public List<Long> getVersionsForInflightEvents() {
+    return Collections.unmodifiableList(versionsForInflightEvents_);
+  }
+
+  /**
+   * Removes a given version from the collection of version numbers for in-flight events
+   * @param versionNumber version number to remove from the collection
+   * @return true if version was successfully removed, false if didn't exist
+   */
+  public boolean removeFromVersionsForInflightEvents(long versionNumber) {
+    return versionsForInflightEvents_.remove(versionNumber);
+  }
+
+  /**
+   * Adds a version number to the collection of versions for in-flight events. If the
+   * collection is already at the max size defined by
+   * <code>MAX_NUMBER_OF_INFLIGHT_EVENTS</code>, then it ignores the given version and
+   * does not add it
+   * @param versionNumber version number to add
+   * @return True if version number was added, false if the collection is at its max
+   * capacity
+   */
+  public boolean addToVersionsForInflightEvents(long versionNumber) {
+    if (versionsForInflightEvents_.size() >= MAX_NUMBER_OF_INFLIGHT_EVENTS) {
+      LOG.warn(String.format("Number of versions to be stored for database %s is at "
+              + " its max capacity %d. Ignoring add request for version number %d. This "
+              + "could cause unnecessary database invalidation when the event is "
+              + "processed",
+          getName(), MAX_NUMBER_OF_INFLIGHT_EVENTS, versionNumber));
+      return false;
+    }
+    versionsForInflightEvents_.add(versionNumber);
+    return true;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 8b5b6f3..8deff92 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -20,6 +20,7 @@ package org.apache.impala.catalog.events;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterDatabaseMessage;
 import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.json.JSONCreateDatabaseMessage;
 import org.apache.hadoop.hive.metastore.messaging.json.JSONDropDatabaseMessage;
@@ -49,6 +51,7 @@ import org.apache.impala.common.Metrics;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.Reference;
 import org.apache.impala.thrift.TTableName;
+import org.apache.impala.util.ClassUtil;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
 
@@ -142,9 +145,7 @@ public class MetastoreEvents {
         case ALTER_TABLE: return new AlterTableEvent(catalog_, metrics_, event);
         case CREATE_DATABASE: return new CreateDatabaseEvent(catalog_, metrics_, event);
         case DROP_DATABASE: return new DropDatabaseEvent(catalog_, metrics_, event);
-        case ALTER_DATABASE:
-          // alter database events are currently ignored
-          return new IgnoredEvent(catalog_, metrics_, event);
+        case ALTER_DATABASE: return new AlterDatabaseEvent(catalog_, metrics_, event);
         case ADD_PARTITION:
           // add partition events triggers invalidate table currently
           return new AddPartitionEvent(catalog_, metrics_, event);
@@ -237,6 +238,9 @@ public class MetastoreEvents {
     // dbName from the event
     protected final String dbName_;
 
+    // tblName from the event
+    protected final String tblName_;
+
     // eventId of the event. Used instead of calling getter on event_ everytime
     protected final long eventId_;
 
@@ -249,6 +253,16 @@ public class MetastoreEvents {
     // metrics registry so that events can add metrics
     protected final Metrics metrics_;
 
+    // version number from the event object parameters used for self-event detection
+    protected long versionNumberFromEvent_ = -1;
+    // service id from the event object parameters used for self-event detection
+    protected String serviceIdFromEvent_ = null;
+    // the list of versions which this catalog expects to see in the
+    // self-generated events in order. Anything which is seen out of order of this list
+    // will be used to determine that this is not a self-event and table will be
+    // invalidated. See <code>isSelfEvent</code> for more details.
+    protected List<Long> pendingVersionNumbersFromCatalog_ = Collections.EMPTY_LIST;
+
     MetastoreEvent(CatalogServiceCatalog catalogServiceCatalog, Metrics metrics,
         NotificationEvent event) {
       this.catalog_ = catalogServiceCatalog;
@@ -256,6 +270,7 @@ public class MetastoreEvents {
       this.eventId_ = event_.getEventId();
       this.eventType_ = MetastoreEventType.from(event.getEventType());
       this.dbName_ = Preconditions.checkNotNull(event.getDbName());
+      this.tblName_ = event.getTableName();
       this.metastoreNotificationEvent_ = event;
       this.metrics_ = metrics;
     }
@@ -359,15 +374,85 @@ public class MetastoreEvents {
      * with this event
      */
     protected abstract boolean isEventProcessingDisabled();
+
+    /**
+     * This method detects if this event is self-generated or not (see class
+     * documentation of <code>MetastoreEventProcessor</code> to understand what a
+     * self-event is).
+     *
+     * In order to determine this, it compares the value of catalogVersion from the
+     * event with the list of pending version numbers stored in the catalog
+     * database/table. The event could be generated by another instance of CatalogService
+     * which can potentially have the same versionNumber. In order to resolve such
+     * conflict, it compares the CatalogService's serviceId before comparing the version
+     * number. If it is determined that this is indeed a self-event, this method also
+     * clears the version number from the catalog database/table's list of pending
+     * versions for in-flight events. This is needed so that a subsequent event with the
+     * same service id or version number is not incorrectly determined as a self-event. A
+     * subsequent event with the same serviceId and versionNumber is most likely generated
+     * by a non-Impala system because it cached the table object having those values of
+     * serviceId and version. More details on complete flow of self-event handling
+     * logic can be read in <code>MetastoreEventsProcessor</code> documentation.
+     *
+     * @return True if this event is a self-generated event. If the returned value is
+     * true, this method also clears the version number from the catalog database/table.
+     * Returns false if the version numbers or service id don't match
+     * @throws CatalogException in case of exceptions while removing the version number
+     * from the database/table or when reading the values of version list from catalog
+     * database/table
+     */
+    protected boolean isSelfEvent() throws CatalogException {
+      initSelfEventIdentifiersFromEvent();
+      if (versionNumberFromEvent_ == -1 || pendingVersionNumbersFromCatalog_.isEmpty()) {
+        return false;
+      }
+
+      // first check if service id is a match, then check if the event version is what we
+      // expect in the list
+      if (catalog_.getCatalogServiceId().equals(serviceIdFromEvent_)
+          && pendingVersionNumbersFromCatalog_.get(0).equals(versionNumberFromEvent_)) {
+        // we see this version for the first time. This is a self-event
+        // remove this version number from the catalog so that next time we see this
+        // version we don't determine it wrongly as self-event
+        // TODO we should improve by atomically doing this check and invalidating the
+        // table to avoid any races. Currently, it is possible the some other thread
+        // in CatalogService changes the pendingVersionNumbersFromCatalog after do
+        // the check here. However, there are only two possible operations that can
+        // happen from outside with respect to this version list. Either some thread
+        // adds a new version to the list after we looked at it, or the table is
+        // invalidated. In both the cases, it is OK since the new version added is
+        // guaranteed to be greater than versionNumberFromEvent and if the table is
+        // invalidated, this operation (this whole event) becomes a no-op
+        catalog_.removeFromInFlightVersionsForEvents(
+            dbName_, tblName_, versionNumberFromEvent_);
+        metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS).inc();
+        return true;
+      }
+      return false;
+    }
+
+    /**
+     * This method should be implemented by subclasses to initialize the values of
+     * self-event identifiers by parsing the event data. These identifiers are later
+     * used to determine if this event needs to be processed or not. See
+     * <code>isSelfEvent</code> method for details.
+     */
+    protected void initSelfEventIdentifiersFromEvent() {
+      throw new UnsupportedOperationException(
+          String.format("%s is not supported", ClassUtil.getMethodName()));
+    }
+
+    protected static String getStringProperty(
+        Map<String, String> params, String key, String defaultVal) {
+      if (params == null) return defaultVal;
+      return params.getOrDefault(key, defaultVal);
+    }
   }
 
   /**
    * Base class for all the table events
    */
   public static abstract class MetastoreTableEvent extends MetastoreEvent {
-    // tblName from the event
-    protected final String tblName_;
-
     // tbl object from the Notification event, corresponds to the before tableObj in
     // case of alter events
     protected org.apache.hadoop.hive.metastore.api.Table msTbl_;
@@ -375,7 +460,7 @@ public class MetastoreEvents {
     private MetastoreTableEvent(CatalogServiceCatalog catalogServiceCatalog,
         Metrics metrics, NotificationEvent event) {
       super(catalogServiceCatalog, metrics, event);
-      tblName_ = Preconditions.checkNotNull(event.getTableName());
+      Preconditions.checkNotNull(tblName_);
       debugLog("Creating event {} of type {} on table {}", eventId_, eventType_,
           getFullyQualifiedTblName());
     }
@@ -872,6 +957,64 @@ public class MetastoreEvents {
   }
 
   /**
+   * MetastoreEvent for ALTER_DATABASE event type
+   */
+  public static class AlterDatabaseEvent extends MetastoreDatabaseEvent {
+    // metastore database object as parsed from NotificationEvent message
+    private final Database alteredDatabase_;
+
+    /**
+     * Prevent instantiation from outside should use MetastoreEventFactory instead
+     */
+    private AlterDatabaseEvent(CatalogServiceCatalog catalog, Metrics metrics,
+        NotificationEvent event) throws MetastoreNotificationException {
+      super(catalog, metrics, event);
+      Preconditions.checkArgument(MetastoreEventType.ALTER_DATABASE.equals(eventType_));
+      JSONAlterDatabaseMessage alterDatabaseMessage =
+          (JSONAlterDatabaseMessage) MetastoreEventsProcessor.getMessageFactory()
+              .getDeserializer()
+              .getAlterDatabaseMessage(event.getMessage());
+      try {
+        alteredDatabase_ =
+            Preconditions.checkNotNull(alterDatabaseMessage.getDbObjAfter());
+      } catch (Exception e) {
+        throw new MetastoreNotificationException(
+            debugString("Unable to parse the alter database message"), e);
+      }
+    }
+
+    /**
+     * Processes the alter database event by replacing the catalog cached Db object with
+     * the Db object from the event
+     */
+    @Override
+    public void process() throws CatalogException {
+      if (isSelfEvent()) {
+        infoLog("Not processing the event as it is a self-event");
+        return;
+      }
+      // If not self event, copy Db object from event to catalog
+      catalog_.updateDb(alteredDatabase_);
+    }
+
+    @Override
+    protected void initSelfEventIdentifiersFromEvent() {
+      versionNumberFromEvent_ = Long.parseLong(getStringProperty(
+          alteredDatabase_.getParameters(), CATALOG_VERSION_PROP_KEY, "-1"));
+      serviceIdFromEvent_ = getStringProperty(
+          alteredDatabase_.getParameters(), CATALOG_SERVICE_ID_PROP_KEY, "");
+      try {
+        pendingVersionNumbersFromCatalog_ =
+            catalog_.getInFlightVersionsForEvents(dbName_, tblName_);
+      } catch (DatabaseNotFoundException | TableNotFoundException e) {
+        // ok to ignore this exception, since if the db doesn't exist, this event needs
+        // to be ignored anyways
+        debugLog("Received exception {}. Ignoring self-event evaluation", e.getMessage());
+      }
+    }
+  }
+
+  /**
    * MetastoreEvent for the DROP_DATABASE event
    */
   public static class DropDatabaseEvent extends MetastoreDatabaseEvent {
@@ -943,16 +1086,6 @@ public class MetastoreEvents {
    * MetastoreEvent for which issues invalidate on a table from the event
    */
   public static abstract class TableInvalidatingEvent extends MetastoreTableEvent {
-    // version number from the event object parameters used for self-event detection
-    protected long versionNumberFromEvent_ = -1;
-    // service id from the event object parameters used for self-event detection
-    protected String serviceIdFromEvent_ = null;
-    // the list of versions which this catalog expects to be see in the
-    // self-generated events in order. Anything which is seen out of order of this list
-    // will be used to determine that this is not a self-event and table will be
-    // invalidated. See <code>isSelfEvent</code> for more details.
-    protected List<Long> pendingVersionNumbersFromCatalog_ = Collections.EMPTY_LIST;
-
     /**
      * Prevent instantiation from outside should use MetastoreEventFactory instead
      */
@@ -962,69 +1095,6 @@ public class MetastoreEvents {
     }
 
     /**
-     * This method detects if this event is self-generated or not (see class
-     * documentation of <code>MetastoreEventProcessor</code> to understand what a
-     * self-event is).
-     *
-     * In order to determine this, it compares the value of catalogVersion from the
-     * event with the list of pending version numbers stored in the catalog table. The
-     * event could be generated by another instance of CatalogService which can
-     * potentially have the same versionNumber. In order to resolve such conflict, it
-     * compares the CatalogService's serviceId before comparing the version number. If
-     * it is determined that this is indeed a self-event, this method also clears the
-     * version number from the catalog table's list of pending versions for in-flight
-     * events. This is needed so that a subsequent event with the same service id or
-     * version number is not incorrectly determined as a self-event. A subsequent
-     * event with the same serviceId and versionNumber is most likely generated by a
-     * non-Impala system because it cached the table object having those values of
-     * serviceId and version. More details on complete flow of self-event handling
-     * logic can be read in <code>MetastoreEventsProcessor</code> documentation.
-     *
-     * @return True if this event is a self-generated event. If the returned value is
-     * true, this method also clears the version number from the catalog table. Returns
-     * false if the version numbers or service id don't match
-     * @throws CatalogException in case of exceptions while removing the version number
-     * from the table or when reading the values of version list from catalog table
-     */
-    protected boolean isSelfEvent() throws CatalogException {
-      initSelfEventIdentifiersFromEvent();
-      if (versionNumberFromEvent_ == -1 || pendingVersionNumbersFromCatalog_.isEmpty())
-        return false;
-
-      if (catalog_.getCatalogServiceId().equals(serviceIdFromEvent_)) {
-        // service id is a match. Now check if the event version is what we expect
-        // in the list
-        if (pendingVersionNumbersFromCatalog_.get(0).equals(versionNumberFromEvent_)) {
-          // we see this version for the first time. This is a self-event
-          // remove this version number from the catalog so that next time we see this
-          // version we don't determine it wrongly as self-event
-          //TODO we should improve by atomically doing this check and invalidating the
-          // table to avoid any races. Currently, it is possible the some other thread
-          // in CatalogService changes the pendingVersionNumbersFromCatalog after do
-          // the check here. However, there are only two possible operations that can
-          // happen from outside with respect to this version list. Either some thread
-          // adds a new version to the list after we looked at it, or the table is
-          // invalidated. In both the cases, it is OK since the new version added is
-          // guaranteed to be greater than versionNumberFromEvent and if the table is
-          // invalidated, this operation (this whole event) becomes a no-op
-          catalog_.removeFromInFlightVersionsForEvents(
-              dbName_, tblName_, versionNumberFromEvent_);
-          metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS).inc();
-          return true;
-        }
-      }
-      return false;
-    }
-
-    /**
-     * This method should be implemented by subclasses to initialize the values of
-     * self-event identifiers by parsing the event data. These identifiers are later
-     * used to determine if this event needs to be processed or not. See
-     * <code>isSelfEvent</code> method for details.
-     */
-    protected abstract void initSelfEventIdentifiersFromEvent();
-
-    /**
      * Issues a invalidate table on the catalog on the table from the event. This
      * invalidate does not fetch information from metastore unlike the invalidate metadata
      * command since event is triggered post-metastore activity. This handler invalidates
@@ -1046,12 +1116,6 @@ public class MetastoreEvents {
             + "it does not exist anymore", getFullyQualifiedTblName());
       }
     }
-
-    protected static String getStringProperty(
-        Map<String, String> params, String key, String defaultVal) {
-      if (params == null) return defaultVal;
-      return params.getOrDefault(key, defaultVal);
-    }
   }
 
   public static class AddPartitionEvent extends TableInvalidatingEvent {
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index b1ffc20..4adca56 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -291,6 +291,9 @@ public class CatalogOpExecutor {
 
     boolean syncDdl = ddlRequest.isSync_ddl();
     switch (ddlRequest.ddl_type) {
+      case ALTER_DATABASE:
+        alterDatabase(ddlRequest.getAlter_db_params(), response);
+        break;
       case ALTER_TABLE:
         alterTable(ddlRequest.getAlter_table_params(), response);
         break;
@@ -366,9 +369,6 @@ public class CatalogOpExecutor {
       case COMMENT_ON:
         alterCommentOn(ddlRequest.getComment_on_params(), response);
         break;
-      case ALTER_DATABASE:
-        alterDatabase(ddlRequest.getAlter_db_params(), response);
-        break;
       case COPY_TESTCASE:
         copyTestCaseData(ddlRequest.getCopy_test_case_params(), response);
         break;
@@ -701,7 +701,7 @@ public class CatalogOpExecutor {
             reloadTableSchema, null);
         addTableToCatalogUpdate(tbl, response.result);
       }
-      // now that HMS alter operation has suceeded, add this version to list of inflight
+      // now that HMS alter operation has succeeded, add this version to list of inflight
       // events in catalog table if event processing is enabled
       catalog_.addVersionsForInflightEvents(tbl, newCatalogVersion);
     } finally {
@@ -3799,9 +3799,15 @@ public class CatalogOpExecutor {
 
   private void alterDatabase(TAlterDbParams params, TDdlExecResponse response)
       throws ImpalaException {
+    Preconditions.checkNotNull(params);
+    String dbName = params.getDb();
+    Db db = catalog_.getDb(dbName);
+    if (db == null) {
+      throw new CatalogException("Database: " + dbName + " does not exist.");
+    }
     switch (params.getAlter_type()) {
       case SET_OWNER:
-        alterDatabaseSetOwner(params.getDb(), params.getSet_owner_params(), response);
+        alterDatabaseSetOwner(db, params.getSet_owner_params(), response);
         break;
       default:
         throw new UnsupportedOperationException(
@@ -3809,15 +3815,14 @@ public class CatalogOpExecutor {
     }
   }
 
-  private void alterDatabaseSetOwner(String dbName, TAlterDbSetOwnerParams params,
+  private void alterDatabaseSetOwner(Db db, TAlterDbSetOwnerParams params,
       TDdlExecResponse response) throws ImpalaException {
-    Db db = catalog_.getDb(dbName);
-    if (db == null) {
-      throw new CatalogException("Database: " + dbName + " does not exist.");
-    }
     Preconditions.checkNotNull(params.owner_name);
     Preconditions.checkNotNull(params.owner_type);
     synchronized (metastoreDdlLock_) {
+      // Get a new catalog version to assign to the database being altered.
+      long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+      addCatalogServiceIdentifiers(db, catalog_.getCatalogServiceId(), newCatalogVersion);
       Database msDb = db.getMetaStoreDb().deepCopy();
       String originalOwnerName = msDb.getOwnerName();
       PrincipalType originalOwnerType = msDb.getOwnerType();
@@ -3835,10 +3840,26 @@ public class CatalogOpExecutor {
       }
       Db updatedDb = catalog_.updateDb(msDb);
       addDbToCatalogUpdate(updatedDb, response.result);
+      // now that HMS alter operation has succeeded, add this version to list of inflight
+      // events in catalog database if event processing is enabled
+      catalog_.addVersionsForInflightEvents(db, newCatalogVersion);
     }
     addSummary(response, "Updated database.");
   }
 
+  /**
+   * Adds the catalog service id and the given catalog version to the database
+   * parameters. No-op if event processing is disabled
+   */
+  private void addCatalogServiceIdentifiers(
+      Db db, String catalogServiceId, long newCatalogVersion) {
+    if (!catalog_.isExternalEventProcessingEnabled()) return;
+    org.apache.hadoop.hive.metastore.api.Database msDb = db.getMetaStoreDb();
+    msDb.putToParameters(MetastoreEvents.CATALOG_SERVICE_ID_PROP_KEY, catalogServiceId);
+    msDb.putToParameters(
+        MetastoreEvents.CATALOG_VERSION_PROP_KEY, String.valueOf(newCatalogVersion));
+  }
+
   private void addDbToCatalogUpdate(Db db, TCatalogUpdateResult result) {
     Preconditions.checkNotNull(db);
     TCatalogObject updatedCatalogObject = db.toTCatalogObject();
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 20cb8dc..36db63c 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -30,16 +30,17 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.google.common.base.Preconditions;
+
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -72,6 +73,9 @@ import org.apache.impala.common.Pair;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.thrift.TAlterDbParams;
+import org.apache.impala.thrift.TAlterDbSetOwnerParams;
+import org.apache.impala.thrift.TAlterDbType;
 import org.apache.impala.thrift.TAlterTableAddColsParams;
 import org.apache.impala.thrift.TAlterTableAddPartitionParams;
 import org.apache.impala.thrift.TAlterTableDropColParams;
@@ -115,7 +119,9 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
+
 import com.google.common.collect.Lists;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -322,7 +328,6 @@ public class MetastoreEventsProcessorTest {
     dropDatabaseCascadeFromImpala(TEST_DB_NAME);
   }
 
-  @Ignore("Disabled until we fix Hive bug to deserialize alter_database event messages")
   @Test
   public void testAlterDatabaseEvents() throws TException, ImpalaException {
     createDatabase(TEST_DB_NAME, null);
@@ -336,14 +341,13 @@ public class MetastoreEventsProcessorTest {
             .getParameters()
             .containsKey(testDbParamKey));
     // test change of parameters to the Database
-    addDatabaseParameters(testDbParamKey, "someDbParamVal");
+    addDatabaseParameters(testDbParamKey, testDbParamVal);
     eventsProcessor_.processEvents();
+    String getParamValFromDb =
+        catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getParameters().get(testDbParamKey);
     assertTrue("Altered database should have set the key " + testDbParamKey + " to value "
-            + testDbParamVal + " in parameters",
-        testDbParamVal.equals(catalog_.getDb(TEST_DB_NAME)
-                                  .getMetaStoreDb()
-                                  .getParameters()
-                                  .get(testDbParamKey)));
+            + testDbParamVal + " in parameters, instead we get " + getParamValFromDb,
+        testDbParamVal.equals(getParamValFromDb));
 
     // test update to the default location
     String currentLocation =
@@ -370,6 +374,43 @@ public class MetastoreEventsProcessorTest {
         newOwner.equals(catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getOwnerName()));
   }
 
+  /**
+   * Alters the owner of a db twice from Impala (USER, then ROLE) and checks that the
+   * catalog reflects each owner and that NUMBER_OF_SELF_EVENTS goes up by exactly 2.
+   */
+  @Test
+  public void testAlterDatabaseSetOwnerFromImpala() throws ImpalaException {
+    assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+    createDatabaseFromImpala(TEST_DB_NAME, null);
+    assertNotNull("Db should have been found after create database statement",
+        catalog_.getDb(TEST_DB_NAME));
+    long numberOfSelfEventsBefore =
+        eventsProcessor_.getMetrics()
+            .getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS)
+            .getCount();
+    String owner = catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getOwnerName();
+    String newOwnerUser = "newUserFromImpala";
+    String newOwnerRole = "newRoleFromImpala";
+    assertFalse(newOwnerUser.equals(owner) || newOwnerRole.equals(owner));
+    alterDbSetOwnerFromImpala(TEST_DB_NAME, newOwnerUser, TOwnerType.USER);
+    eventsProcessor_.processEvents();
+    assertEquals(
+        newOwnerUser, catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getOwnerName());
+
+    alterDbSetOwnerFromImpala(TEST_DB_NAME, newOwnerRole, TOwnerType.ROLE);
+    eventsProcessor_.processEvents();
+    assertEquals(
+        newOwnerRole, catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getOwnerName());
+
+    long selfEventsCountAfter =
+        eventsProcessor_.getMetrics()
+            .getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS)
+            .getCount();
+    // one self-event is expected per alter command issued above (2 in total)
+    assertEquals("Unexpected number of self-events generated",
+        numberOfSelfEventsBefore + 2, selfEventsCountAfter);
+  }
+
   /**
    * Test creates two table (partitioned and non-partitioned) and makes sure that CatalogD
    * has the two created table objects after the CREATE_TABLE events are processed.
@@ -1468,7 +1509,6 @@ public class MetastoreEventsProcessorTest {
     }
   }
 
-  @Ignore("Ignored we add alter database event support. See IMPALA-8149")
   @Test
   public void testAlterDisableFlagFromDb()
       throws TException, CatalogException, MetastoreNotificationFetchException {
@@ -1480,10 +1520,19 @@ public class MetastoreEventsProcessorTest {
     alterDatabase(alteredDb);
 
     createTable(testTblName, false);
-    assertEquals(1, eventsProcessor_.getNextMetastoreEvents().size());
+    assertEquals(2, eventsProcessor_.getNextMetastoreEvents().size());
+    long numSkippedEvents =
+        eventsProcessor_.getMetrics()
+            .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+            .getCount();
     eventsProcessor_.processEvents();
+    assertEquals(numSkippedEvents + 1,
+        eventsProcessor_.getMetrics()
+            .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+            .getCount());
     assertNull("Table creation should be skipped when database level event sync flag is"
-        + " disabled", catalog_.getTable(TEST_DB_NAME, testTblName));
+            + " disabled",
+        catalog_.getTable(TEST_DB_NAME, testTblName));
   }
 
   private void confirmTableIsLoaded(String dbName, String tblname)
@@ -1832,6 +1881,24 @@ public class MetastoreEventsProcessorTest {
   }
 
   /**
+   * Sets the owner of the given db via an Impala ALTER_DATABASE SET_OWNER ddl request.
+   */
+  private void alterDbSetOwnerFromImpala(
+      String dbName, String owner, TOwnerType ownerType) throws ImpalaException {
+    TDdlExecRequest req = new TDdlExecRequest();
+    req.setDdl_type(TDdlType.ALTER_DATABASE);
+    TAlterDbParams alterDbParams = new TAlterDbParams();
+    alterDbParams.setDb(dbName);
+    alterDbParams.setAlter_type(TAlterDbType.SET_OWNER);
+    TAlterDbSetOwnerParams alterDbSetOwnerParams = new TAlterDbSetOwnerParams();
+    alterDbSetOwnerParams.setOwner_name(owner);
+    alterDbSetOwnerParams.setOwner_type(ownerType);
+    alterDbParams.setSet_owner_params(alterDbSetOwnerParams);
+    req.setAlter_db_params(alterDbParams);
+    catalogOpExecutor_.execDdlRequest(req);
+  }
+
+  /**
    * Drops db from Impala
    */
   private void dropDatabaseCascadeFromImpala(String dbName) throws ImpalaException {